00001 /* 00002 * Copyright (c) 2000, 2020, Oracle and/or its affiliates. 00003 * 00004 * Licensed under the Universal Permissive License v 1.0 as shown at 00005 * http://oss.oracle.com/licenses/upl. 00006 */ 00007 #ifndef COH_ABSTRACT_BUNDLER_HPP 00008 #define COH_ABSTRACT_BUNDLER_HPP 00009 00010 #include "coherence/lang.ns" 00011 00012 #include "coherence/net/NamedCache.hpp" 00013 #include "coherence/util/ArrayList.hpp" 00014 #include "coherence/util/AtomicCounter.hpp" 00015 #include "coherence/util/List.hpp" 00016 00017 COH_OPEN_NAMESPACE3(coherence,net,cache) 00018 00019 using coherence::net::NamedCache; 00020 using coherence::util::ArrayList; 00021 using coherence::util::AtomicCounter; 00022 using coherence::util::List; 00023 00024 /** 00025 * An abstract base for processors that implement bundling strategy. 00026 * 00027 * Assume that we receive a continuous and concurrent stream of individual 00028 * operations on multiple threads in parallel. Let's also assume those individual 00029 * operations have relatively high latency (network or database-related) and 00030 * there are functionally analogous [bulk] operations that take a collection of 00031 * arguments instead of a single one without causing the latency to grow 00032 * linearly, as a function of the collection size. Examples of operations and 00033 * topologies that satisfy these assumptions are: 00034 * <ul> 00035 * <li> get() and getAll() methods for the NamedCache API for the 00036 * partitioned cache service topology; 00037 * <li> put() and putAll() methods for the NamedCache API for the 00038 * partitioned cache service topology; 00039 * </ul> 00040 * 00041 * Under these assumptions, it's quite clear that the bundler could achieve a 00042 * better utilization of system resources and better throughput if slightly 00043 * delays the individual execution requests with a purpose of "bundling" them 00044 * together and passing int32_to a corresponding bulk operation. Additionally, 00045 * the "bundled" request should be triggered if a bundle reaches a "preferred 00046 * bundle size" threshold, eliminating a need to wait till a bundle timeout is 00047 * reached. 00048 * 00049 * Note: we assume that all bundle-able operations are idempotent and could be 00050 * repeated if un-bundling is necessary due to a bundled operation failure. 00051 * 00052 * @author gg 2007.01.28 00053 * @author lh 2012.06.05 00054 * @since Coherence 12.1.2 00055 */ 00056 class COH_EXPORT AbstractBundler 00057 : public abstract_spec<AbstractBundler> 00058 { 00059 // ----- handle definitions (needed for nested classes) ----------------- 00060 00061 public: 00062 typedef this_spec::Handle Handle; 00063 typedef this_spec::View View; 00064 typedef this_spec::Holder Holder; 00065 00066 // ----- constructors --------------------------------------------------- 00067 00068 /** 00069 * Construct the bundler. By default, the timeout delay value is set to 00070 * one millisecond and the auto-adjustment feature is turned on. 00071 */ 00072 protected: 00073 AbstractBundler(); 00074 00075 private: 00076 /** 00077 * Blocked copy constructor. 00078 */ 00079 AbstractBundler(const AbstractBundler&); 00080 00081 // ----- property accessors --------------------------------------------- 00082 00083 public: 00084 /** 00085 * Obtain the bundle size threshold value. 00086 * 00087 * @return the bundle size threshold value expressed in the same units 00088 * as the value returned by the {@link Bundle::getBundleSize()} 00089 * method 00090 */ 00091 virtual int32_t getSizeThreshold() const; 00092 00093 /** 00094 * Specify the bundle size threshold value. 00095 * 00096 * @param cSize the bundle size threshold value; must be positive 00097 * value expressed in the same units as the value returned 00098 * by the {@link Bundle::getBundleSize()} method 00099 */ 00100 virtual void setSizeThreshold(int32_t cSize); 00101 00102 /** 00103 * Obtains the minimum number of threads that will trigger the bundler 00104 * to switch from a pass through to a bundled mode. 00105 * 00106 * @return a the number of threads threshold 00107 */ 00108 virtual int32_t getThreadThreshold() const; 00109 00110 /** 00111 * Specify the minimum number of threads that will trigger the bundler to 00112 * switch from a pass through to a bundled mode. 00113 * 00114 * @param cThreads the number of threads threshold 00115 */ 00116 virtual void setThreadThreshold(int32_t cThreads); 00117 00118 /** 00119 * Obtain the timeout delay value. 00120 * 00121 * @return the timeout delay value in milliseconds 00122 */ 00123 virtual int64_t getDelayMillis() const; 00124 00125 /** 00126 * Specify the timeout delay value. 00127 * 00128 * @param lDelay the timeout delay value in milliseconds 00129 */ 00130 virtual void setDelayMillis(int64_t lDelay); 00131 00132 /** 00133 * Check whether or not auto-adjustment is allowed. 00134 * 00135 * @return true if auto-adjustment is allowed 00136 */ 00137 virtual bool isAllowAutoAdjust() const; 00138 00139 /** 00140 * Specify whether or not auto-adjustment is allowed. 00141 * 00142 * @param fAutoAdjust true if auto-adjustment should be allowed; 00143 * false otherwise 00144 */ 00145 virtual void setAllowAutoAdjust(bool fAutoAdjust); 00146 00147 // ----- statistics ------------------------------------------------------ 00148 00149 protected: 00150 /** 00151 * Update the statistics for this Bundle. 00152 */ 00153 virtual void updateStatistics(); 00154 00155 public: 00156 /** 00157 * Reset this Bundler statistics. 00158 */ 00159 virtual void resetStatistics(); 00160 00161 /** 00162 * Adjust this Bundler's parameters according to the available 00163 * statistical information. 00164 */ 00165 virtual void adjust(); 00166 00167 // ----- Object interface ----------------------------------- 00168 00169 public: 00170 /** 00171 * {@inheritDoc} 00172 */ 00173 virtual TypedHandle<const String> toString() const; 00174 00175 // ----- inner class: Bundle -------------------------------------------- 00176 00177 /** 00178 * Bundle represents a unit of optimized execution. 00179 */ 00180 protected: 00181 class COH_EXPORT Bundle 00182 : public abstract_spec<Bundle> 00183 { 00184 friend class factory<Bundle>; 00185 friend class AbstractBundler; 00186 00187 // ----- constructors --------------------------------------------------- 00188 00189 /** 00190 * Default constructor. 00191 * 00192 * @param hBundler the AbstructBundler 00193 */ 00194 protected: 00195 Bundle(AbstractBundler::Handle hBundler); 00196 00197 // ----- accessors ------------------------------------------------- 00198 00199 public: 00200 /** 00201 * Check whether or not this bundle is open for adding request elements. 00202 * 00203 * @return true if this Bundle is still open 00204 */ 00205 virtual bool isOpen() const; 00206 00207 /** 00208 * Return the Bundler. 00209 * 00210 * @return the Bundler 00211 */ 00212 virtual AbstractBundler::Handle getBundler(); 00213 00214 protected: 00215 /** 00216 * Check whether or not this bundle is in the "pending" state - 00217 * awaiting for the execution results. 00218 * 00219 * @return true if this Bundle is in the "pending" state 00220 */ 00221 virtual bool isPending() const; 00222 00223 /** 00224 * Check whether or not this bundle is in the "processed" state - 00225 * ready to return the result of execution back to the client. 00226 * 00227 * @return true if this Bundle is in the "processed" state 00228 */ 00229 virtual bool isProcessed() const; 00230 00231 /** 00232 * Check whether or not this bundle is in the "exception" state - 00233 * bundled execution threw an exception and requests have to be 00234 * un-bundled. 00235 * 00236 * @return true if this Bundle is in the "exception" state 00237 */ 00238 virtual bool isException() const; 00239 00240 /** 00241 * Change the status of this Bundle. 00242 * 00243 * @param iStatus the new status value 00244 */ 00245 virtual void setStatus(int32_t iStatus); 00246 00247 /** 00248 * Obtain this bundle size. The return value should be expressed in the 00249 * same units as the value returned by the 00250 * {@link AbstractBundler::getSizeThreshold getSizeThreshold} method. 00251 * 00252 * @return the bundle size 00253 */ 00254 virtual int32_t getBundleSize() const; 00255 00256 /** 00257 * Check whether or not this is a "master" Bundle. 00258 * 00259 * @return true if this Bundle is a designated "master" Bundle 00260 */ 00261 virtual bool isMaster() const; 00262 00263 /** 00264 * Designate this Bundle as a "master" bundle. 00265 */ 00266 virtual void setMaster(); 00267 00268 // ----- processing and subclassing support -------------------------- 00269 00270 public: 00271 /** 00272 * Wait until results of bundled requests are retrieved. 00273 * 00274 * Note that calls to this method must be externally synchronized. 00275 * 00276 * @param fFirst true if this is the first thread entering the bundle 00277 * 00278 * @return true if this thread is supposed to perform an actual bundled 00279 * operation (burst); false otherwise 00280 */ 00281 virtual bool waitForResults(bool fFirst); 00282 00283 protected: 00284 /** 00285 * Obtain results of the bundled requests. This method should be 00286 * implemented by concrete Bundle implementations using the most 00287 * efficient mechanism. 00288 */ 00289 virtual void ensureResults() = 0; 00290 00291 /** 00292 * Obtain results of the bundled requests or ensure that the results 00293 * have already been retrieved. 00294 * 00295 * @param fBurst specifies whether or not the actual results have to be 00296 * fetched on this thread; this parameter will be true 00297 * for one and only one thread per bundle 00298 * 00299 * @return true if the bundling has succeeded; false if the un-bundling 00300 * has to be performed as a result of a failure 00301 */ 00302 virtual bool ensureResults(bool fBurst); 00303 00304 /** 00305 * Release all bundle resources associated with the current thread. 00306 * 00307 * @return true if all entered threads have released 00308 */ 00309 virtual bool releaseThread(); 00310 00311 // ----- statistics and debugging ----------------------------------- 00312 00313 public: 00314 /** 00315 * Reset statistics for this Bundle. 00316 */ 00317 virtual void resetStatistics(); 00318 00319 // ----- Object interface ----------------------------------- 00320 00321 protected: 00322 /** 00323 * {@inheritDoc} 00324 */ 00325 virtual TypedHandle<const String> toString() const; 00326 00327 protected: 00328 /** 00329 * Return a human readable name for the specified status value. 00330 * 00331 * @param iStatus the status value to format 00332 * 00333 * @return a human readable status name 00334 */ 00335 virtual String::View formatStatusName(int32_t iStatus) const; 00336 00337 // ----- data fields and constants --------------------------------- 00338 00339 public: 00340 /** 00341 * This Bundle accepting additional items. 00342 */ 00343 static const int32_t status_open = 0; 00344 00345 /** 00346 * This Bundle is closed for accepting additional items and awaiting 00347 * for the execution results. 00348 */ 00349 static const int32_t status_pending = 1; 00350 00351 /** 00352 * This Bundle is in process of returning the result of execution 00353 * back to the client. 00354 */ 00355 static const int32_t status_processed = 2; 00356 00357 /** 00358 * Attempt to bundle encountered and exception; the execution has to be 00359 * de-optimized and performed by individual threads. 00360 */ 00361 static const int32_t status_exception = 3; 00362 00363 private: 00364 /** 00365 * The bundler the operations are performed on. 00366 */ 00367 FinalHandle<AbstractBundler> f_hBundler; 00368 00369 /** 00370 * This Bundle status. 00371 */ 00372 Volatile<int32_t> m_iStatus; 00373 00374 /** 00375 * A count of threads that are using this Bundle. 00376 */ 00377 int32_t m_cThreads; 00378 00379 /** 00380 * A flag that differentiates the "master" bundle which is responsible 00381 * for all auto-adjustments. It's set to "true" for one and only one 00382 * Bundle object. 00383 */ 00384 bool m_fMaster; 00385 00386 // stat fields intentionally have the "package private" access to 00387 // prevent generation of synthetic access methods 00388 00389 protected: 00390 /** 00391 * Statistics: the total number of times this Bundle has been used for 00392 * bundled request processing. 00393 */ 00394 Volatile<int64_t> m_cTotalBundles; 00395 00396 /** 00397 * Statistics: the total size of individual requests processed by this 00398 * Bundle expressed in the same units as values returned by the 00399 * {@link Bundle::getBundleSize()} method. 00400 */ 00401 Volatile<int64_t> m_cTotalSize; 00402 00403 /** 00404 * Statistics: a timestamp of the first thread entering the bundle. 00405 */ 00406 int64_t m_ldtStart; 00407 00408 /** 00409 * Statistics: a total time duration this Bundle has spent in bundled 00410 * request processing (burst). 00411 */ 00412 Volatile<int64_t> m_cTotalBurstDuration; 00413 00414 /** 00415 * Statistics: a total time duration this Bundle has spent waiting 00416 * for bundle to be ready for processing. 00417 */ 00418 Volatile<int64_t> m_cTotalWaitDuration; 00419 }; 00420 00421 // ----- inner class: Statistics ------------------------------------------- 00422 00423 /** 00424 * Statistics class contains the latest bundler statistics. 00425 */ 00426 protected: 00427 class COH_EXPORT Statistics 00428 : public class_spec<Statistics> 00429 { 00430 friend class factory<Statistics>; 00431 friend class AbstractBundler; 00432 00433 public: 00434 /** 00435 * Reset the statistics. 00436 */ 00437 virtual void reset(); 00438 00439 // ----- Object interface ----------------------------------- 00440 00441 public: 00442 /** 00443 * {@inheritDoc} 00444 */ 00445 virtual TypedHandle<const String> toString() const; 00446 00447 // ----- running averages ------------------------------------------ 00448 00449 protected: 00450 /** 00451 * An average time for bundled request processing (burst). 00452 */ 00453 int32_t m_cAverageBurstDuration; 00454 00455 /** 00456 * An average bundle size for this Bundler. 00457 */ 00458 int32_t m_cAverageBundleSize; 00459 00460 /** 00461 * An average thread waiting time caused by under-filled bundle. The 00462 * wait time includes the time spend in the bundled request processing. 00463 */ 00464 int32_t m_cAverageThreadWaitDuration; 00465 00466 /** 00467 * An average bundled request throughput in size units per millisecond 00468 * (total bundle size over total processing time) 00469 */ 00470 int32_t m_nAverageThroughput; 00471 00472 // ----- snapshots -------------------------------------------------- 00473 00474 /** 00475 * Snapshot for a total number of processed bundled. 00476 */ 00477 int64_t m_cBundleCountSnapshot; 00478 00479 /** 00480 * Snapshot for a total size of processed bundled. 00481 */ 00482 int64_t m_cBundleSizeSnapshot; 00483 00484 /** 00485 * Snapshot for a burst duration. 00486 */ 00487 int64_t m_cBurstDurationSnapshot; 00488 00489 /** 00490 * Snapshot for a combined thread waiting time. 00491 */ 00492 int64_t m_cThreadWaitSnapshot; 00493 }; 00494 00495 // ----- sublcassing support -------------------------------------------- 00496 00497 protected: 00498 /** 00499 * Initialize the bundler. 00500 * 00501 * @param hBundle specifies the bundle for this bundler 00502 */ 00503 virtual void init(Bundle::Handle hBundle); 00504 00505 /** 00506 * Retrieve any Bundle that is currently in the open state. This method 00507 * does not assume any external synchronization and as a result, a 00508 * caller must double check the returned bundle open state (after 00509 * synchronizing on it). 00510 * 00511 * @return an open Bundle 00512 */ 00513 virtual Bundle::Handle getOpenBundle(); 00514 00515 /** 00516 * Instantiate a new Bundle object. 00517 * 00518 * @return a new Bundle object 00519 */ 00520 virtual Bundle::Handle instantiateBundle() = 0; 00521 00522 // ----- constants and data fields --------------------------------------- 00523 00524 public: 00525 /** 00526 * Frequency of the adjustment attempts. This number represents a number of 00527 * iterations of the master bundle usage after which an adjustment attempt 00528 * will be performed. 00529 */ 00530 static const int32_t adjustment_frequency = 128; 00531 00532 protected: 00533 /** 00534 * Indicate whether this is a first time adjustment. 00535 */ 00536 bool m_fFirstAdjustment; 00537 00538 /** 00539 * The previous bundle size threshold value. 00540 */ 00541 double m_dPreviousSizeThreshold; 00542 00543 /** 00544 * A pool of Bundle objects. Note that this list never shrinks. 00545 */ 00546 FinalHandle<List> f_hListBundle; 00547 00548 /** 00549 * A counter for the total number of threads that have started any bundle 00550 * related execution. This counter is used by subclasses to reduce an impact 00551 * of bundled execution for lightly loaded environments. 00552 */ 00553 FinalHandle<AtomicCounter> f_hCountThreads; 00554 00555 private: 00556 /** 00557 * The bundle size threshold. We use double for this value to allow for 00558 * fine-tuning of the auto-adjust algorithm. 00559 * 00560 * @see adjust 00561 */ 00562 double m_dSizeThreshold; 00563 00564 /** 00565 * The minimum number of threads that should trigger the bundler to switch 00566 * from a pass through mode to a bundled mode. 00567 */ 00568 int32_t m_cThreadThreshold; 00569 00570 /** 00571 * Specifies whether or not auto-adjustment is on. Default value is "true". 00572 */ 00573 bool m_fAllowAuto; 00574 00575 /** 00576 * The delay timeout in milliseconds. Default value is one millisecond. 00577 */ 00578 int64_t m_lDelayMillis; 00579 00580 /** 00581 * Last active (open) bundle position. 00582 */ 00583 Volatile<int32_t> m_iActiveBundle; 00584 00585 /** 00586 * An instance of the Statistics object containing the latest statistics. 00587 */ 00588 FinalHandle<Statistics> f_hStats; 00589 }; 00590 00591 COH_CLOSE_NAMESPACE3 00592 00593 #endif // COH_ABSTRACT_BUNDLER_HPP