00001 /* 00002 * Copyright (c) 2000, 2020, Oracle and/or its affiliates. 00003 * 00004 * Licensed under the Universal Permissive License v 1.0 as shown at 00005 * http://oss.oracle.com/licenses/upl. 00006 */ 00007 #ifndef COH_ABSTRACT_CONCURRENT_QUEUE 00008 #define COH_ABSTRACT_CONCURRENT_QUEUE 00009 00010 #include "coherence/lang.ns" 00011 00012 #include "coherence/native/NativeAtomic32.hpp" 00013 #include "coherence/util/Queue.hpp" 00014 00015 COH_OPEN_NAMESPACE2(coherence,util) 00016 00017 using coherence::native::NativeAtomic32; 00018 00019 /** 00020 * The ConcurrentQueue provides a means to efficiently (and in a thread-safe 00021 * manner) queue elements with minimal contention. 00022 * 00023 * Note: The ConcurrentQueue does not support null entries. 00024 * 00025 * @author nsa 2008.01.19 00026 */ 00027 class COH_EXPORT AbstractConcurrentQueue 00028 : public abstract_spec<AbstractConcurrentQueue, 00029 extends<Object>, 00030 implements<Queue> > 00031 { 00032 // ----- enums ---------------------------------------------------------- 00033 00034 public: 00035 /** 00036 * The FlushState values are used to indicate the state of the Queue 00037 * with regards to flushing: 00038 * 00039 * <ul> 00040 * <li> 00041 * flush_pending: Indicates that a flush is pending. At 00042 * some point the queue will flush itself 00043 * automatically or the consumer will explicitly 00044 * flush the queue. 00045 * </li> 00046 * <li> 00047 * flush_auto: Indicates that no flush is pending as the queue 00048 * has been auto flushed. This state will not be 00049 * reset to flush_pending until the queue has been 00050 * cleared and the producer has added a new 00051 * element. 00052 * </li> 00053 * <li> 00054 * flush_explicit: Indicates that no flush is pending as the queue 00055 * has been explicitly flushed. This state will 00056 * not be reset to flush_pending until the queue 00057 * has been cleared and the producer has added a 00058 * new element, or if the producer calls flush 00059 * multiple times before the queue has 00060 * been cleared. 00061 * </li> 00062 * </ul> 00063 */ 00064 typedef enum 00065 { 00066 flush_pending = 0, 00067 flush_auto = 1, 00068 flush_explicit = 2 00069 } FlushState; 00070 00071 00072 // ----- constructors --------------------------------------------------- 00073 00074 protected: 00075 /** 00076 * @internal 00077 */ 00078 AbstractConcurrentQueue(); 00079 00080 00081 // ----- AbstractConcurrentQueue interface ------------------------------ 00082 00083 public: 00084 /** 00085 * Return the current flush state. 00086 */ 00087 virtual FlushState getFlushState() const; 00088 00089 /** 00090 * Return whether a flush is pending or not. 00091 * 00092 * @return true if a flush is pending 00093 */ 00094 virtual bool isFlushPending() const; 00095 00096 /** 00097 * Set the batch size of the queue. 00098 * 00099 * @param cBatch the batch size to set 00100 */ 00101 virtual void setBatchSize(int32_t cBatch); 00102 00103 /** 00104 * Return the batch size of the queue. 00105 * 00106 * @return the batch size of the queue 00107 */ 00108 virtual int32_t getBatchSize() const; 00109 00110 /** 00111 * Return the object used for notifications on this queue. 00112 * 00113 * @return the object used for notifications on this queue. 00114 */ 00115 virtual Object::Handle getNotifier(); 00116 00117 /** 00118 * Set the object used for notifications on this queue. 00119 * 00120 * @param hNotifier the object used for notifications on this queue 00121 */ 00122 virtual void setNotifier(Object::Handle hNotifier); 00123 00124 /** 00125 * Wait for the queue to contain at least one entry. 00126 * 00127 * Note: By the time the method returns the entry may have already 00128 * been removed by another thread. 00129 * 00130 * @param cMillis the number of milliseconds to wait before returing 00131 * without having been notified, or 0 to wait until 00132 * notified 00133 */ 00134 virtual void waitForEntry(int64_t cMillis); 00135 00136 /** 00137 * Return the total number of times the queue has been emptied. 00138 * 00139 * @return the total number of times the queue has been emptied. 00140 */ 00141 virtual int64_t getStatsEmptied() const; 00142 00143 /** 00144 * Return the total number of times the queue has been flushed. 00145 * 00146 * @return the total number of times the queue has been flushed 00147 */ 00148 virtual int64_t getStatsFlushed() const; 00149 00150 00151 // ----- internal helpers ----------------------------------------------- 00152 00153 protected: 00154 /** 00155 * Check whether or not the flush (notify) is necessary. This method 00156 * is always called when a new item is added to the queue. 00157 * 00158 * @param cElements the number of elements in the queue after the 00159 * addition 00160 */ 00161 virtual void checkFlush(int32_t cElements); 00162 00163 /** 00164 * Flush the queue. 00165 * 00166 * @param fAuto iff the flush was invoked automatically based on the 00167 * notification batch size 00168 * 00169 */ 00170 virtual void flush(bool fAuto); 00171 00172 /** 00173 * Event called each time an element is added to the queue. 00174 */ 00175 virtual void onAddElement(); 00176 00177 /** 00178 * Event called when the queue becomes empty. 00179 */ 00180 virtual void onEmpty(); 00181 00182 /** 00183 * Set the flush state and return the previous state. 00184 * 00185 * @param nState the state to set 00186 * 00187 * @return the previous flush state 00188 */ 00189 virtual FlushState updateFlushState(FlushState nState); 00190 00191 /** 00192 * Set the flush state iff the assumed state is correct, return the 00193 * previous flushState 00194 * 00195 * @param nStateAssumed the assumed current value of the flush state 00196 * @param nStateNew the new flush state value 00197 * 00198 * @return FlushState the old flush state value 00199 */ 00200 virtual FlushState updateFlushStateConditionally(FlushState nStateAssumed, 00201 FlushState nStateNew); 00202 00203 /** 00204 * Set the total number of times the queue has been emptied. 00205 * 00206 * @param cEmpties the total number of times the queue has been 00207 * flushed. 00208 */ 00209 virtual void setStatsEmptied(int64_t cEmpties); 00210 00211 /** 00212 * Set the total number of times the queue has been flushed. 00213 * 00214 * @param cFlushed the total number of times the queue has been 00215 * flushed 00216 */ 00217 virtual void setStatsFlushed(int64_t cFlushed); 00218 00219 00220 // ----- Queue interface ------------------------------------------------ 00221 00222 public: 00223 /** 00224 * @{inheritDoc} 00225 */ 00226 virtual void flush(); 00227 00228 /** 00229 * @{inheritDoc} 00230 */ 00231 virtual size32_t size() const; 00232 00233 /** 00234 * @{inheritDoc} 00235 */ 00236 virtual Object::Holder remove(); 00237 00238 00239 // ----- data members --------------------------------------------------- 00240 00241 protected: 00242 /** 00243 * The AtomicLong used to maintain the FlushState. See 00244 * getFlushState() and setFlushState() helper methods. 00245 */ 00246 NativeAtomic32 m_nAtomicFlushState; 00247 00248 /** 00249 * The queue size at which to auto-flush the queue during an add 00250 * operation. If the BatchSize is greater then one, the caller must 00251 * externally call flush() when it has finished adding elements in 00252 * order to ensure that they may be processed by any waiting consumer 00253 * thread. 00254 */ 00255 int32_t m_iBatchSize; 00256 00257 /** 00258 * A counter for maintaining the size of the queue. 00259 */ 00260 NativeAtomic32 m_nElementCounter; 00261 00262 /** 00263 * The monitor on which notifications related to a queue addition 00264 * will be performed. The default value is the Queue itself. The 00265 * Notifier should not be changed while the queue is in use. If the 00266 * Notifier is null then notification will be disabled. 00267 */ 00268 FinalHandle<Object> f_hNotifier; 00269 00270 /** 00271 * The total number of times the queue transitioned to the empty 00272 * state. 00273 */ 00274 int64_t m_lStatsEmptied; 00275 00276 /** 00277 * The total number of times the queue has been flushed. 00278 */ 00279 int64_t m_lStatusFlushed; 00280 00281 /** 00282 * Indicate whether the notifier references itself 00283 */ 00284 bool m_fSelfNotifier; 00285 }; 00286 00287 COH_CLOSE_NAMESPACE2 00288 00289 #endif // COH_ABSTRACT_CONCURRENT_QUEUE