00001 /* 00002 * AbstractConcurrentQueue.hpp 00003 * 00004 * Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved. 00005 * 00006 * Oracle is a registered trademarks of Oracle Corporation and/or its 00007 * affiliates. 00008 * 00009 * This software is the confidential and proprietary information of Oracle 00010 * Corporation. You shall not disclose such confidential and proprietary 00011 * information and shall use it only in accordance with the terms of the 00012 * license agreement you entered into with Oracle. 00013 * 00014 * This notice may not be removed or altered. 00015 */ 00016 #ifndef COH_ABSTRACT_CONCURRENT_QUEUE 00017 #define COH_ABSTRACT_CONCURRENT_QUEUE 00018 00019 #include "coherence/lang.ns" 00020 00021 #include "coherence/native/NativeAtomic32.hpp" 00022 #include "coherence/util/Queue.hpp" 00023 00024 COH_OPEN_NAMESPACE2(coherence,util) 00025 00026 using coherence::native::NativeAtomic32; 00027 00028 /** 00029 * The ConcurrentQueue provides a means to efficiently (and in a thread-safe 00030 * manner) queue elements with minimal contention. 00031 * 00032 * Note: The ConcurrentQueue does not support null entries. 00033 * 00034 * @author nsa 2008.01.19 00035 */ 00036 class COH_EXPORT AbstractConcurrentQueue 00037 : public abstract_spec<AbstractConcurrentQueue, 00038 extends<Object>, 00039 implements<Queue> > 00040 { 00041 // ----- enums ---------------------------------------------------------- 00042 00043 public: 00044 /** 00045 * The FlushState values are used to indicate the state of the Queue 00046 * with regards to flushing: 00047 * 00048 * <ul> 00049 * <li> 00050 * flush_pending: Indicates that a flush is pending. At 00051 * some point the queue will flush itself 00052 * automatically or the consumer will explicitly 00053 * flush the queue. 00054 * </li> 00055 * <li> 00056 * flush_auto: Indicates that no flush is pending as the queue 00057 * has been auto flushed. This state will not be 00058 * reset to flush_pending until the queue has been 00059 * cleared and the producer has added a new 00060 * element. 00061 * </li> 00062 * <li> 00063 * flush_explicit: Indicates that no flush is pending as the queue 00064 * has been explicitly flushed. This state will 00065 * not be reset to flush_pending until the queue 00066 * has been cleared and the producer has added a 00067 * new element, or if the producer calls flush 00068 * multiple times before the queue has 00069 * been cleared. 00070 * </li> 00071 * </ul> 00072 */ 00073 typedef enum 00074 { 00075 flush_pending = 0, 00076 flush_auto = 1, 00077 flush_explicit = 2 00078 } FlushState; 00079 00080 00081 // ----- constructors --------------------------------------------------- 00082 00083 protected: 00084 /** 00085 * @internal 00086 */ 00087 AbstractConcurrentQueue(); 00088 00089 00090 // ----- AbstractConcurrentQueue interface ------------------------------ 00091 00092 public: 00093 /** 00094 * Return the current flush state. 00095 */ 00096 virtual FlushState getFlushState() const; 00097 00098 /** 00099 * Return whether a flush is pending or not. 00100 * 00101 * @return true if a flush is pending 00102 */ 00103 virtual bool isFlushPending() const; 00104 00105 /** 00106 * Set the batch size of the queue. 00107 * 00108 * @param cBatch the batch size to set 00109 */ 00110 virtual void setBatchSize(int32_t cBatch); 00111 00112 /** 00113 * Return the batch size of the queue. 00114 * 00115 * @return the batch size of the queue 00116 */ 00117 virtual int32_t getBatchSize() const; 00118 00119 /** 00120 * Return the object used for notifications on this queue. 00121 * 00122 * @return the object used for notifications on this queue. 00123 */ 00124 virtual Object::Handle getNotifier(); 00125 00126 /** 00127 * Set the object used for notifications on this queue. 00128 * 00129 * @param hNotifier the object used for notifications on this queue 00130 */ 00131 virtual void setNotifier(Object::Handle hNotifier); 00132 00133 /** 00134 * Wait for the queue to contain at least one entry. 00135 * 00136 * Note: By the time the method returns the entry may have already 00137 * been removed by another thread. 00138 * 00139 * @param cMillis the number of milliseconds to wait before returing 00140 * without having been notified, or 0 to wait until 00141 * notified 00142 */ 00143 virtual void waitForEntry(int64_t cMillis); 00144 00145 /** 00146 * Return the total number of times the queue has been emptied. 00147 * 00148 * @return the total number of times the queue has been emptied. 00149 */ 00150 virtual int64_t getStatsEmptied() const; 00151 00152 /** 00153 * Return the total number of times the queue has been flushed. 00154 * 00155 * @return the total number of times the queue has been flushed 00156 */ 00157 virtual int64_t getStatsFlushed() const; 00158 00159 00160 // ----- internal helpers ----------------------------------------------- 00161 00162 protected: 00163 /** 00164 * Check whether or not the flush (notify) is necessary. This method 00165 * is always called when a new item is added to the queue. 00166 * 00167 * @param cElements the number of elements in the queue after the 00168 * addition 00169 */ 00170 virtual void checkFlush(int32_t cElements); 00171 00172 /** 00173 * Flush the queue. 00174 * 00175 * @param fAuto iff the flush was invoked automatically based on the 00176 * notification batch size 00177 * 00178 */ 00179 virtual void flush(bool fAuto); 00180 00181 /** 00182 * Event called each time an element is added to the queue. 00183 */ 00184 virtual void onAddElement(); 00185 00186 /** 00187 * Event called when the queue becomes empty. 00188 */ 00189 virtual void onEmpty(); 00190 00191 /** 00192 * Set the flush state and return the previous state. 00193 * 00194 * @param nState the state to set 00195 * 00196 * @return the previous flush state 00197 */ 00198 virtual FlushState updateFlushState(FlushState nState); 00199 00200 /** 00201 * Set the flush state iff the assumed state is correct, return the 00202 * previous flushState 00203 * 00204 * @param nStateAssumed the assumed current value of the flush state 00205 * @param nStateNew the new flush state value 00206 * 00207 * @return FlushState the old flush state value 00208 */ 00209 virtual FlushState updateFlushStateConditionally(FlushState nStateAssumed, 00210 FlushState nStateNew); 00211 00212 /** 00213 * Set the total number of times the queue has been emptied. 00214 * 00215 * @param cEmpties the total number of times the queue has been 00216 * flushed. 00217 */ 00218 virtual void setStatsEmptied(int64_t cEmpties); 00219 00220 /** 00221 * Set the total number of times the queue has been flushed. 00222 * 00223 * @param cFlushed the total number of times the queue has been 00224 * flushed 00225 */ 00226 virtual void setStatsFlushed(int64_t cFlushed); 00227 00228 00229 // ----- Queue interface ------------------------------------------------ 00230 00231 public: 00232 /** 00233 * @{inheritDoc} 00234 */ 00235 virtual void flush(); 00236 00237 /** 00238 * @{inheritDoc} 00239 */ 00240 virtual size32_t size() const; 00241 00242 /** 00243 * @{inheritDoc} 00244 */ 00245 virtual Object::Holder remove(); 00246 00247 00248 // ----- data members --------------------------------------------------- 00249 00250 protected: 00251 /** 00252 * The AtomicLong used to maintain the FlushState. See 00253 * getFlushState() and setFlushState() helper methods. 00254 */ 00255 NativeAtomic32 m_nAtomicFlushState; 00256 00257 /** 00258 * The queue size at which to auto-flush the queue during an add 00259 * operation. If the BatchSize is greater then one, the caller must 00260 * externally call flush() when it has finished adding elements in 00261 * order to ensure that they may be processed by any waiting consumer 00262 * thread. 00263 */ 00264 int32_t m_iBatchSize; 00265 00266 /** 00267 * A counter for maintaining the size of the queue. 00268 */ 00269 NativeAtomic32 m_nElementCounter; 00270 00271 /** 00272 * The monitor on which notifications related to a queue addition 00273 * will be performed. The default value is the Queue itself. The 00274 * Notifier should not be changed while the queue is in use. If the 00275 * Notifier is null then notification will be disabled. 00276 */ 00277 FinalHandle<Object> f_hNotifier; 00278 00279 /** 00280 * The total number of times the queue transitioned to the empty 00281 * state. 00282 */ 00283 int64_t m_lStatsEmptied; 00284 00285 /** 00286 * The total number of times the queue has been flushed. 00287 */ 00288 int64_t m_lStatusFlushed; 00289 00290 /** 00291 * Indicate whether the notifier references itself 00292 */ 00293 bool m_fSelfNotifier; 00294 }; 00295 00296 COH_CLOSE_NAMESPACE2 00297 00298 #endif // COH_ABSTRACT_CONCURRENT_QUEUE