C++ Client API Reference for Oracle Coherence
14c (14.1.2.0.0)

F79659-03

coherence/util/AbstractConcurrentQueue.hpp

00001 /*
00002  * Copyright (c) 2000, 2020, Oracle and/or its affiliates.
00003  *
00004  * Licensed under the Universal Permissive License v 1.0 as shown at
00005  * http://oss.oracle.com/licenses/upl.
00006  */
00007 #ifndef COH_ABSTRACT_CONCURRENT_QUEUE
00008 #define COH_ABSTRACT_CONCURRENT_QUEUE
00009 
00010 #include "coherence/lang.ns"
00011 
00012 #include "coherence/native/NativeAtomic32.hpp"
00013 #include "coherence/util/Queue.hpp"
00014 
00015 COH_OPEN_NAMESPACE2(coherence,util)
00016 
00017 using coherence::native::NativeAtomic32;
00018 
00019 /**
00020 * The ConcurrentQueue provides a means to efficiently (and in a thread-safe
00021 * manner) queue elements with minimal contention.
00022 *
00023 * Note: The ConcurrentQueue does not support null entries.
00024 *
00025 * @author nsa 2008.01.19
00026 */
00027 class COH_EXPORT AbstractConcurrentQueue
00028     : public abstract_spec<AbstractConcurrentQueue,
00029         extends<Object>,
00030         implements<Queue> >
00031     {
00032     // ----- enums ----------------------------------------------------------
00033 
00034     public:
00035         /**
00036         * The FlushState values are used to indicate the state of the Queue
00037         * with regard to flushing:
00038         *
00039         * <ul>
00040         *   <li>
00041         *     flush_pending:  Indicates that a flush is pending. At
00042         *                     some point the queue will flush itself
00043         *                     automatically or the consumer will explicitly
00044         *                     flush the queue.
00045         *   </li>
00046         *   <li>
00047         *     flush_auto:     Indicates that no flush is pending as the queue
00048         *                     has been auto flushed. This state will not be
00049         *                     reset to flush_pending until the queue has been
00050         *                     cleared and the producer has added a new
00051         *                     element.
00052         *   </li>
00053         *   <li>
00054         *     flush_explicit: Indicates that no flush is pending as the queue
00055         *                     has been explicitly flushed. This state will
00056         *                     not be reset to flush_pending until the queue
00057         *                     has been cleared and the producer has added a
00058         *                     new element, or if the producer calls flush
00059         *                     multiple times before the queue has
00060         *                     been cleared.
00061         *   </li>
00062         * </ul>
00063         */
00064         typedef enum
00065             {
00066             flush_pending = 0,
00067             flush_auto = 1,
00068             flush_explicit = 2
00069             } FlushState;
00070 
00071 
00072     // ----- constructors ---------------------------------------------------
00073 
00074     protected:
00075         /**
00076         * @internal
00077         */
00078         AbstractConcurrentQueue();
00079 
00080 
00081     // ----- AbstractConcurrentQueue interface ------------------------------
00082 
00083     public:
00084         /**
00085         * Return the current flush state.
00086         */
00087         virtual FlushState getFlushState() const;
00088 
00089         /**
00090         * Return whether a flush is pending or not.
00091         *
00092         * @return true if a flush is pending
00093         */
00094         virtual bool isFlushPending() const;
00095 
00096         /**
00097         * Set the batch size of the queue.
00098         *
00099         * @param cBatch  the batch size to set
00100         */
00101         virtual void setBatchSize(int32_t cBatch);
00102 
00103         /**
00104         * Return the batch size of the queue.
00105         *
00106         * @return the batch size of the queue
00107         */
00108         virtual int32_t getBatchSize() const;
00109 
00110         /**
00111         * Return the object used for notifications on this queue.
00112         *
00113         * @return the object used for notifications on this queue.
00114         */
00115         virtual Object::Handle getNotifier();
00116 
00117         /**
00118         * Set the object used for notifications on this queue.
00119         *
00120         * @param hNotifier  the object used for notifications on this queue
00121         */
00122         virtual void setNotifier(Object::Handle hNotifier);
00123 
00124         /**
00125         * Wait for the queue to contain at least one entry.
00126         *
00127         * Note: By the time the method returns the entry may have already
00128         * been removed by another thread.
00129         *
00130         * @param cMillis the number of milliseconds to wait before returning
00131         *                without having been notified, or 0 to wait until
00132         *                notified
00133         */
00134         virtual void waitForEntry(int64_t cMillis);
00135 
00136         /**
00137         * Return the total number of times the queue has been emptied.
00138         *
00139         * @return the total number of times the queue has been emptied.
00140         */
00141         virtual int64_t getStatsEmptied() const;
00142 
00143         /**
00144         * Return the total number of times the queue has been flushed.
00145         *
00146         * @return the total number of times the queue has been flushed
00147         */
00148         virtual int64_t getStatsFlushed() const;
00149 
00150 
00151     // ----- internal helpers -----------------------------------------------
00152 
00153     protected:
00154         /**
00155         * Check whether or not the flush (notify) is necessary. This method
00156         * is always called when a new item is added to the queue.
00157         *
00158         * @param cElements  the number of elements in the queue after the
00159         *                   addition
00160         */
00161         virtual void checkFlush(int32_t cElements);
00162 
00163         /**
00164         * Flush the queue.
00165         *
00166         * @param fAuto  true iff the flush was invoked automatically based on
00167         *               the notification batch size
00168         *
00169         */
00170         virtual void flush(bool fAuto);
00171 
00172         /**
00173         * Event called each time an element is added to the queue.
00174         */
00175         virtual void onAddElement();
00176 
00177         /**
00178         * Event called when the queue becomes empty.
00179         */
00180         virtual void onEmpty();
00181 
00182         /**
00183         * Set the flush state and return the previous state.
00184         *
00185         * @param nState  the state to set
00186         *
00187         * @return the previous flush state
00188         */
00189         virtual FlushState updateFlushState(FlushState nState);
00190 
00191         /**
00192         * Set the flush state iff the assumed state is correct, and return
00193         * the previous flush state.
00194         *
00195         * @param nStateAssumed  the assumed current value of the flush state
00196         * @param nStateNew      the new flush state value
00197         *
00198         * @return the old flush state value
00199         */
00200         virtual FlushState updateFlushStateConditionally(FlushState nStateAssumed,
00201                 FlushState nStateNew);
00202 
00203         /**
00204         * Set the total number of times the queue has been emptied.
00205         *
00206         * @param cEmpties  the total number of times the queue has been
00207         *                  emptied
00208         */
00209         virtual void setStatsEmptied(int64_t cEmpties);
00210 
00211         /**
00212         * Set the total number of times the queue has been flushed.
00213         *
00214         * @param cFlushed  the total number of times the queue has been
00215         *                  flushed
00216         */
00217         virtual void setStatsFlushed(int64_t cFlushed);
00218 
00219 
00220     // ----- Queue interface ------------------------------------------------
00221 
00222     public:
00223         /**
00224         * @{inheritDoc}
00225         */
00226         virtual void flush();
00227 
00228         /**
00229         * @{inheritDoc}
00230         */
00231         virtual size32_t size() const;
00232 
00233         /**
00234         * @{inheritDoc}
00235         */
00236         virtual Object::Holder remove();
00237 
00238 
00239     // ----- data members ---------------------------------------------------
00240 
00241     protected:
00242         /**
00243         * The NativeAtomic32 used to maintain the FlushState.  See
00244         * getFlushState() and updateFlushState() helper methods.
00245         */
00246         NativeAtomic32 m_nAtomicFlushState;
00247 
00248         /**
00249         * The queue size at which to auto-flush the queue during an add
00250         * operation.  If the BatchSize is greater than one, the caller must
00251         * externally call flush() when it has finished adding elements in
00252         * order to ensure that they may be processed by any waiting consumer
00253         * thread.
00254         */
00255         int32_t m_iBatchSize;
00256 
00257         /**
00258         * A counter for maintaining the size of the queue.
00259         */
00260         NativeAtomic32 m_nElementCounter;
00261 
00262         /**
00263         * The monitor on which notifications related to a queue addition
00264         * will be performed.  The default value is the Queue itself.  The
00265         * Notifier should not be changed while the queue is in use.  If the
00266         * Notifier is null then notification will be disabled.
00267         */
00268         FinalHandle<Object> f_hNotifier;
00269 
00270         /**
00271         * The total number of times the queue transitioned to the empty
00272         * state.
00273         */
00274         int64_t m_lStatsEmptied;
00275 
00276         /**
00277         * The total number of times the queue has been flushed.
00278         */
00279         int64_t m_lStatusFlushed;
00280 
00281         /**
00282         * Indicate whether the notifier references itself.
00283         */
00284         bool m_fSelfNotifier;
00285     };
00286 
00287 COH_CLOSE_NAMESPACE2
00288 
00289 #endif // COH_ABSTRACT_CONCURRENT_QUEUE
Copyright © 2000, 2025, Oracle and/or its affiliates. Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.