00001 /* 00002 * AbstractBundler.hpp 00003 * 00004 * Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved. 00005 * 00006 * Oracle is a registered trademarks of Oracle Corporation and/or its 00007 * affiliates. 00008 * 00009 * This software is the confidential and proprietary information of Oracle 00010 * Corporation. You shall not disclose such confidential and proprietary 00011 * information and shall use it only in accordance with the terms of the 00012 * license agreement you entered into with Oracle. 00013 * 00014 * This notice may not be removed or altered. 00015 */ 00016 #ifndef COH_ABSTRACT_BUNDLER_HPP 00017 #define COH_ABSTRACT_BUNDLER_HPP 00018 00019 #include "coherence/lang.ns" 00020 00021 #include "coherence/net/NamedCache.hpp" 00022 #include "coherence/util/ArrayList.hpp" 00023 #include "coherence/util/AtomicCounter.hpp" 00024 #include "coherence/util/List.hpp" 00025 00026 COH_OPEN_NAMESPACE3(coherence,net,cache) 00027 00028 using coherence::net::NamedCache; 00029 using coherence::util::ArrayList; 00030 using coherence::util::AtomicCounter; 00031 using coherence::util::List; 00032 00033 /** 00034 * An abstract base for processors that implement bundling strategy. 00035 * 00036 * Assume that we receive a continuous and concurrent stream of individual 00037 * operations on multiple threads in parallel. Let's also assume those individual 00038 * operations have relatively high latency (network or database-related) and 00039 * there are functionally analogous [bulk] operations that take a collection of 00040 * arguments instead of a single one without causing the latency to grow 00041 * linearly, as a function of the collection size. Examples of operations and 00042 * topologies that satisfy these assumptions are: 00043 * <ul> 00044 * <li> get() and getAll() methods for the NamedCache API for the 00045 * partitioned cache service topology; 00046 * <li> put() and putAll() methods for the NamedCache API for the 00047 * partitioned cache service topology; 00048 * </ul> 00049 * 00050 * Under these assumptions, it's quite clear that the bundler could achieve a 00051 * better utilization of system resources and better throughput if slightly 00052 * delays the individual execution requests with a purpose of "bundling" them 00053 * together and passing int32_to a corresponding bulk operation. Additionally, 00054 * the "bundled" request should be triggered if a bundle reaches a "preferred 00055 * bundle size" threshold, eliminating a need to wait till a bundle timeout is 00056 * reached. 00057 * 00058 * Note: we assume that all bundle-able operations are idempotent and could be 00059 * repeated if un-bundling is necessary due to a bundled operation failure. 00060 * 00061 * @author gg 2007.01.28 00062 * @author lh 2012.06.05 00063 * @since Coherence 12.1.2 00064 */ 00065 class COH_EXPORT AbstractBundler 00066 : public abstract_spec<AbstractBundler> 00067 { 00068 // ----- handle definitions (needed for nested classes) ----------------- 00069 00070 public: 00071 typedef this_spec::Handle Handle; 00072 typedef this_spec::View View; 00073 typedef this_spec::Holder Holder; 00074 00075 // ----- constructors --------------------------------------------------- 00076 00077 /** 00078 * Construct the bundler. By default, the timeout delay value is set to 00079 * one millisecond and the auto-adjustment feature is turned on. 00080 */ 00081 protected: 00082 AbstractBundler(); 00083 00084 private: 00085 /** 00086 * Blocked copy constructor. 00087 */ 00088 AbstractBundler(const AbstractBundler&); 00089 00090 // ----- property accessors --------------------------------------------- 00091 00092 public: 00093 /** 00094 * Obtain the bundle size threshold value. 00095 * 00096 * @return the bundle size threshold value expressed in the same units 00097 * as the value returned by the {@link Bundle::getBundleSize()} 00098 * method 00099 */ 00100 virtual int32_t getSizeThreshold() const; 00101 00102 /** 00103 * Specify the bundle size threshold value. 00104 * 00105 * @param cSize the bundle size threshold value; must be positive 00106 * value expressed in the same units as the value returned 00107 * by the {@link Bundle::getBundleSize()} method 00108 */ 00109 virtual void setSizeThreshold(int32_t cSize); 00110 00111 /** 00112 * Obtains the minimum number of threads that will trigger the bundler 00113 * to switch from a pass through to a bundled mode. 00114 * 00115 * @return a the number of threads threshold 00116 */ 00117 virtual int32_t getThreadThreshold() const; 00118 00119 /** 00120 * Specify the minimum number of threads that will trigger the bundler to 00121 * switch from a pass through to a bundled mode. 00122 * 00123 * @param cThreads the number of threads threshold 00124 */ 00125 virtual void setThreadThreshold(int32_t cThreads); 00126 00127 /** 00128 * Obtain the timeout delay value. 00129 * 00130 * @return the timeout delay value in milliseconds 00131 */ 00132 virtual int64_t getDelayMillis() const; 00133 00134 /** 00135 * Specify the timeout delay value. 00136 * 00137 * @param lDelay the timeout delay value in milliseconds 00138 */ 00139 virtual void setDelayMillis(int64_t lDelay); 00140 00141 /** 00142 * Check whether or not auto-adjustment is allowed. 00143 * 00144 * @return true if auto-adjustment is allowed 00145 */ 00146 virtual bool isAllowAutoAdjust() const; 00147 00148 /** 00149 * Specify whether or not auto-adjustment is allowed. 00150 * 00151 * @param fAutoAdjust true if auto-adjustment should be allowed; 00152 * false otherwise 00153 */ 00154 virtual void setAllowAutoAdjust(bool fAutoAdjust); 00155 00156 // ----- statistics ------------------------------------------------------ 00157 00158 protected: 00159 /** 00160 * Update the statistics for this Bundle. 00161 */ 00162 virtual void updateStatistics(); 00163 00164 public: 00165 /** 00166 * Reset this Bundler statistics. 00167 */ 00168 virtual void resetStatistics(); 00169 00170 /** 00171 * Adjust this Bundler's parameters according to the available 00172 * statistical information. 00173 */ 00174 virtual void adjust(); 00175 00176 // ----- Object interface ----------------------------------- 00177 00178 public: 00179 /** 00180 * {@inheritDoc} 00181 */ 00182 virtual TypedHandle<const String> toString() const; 00183 00184 // ----- inner class: Bundle -------------------------------------------- 00185 00186 /** 00187 * Bundle represents a unit of optimized execution. 00188 */ 00189 protected: 00190 class COH_EXPORT Bundle 00191 : public abstract_spec<Bundle> 00192 { 00193 friend class factory<Bundle>; 00194 friend class AbstractBundler; 00195 00196 // ----- constructors --------------------------------------------------- 00197 00198 /** 00199 * Default constructor. 00200 * 00201 * @param hBundler the AbstructBundler 00202 */ 00203 protected: 00204 Bundle(AbstractBundler::Handle hBundler); 00205 00206 // ----- accessors ------------------------------------------------- 00207 00208 public: 00209 /** 00210 * Check whether or not this bundle is open for adding request elements. 00211 * 00212 * @return true if this Bundle is still open 00213 */ 00214 virtual bool isOpen() const; 00215 00216 /** 00217 * Return the Bundler. 00218 * 00219 * @return the Bundler 00220 */ 00221 virtual AbstractBundler::Handle getBundler(); 00222 00223 protected: 00224 /** 00225 * Check whether or not this bundle is in the "pending" state - 00226 * awaiting for the execution results. 00227 * 00228 * @return true if this Bundle is in the "pending" state 00229 */ 00230 virtual bool isPending() const; 00231 00232 /** 00233 * Check whether or not this bundle is in the "processed" state - 00234 * ready to return the result of execution back to the client. 00235 * 00236 * @return true if this Bundle is in the "processed" state 00237 */ 00238 virtual bool isProcessed() const; 00239 00240 /** 00241 * Check whether or not this bundle is in the "exception" state - 00242 * bundled execution threw an exception and requests have to be 00243 * un-bundled. 00244 * 00245 * @return true if this Bundle is in the "exception" state 00246 */ 00247 virtual bool isException() const; 00248 00249 /** 00250 * Change the status of this Bundle. 00251 * 00252 * @param iStatus the new status value 00253 */ 00254 virtual void setStatus(int32_t iStatus); 00255 00256 /** 00257 * Obtain this bundle size. The return value should be expressed in the 00258 * same units as the value returned by the 00259 * {@link AbstractBundler::getSizeThreshold getSizeThreshold} method. 00260 * 00261 * @return the bundle size 00262 */ 00263 virtual int32_t getBundleSize() const; 00264 00265 /** 00266 * Check whether or not this is a "master" Bundle. 00267 * 00268 * @return true if this Bundle is a designated "master" Bundle 00269 */ 00270 virtual bool isMaster() const; 00271 00272 /** 00273 * Designate this Bundle as a "master" bundle. 00274 */ 00275 virtual void setMaster(); 00276 00277 // ----- processing and subclassing support -------------------------- 00278 00279 public: 00280 /** 00281 * Wait until results of bundled requests are retrieved. 00282 * 00283 * Note that calls to this method must be externally synchronized. 00284 * 00285 * @param fFirst true if this is the first thread entering the bundle 00286 * 00287 * @return true if this thread is supposed to perform an actual bundled 00288 * operation (burst); false otherwise 00289 */ 00290 virtual bool waitForResults(bool fFirst); 00291 00292 protected: 00293 /** 00294 * Obtain results of the bundled requests. This method should be 00295 * implemented by concrete Bundle implementations using the most 00296 * efficient mechanism. 00297 */ 00298 virtual void ensureResults() = 0; 00299 00300 /** 00301 * Obtain results of the bundled requests or ensure that the results 00302 * have already been retrieved. 00303 * 00304 * @param fBurst specifies whether or not the actual results have to be 00305 * fetched on this thread; this parameter will be true 00306 * for one and only one thread per bundle 00307 * 00308 * @return true if the bundling has succeeded; false if the un-bundling 00309 * has to be performed as a result of a failure 00310 */ 00311 virtual bool ensureResults(bool fBurst); 00312 00313 /** 00314 * Release all bundle resources associated with the current thread. 00315 * 00316 * @return true if all entered threads have released 00317 */ 00318 virtual bool releaseThread(); 00319 00320 // ----- statistics and debugging ----------------------------------- 00321 00322 public: 00323 /** 00324 * Reset statistics for this Bundle. 00325 */ 00326 virtual void resetStatistics(); 00327 00328 // ----- Object interface ----------------------------------- 00329 00330 protected: 00331 /** 00332 * {@inheritDoc} 00333 */ 00334 virtual TypedHandle<const String> toString() const; 00335 00336 protected: 00337 /** 00338 * Return a human readable name for the specified status value. 00339 * 00340 * @param iStatus the status value to format 00341 * 00342 * @return a human readable status name 00343 */ 00344 virtual String::View formatStatusName(int32_t iStatus) const; 00345 00346 // ----- data fields and constants --------------------------------- 00347 00348 public: 00349 /** 00350 * This Bundle accepting additional items. 00351 */ 00352 static const int32_t status_open = 0; 00353 00354 /** 00355 * This Bundle is closed for accepting additional items and awaiting 00356 * for the execution results. 00357 */ 00358 static const int32_t status_pending = 1; 00359 00360 /** 00361 * This Bundle is in process of returning the result of execution 00362 * back to the client. 00363 */ 00364 static const int32_t status_processed = 2; 00365 00366 /** 00367 * Attempt to bundle encountered and exception; the execution has to be 00368 * de-optimized and performed by individual threads. 00369 */ 00370 static const int32_t status_exception = 3; 00371 00372 private: 00373 /** 00374 * The bundler the operations are performed on. 00375 */ 00376 FinalHandle<AbstractBundler> f_hBundler; 00377 00378 /** 00379 * This Bundle status. 00380 */ 00381 Volatile<int32_t> m_iStatus; 00382 00383 /** 00384 * A count of threads that are using this Bundle. 00385 */ 00386 int32_t m_cThreads; 00387 00388 /** 00389 * A flag that differentiates the "master" bundle which is responsible 00390 * for all auto-adjustments. It's set to "true" for one and only one 00391 * Bundle object. 00392 */ 00393 bool m_fMaster; 00394 00395 // stat fields intentionally have the "package private" access to 00396 // prevent generation of synthetic access methods 00397 00398 protected: 00399 /** 00400 * Statistics: the total number of times this Bundle has been used for 00401 * bundled request processing. 00402 */ 00403 Volatile<int64_t> m_cTotalBundles; 00404 00405 /** 00406 * Statistics: the total size of individual requests processed by this 00407 * Bundle expressed in the same units as values returned by the 00408 * {@link Bundle::getBundleSize()} method. 00409 */ 00410 Volatile<int64_t> m_cTotalSize; 00411 00412 /** 00413 * Statistics: a timestamp of the first thread entering the bundle. 00414 */ 00415 int64_t m_ldtStart; 00416 00417 /** 00418 * Statistics: a total time duration this Bundle has spent in bundled 00419 * request processing (burst). 00420 */ 00421 Volatile<int64_t> m_cTotalBurstDuration; 00422 00423 /** 00424 * Statistics: a total time duration this Bundle has spent waiting 00425 * for bundle to be ready for processing. 00426 */ 00427 Volatile<int64_t> m_cTotalWaitDuration; 00428 }; 00429 00430 // ----- inner class: Statistics ------------------------------------------- 00431 00432 /** 00433 * Statistics class contains the latest bundler statistics. 00434 */ 00435 protected: 00436 class COH_EXPORT Statistics 00437 : public class_spec<Statistics> 00438 { 00439 friend class factory<Statistics>; 00440 friend class AbstractBundler; 00441 00442 public: 00443 /** 00444 * Reset the statistics. 00445 */ 00446 virtual void reset(); 00447 00448 // ----- Object interface ----------------------------------- 00449 00450 public: 00451 /** 00452 * {@inheritDoc} 00453 */ 00454 virtual TypedHandle<const String> toString() const; 00455 00456 // ----- running averages ------------------------------------------ 00457 00458 protected: 00459 /** 00460 * An average time for bundled request processing (burst). 00461 */ 00462 int32_t m_cAverageBurstDuration; 00463 00464 /** 00465 * An average bundle size for this Bundler. 00466 */ 00467 int32_t m_cAverageBundleSize; 00468 00469 /** 00470 * An average thread waiting time caused by under-filled bundle. The 00471 * wait time includes the time spend in the bundled request processing. 00472 */ 00473 int32_t m_cAverageThreadWaitDuration; 00474 00475 /** 00476 * An average bundled request throughput in size units per millisecond 00477 * (total bundle size over total processing time) 00478 */ 00479 int32_t m_nAverageThroughput; 00480 00481 // ----- snapshots -------------------------------------------------- 00482 00483 /** 00484 * Snapshot for a total number of processed bundled. 00485 */ 00486 int64_t m_cBundleCountSnapshot; 00487 00488 /** 00489 * Snapshot for a total size of processed bundled. 00490 */ 00491 int64_t m_cBundleSizeSnapshot; 00492 00493 /** 00494 * Snapshot for a burst duration. 00495 */ 00496 int64_t m_cBurstDurationSnapshot; 00497 00498 /** 00499 * Snapshot for a combined thread waiting time. 00500 */ 00501 int64_t m_cThreadWaitSnapshot; 00502 }; 00503 00504 // ----- sublcassing support -------------------------------------------- 00505 00506 protected: 00507 /** 00508 * Initialize the bundler. 00509 * 00510 * @param hBundle specifies the bundle for this bundler 00511 */ 00512 virtual void init(Bundle::Handle hBundle); 00513 00514 /** 00515 * Retrieve any Bundle that is currently in the open state. This method 00516 * does not assume any external synchronization and as a result, a 00517 * caller must double check the returned bundle open state (after 00518 * synchronizing on it). 00519 * 00520 * @return an open Bundle 00521 */ 00522 virtual Bundle::Handle getOpenBundle(); 00523 00524 /** 00525 * Instantiate a new Bundle object. 00526 * 00527 * @return a new Bundle object 00528 */ 00529 virtual Bundle::Handle instantiateBundle() = 0; 00530 00531 // ----- constants and data fields --------------------------------------- 00532 00533 public: 00534 /** 00535 * Frequency of the adjustment attempts. This number represents a number of 00536 * iterations of the master bundle usage after which an adjustment attempt 00537 * will be performed. 00538 */ 00539 static const int32_t adjustment_frequency = 128; 00540 00541 protected: 00542 /** 00543 * Indicate whether this is a first time adjustment. 00544 */ 00545 bool m_fFirstAdjustment; 00546 00547 /** 00548 * The previous bundle size threshold value. 00549 */ 00550 double m_dPreviousSizeThreshold; 00551 00552 /** 00553 * A pool of Bundle objects. Note that this list never shrinks. 00554 */ 00555 FinalHandle<List> f_hListBundle; 00556 00557 /** 00558 * A counter for the total number of threads that have started any bundle 00559 * related execution. This counter is used by subclasses to reduce an impact 00560 * of bundled execution for lightly loaded environments. 00561 */ 00562 FinalHandle<AtomicCounter> f_hCountThreads; 00563 00564 private: 00565 /** 00566 * The bundle size threshold. We use double for this value to allow for 00567 * fine-tuning of the auto-adjust algorithm. 00568 * 00569 * @see adjust 00570 */ 00571 double m_dSizeThreshold; 00572 00573 /** 00574 * The minimum number of threads that should trigger the bundler to switch 00575 * from a pass through mode to a bundled mode. 00576 */ 00577 int32_t m_cThreadThreshold; 00578 00579 /** 00580 * Specifies whether or not auto-adjustment is on. Default value is "true". 00581 */ 00582 bool m_fAllowAuto; 00583 00584 /** 00585 * The delay timeout in milliseconds. Default value is one millisecond. 00586 */ 00587 int64_t m_lDelayMillis; 00588 00589 /** 00590 * Last active (open) bundle position. 00591 */ 00592 Volatile<int32_t> m_iActiveBundle; 00593 00594 /** 00595 * An instance of the Statistics object containing the latest statistics. 00596 */ 00597 FinalHandle<Statistics> f_hStats; 00598 }; 00599 00600 COH_CLOSE_NAMESPACE3 00601 00602 #endif // COH_ABSTRACT_BUNDLER_HPP