C++ Client API Reference for Oracle Coherence
14c (14.1.2.0.0)

F79659-03

coherence/net/cache/AbstractBundler.hpp

00001 /*
00002  * Copyright (c) 2000, 2020, Oracle and/or its affiliates.
00003  *
00004  * Licensed under the Universal Permissive License v 1.0 as shown at
00005  * http://oss.oracle.com/licenses/upl.
00006  */
00007 #ifndef COH_ABSTRACT_BUNDLER_HPP
00008 #define COH_ABSTRACT_BUNDLER_HPP
00009 
00010 #include "coherence/lang.ns"
00011 
00012 #include "coherence/net/NamedCache.hpp"
00013 #include "coherence/util/ArrayList.hpp"
00014 #include "coherence/util/AtomicCounter.hpp"
00015 #include "coherence/util/List.hpp"
00016 
00017 COH_OPEN_NAMESPACE3(coherence,net,cache)
00018 
00019 using coherence::net::NamedCache;
00020 using coherence::util::ArrayList;
00021 using coherence::util::AtomicCounter;
00022 using coherence::util::List;
00023 
00024 /**
00025  * An abstract base for processors that implement bundling strategy.
00026  *
00027  * Assume that we receive a continuous and concurrent stream of individual
00028  * operations on multiple threads in parallel. Let's also assume those individual
00029  * operations have relatively high latency (network or database-related) and
00030  * there are functionally analogous [bulk] operations that take a collection of
00031  * arguments instead of a single one without causing the latency to grow
00032  * linearly, as a function of the collection size. Examples of operations and
00033  * topologies that satisfy these assumptions are:
00034  * <ul>
00035  *   <li> get() and getAll() methods for the NamedCache API for the
00036  *        partitioned cache service topology;
00037  *   <li> put() and putAll() methods for the NamedCache API for the
00038  *        partitioned cache service topology;
00039  * </ul>
00040  *
00041  * Under these assumptions, it's quite clear that the bundler could achieve a
00042  * better utilization of system resources and better throughput if it slightly
00043  * delays the individual execution requests with the purpose of "bundling" them
00044  * together and passing them into a corresponding bulk operation. Additionally,
00045  * the "bundled" request should be triggered if a bundle reaches a "preferred
00046  * bundle size" threshold, eliminating a need to wait till a bundle timeout is
00047  * reached.
00048  *
00049  * Note: we assume that all bundle-able operations are idempotent and could be
00050  * repeated if un-bundling is necessary due to a bundled operation failure.
00051  *
00052  * @author gg 2007.01.28
00053  * @author lh 2012.06.05
00054  * @since Coherence 12.1.2
00055  */
00056 class COH_EXPORT AbstractBundler
00057     : public abstract_spec<AbstractBundler>
00058     {
00059     // ----- handle definitions (needed for nested classes) -----------------
00060 
00061     public:
00062         typedef this_spec::Handle Handle;
00063         typedef this_spec::View   View;
00064         typedef this_spec::Holder Holder;
00065 
00066     // ----- constructors ---------------------------------------------------
00067 
00068     /**
00069      * Construct the bundler. By default, the timeout delay value is set to
00070      * one millisecond and the auto-adjustment feature is turned on.
00071      */
00072     protected:
00073         AbstractBundler();
00074 
00075     private:
00076         /**
00077          * Blocked copy constructor.
00078          */
00079         AbstractBundler(const AbstractBundler&);
00080 
00081     // ----- property accessors ---------------------------------------------
00082 
00083     public:
00084         /**
00085          * Obtain the bundle size threshold value.
00086          *
00087          * @return the bundle size threshold value expressed in the same units
00088          *         as the value returned by the {@link Bundle::getBundleSize()}
00089          *         method
00090          */
00091         virtual int32_t getSizeThreshold() const;
00092 
00093         /**
00094          * Specify the bundle size threshold value.
00095          *
00096          * @param cSize  the bundle size threshold value; must be a positive
00097          *               value expressed in the same units as the value returned
00098          *               by the {@link Bundle::getBundleSize()} method
00099          */
00100         virtual void setSizeThreshold(int32_t cSize);
00101 
00102         /**
00103          * Obtain the minimum number of threads that will trigger the bundler
00104          * to switch from a pass through to a bundled mode.
00105          *
00106          * @return the number of threads threshold
00107          */
00108         virtual int32_t getThreadThreshold() const;
00109 
00110         /**
00111          * Specify the minimum number of threads that will trigger the bundler to
00112          * switch from a pass through to a bundled mode.
00113          *
00114          * @param cThreads  the number of threads threshold
00115          */
00116         virtual void setThreadThreshold(int32_t cThreads);
00117 
00118         /**
00119          * Obtain the timeout delay value.
00120          *
00121          * @return the timeout delay value in milliseconds
00122          */
00123         virtual int64_t getDelayMillis() const;
00124 
00125         /**
00126          * Specify the timeout delay value.
00127          *
00128          * @param lDelay  the timeout delay value in milliseconds
00129          */
00130         virtual void setDelayMillis(int64_t lDelay);
00131 
00132         /**
00133          * Check whether or not auto-adjustment is allowed.
00134          *
00135          * @return true if auto-adjustment is allowed
00136          */
00137         virtual bool isAllowAutoAdjust() const;
00138 
00139         /**
00140          * Specify whether or not auto-adjustment is allowed.
00141          *
00142          * @param fAutoAdjust  true if auto-adjustment should be allowed;
00143          *                     false otherwise
00144          */
00145         virtual void setAllowAutoAdjust(bool fAutoAdjust);
00146 
00147     // ----- statistics ------------------------------------------------------
00148 
00149     protected:
00150         /**
00151          * Update the statistics for this Bundler.
00152          */
00153         virtual void updateStatistics();
00154 
00155     public:
00156         /**
00157          * Reset this Bundler's statistics.
00158          */
00159         virtual void resetStatistics();
00160 
00161         /**
00162          * Adjust this Bundler's parameters according to the available
00163          * statistical information.
00164          */
00165         virtual void adjust();
00166 
00167     // ----- Object interface -----------------------------------
00168 
00169     public:
00170         /**
00171          * {@inheritDoc}
00172          */
00173         virtual TypedHandle<const String> toString() const;
00174 
00175     // ----- inner class: Bundle --------------------------------------------
00176 
00177     /**
00178      * Bundle represents a unit of optimized execution.
00179      */
00180     protected:
00181         class COH_EXPORT Bundle
00182             : public abstract_spec<Bundle>
00183             {
00184             friend class factory<Bundle>;
00185             friend class AbstractBundler;
00186 
00187             // ----- constructors ---------------------------------------------------
00188 
00189             /**
00190              * Construct a Bundle for the given bundler.
00191              *
00192              * @param hBundler  the AbstractBundler
00193              */
00194             protected:
00195                 Bundle(AbstractBundler::Handle hBundler);
00196 
00197             // ----- accessors -------------------------------------------------
00198 
00199             public:
00200                 /**
00201                  * Check whether or not this bundle is open for adding request elements.
00202                  *
00203                  * @return true if this Bundle is still open
00204                  */
00205                 virtual bool isOpen() const;
00206 
00207                 /**
00208                  * Return the Bundler.
00209                  *
00210                  * @return the Bundler
00211                  */
00212                 virtual AbstractBundler::Handle getBundler();
00213 
00214             protected:
00215                 /**
00216                  * Check whether or not this bundle is in the "pending" state -
00217                  * waiting for the execution results.
00218                  *
00219                  * @return true if this Bundle is in the "pending" state
00220                  */
00221                 virtual bool isPending() const;
00222 
00223                 /**
00224                  * Check whether or not this bundle is in the "processed" state -
00225                  * ready to return the result of execution back to the client.
00226                  *
00227                  * @return true if this Bundle is in the "processed" state
00228                  */
00229                 virtual bool isProcessed() const;
00230 
00231                 /**
00232                  * Check whether or not this bundle is in the "exception" state -
00233                  * bundled execution threw an exception and requests have to be
00234                  * un-bundled.
00235                  *
00236                  * @return true if this Bundle is in the "exception" state
00237                  */
00238                 virtual bool isException() const;
00239 
00240                 /**
00241                  * Change the status of this Bundle.
00242                  *
00243                  * @param iStatus  the new status value
00244                  */
00245                 virtual void setStatus(int32_t iStatus);
00246 
00247                 /**
00248                  * Obtain this bundle's size. The return value should be expressed in the
00249                  * same units as the value returned by the
00250                  * {@link AbstractBundler::getSizeThreshold getSizeThreshold} method.
00251                  *
00252                  * @return the bundle size
00253                  */
00254                 virtual int32_t getBundleSize() const;
00255 
00256                 /**
00257                  * Check whether or not this is a "master" Bundle.
00258                  *
00259                  * @return true if this Bundle is a designated "master" Bundle
00260                  */
00261                 virtual bool isMaster() const;
00262 
00263                 /**
00264                  * Designate this Bundle as a "master" bundle.
00265                  */
00266                 virtual void setMaster();
00267 
00268                 // ----- processing and subclassing support --------------------------
00269 
00270             public:
00271                 /**
00272                  * Wait until results of bundled requests are retrieved.
00273                  *
00274                  * Note that calls to this method must be externally synchronized.
00275                  *
00276                  * @param fFirst  true if this is the first thread entering the bundle
00277                  *
00278                  * @return true if this thread is supposed to perform an actual bundled
00279                  *         operation (burst); false otherwise
00280                  */
00281                 virtual bool waitForResults(bool fFirst);
00282 
00283             protected:
00284                 /**
00285                  * Obtain results of the bundled requests. This method should be
00286                  * implemented by concrete Bundle implementations using the most
00287                  * efficient mechanism.
00288                  */
00289                 virtual void ensureResults() = 0;
00290 
00291                 /**
00292                  * Obtain results of the bundled requests or ensure that the results
00293                  * have already been retrieved.
00294                  *
00295                  * @param fBurst  specifies whether or not the actual results have to be
00296                  *                fetched on this thread; this parameter will be true
00297                  *                for one and only one thread per bundle
00298                  *
00299                  * @return true if the bundling has succeeded; false if the un-bundling
00300                  *         has to be performed as a result of a failure
00301                  */
00302                 virtual bool ensureResults(bool fBurst);
00303 
00304                 /**
00305                  * Release all bundle resources associated with the current thread.
00306                  *
00307                  * @return true if all entered threads have released
00308                  */
00309                 virtual bool releaseThread();
00310 
00311             // ----- statistics and debugging  -----------------------------------
00312 
00313             public:
00314                 /**
00315                  * Reset statistics for this Bundle.
00316                  */
00317                 virtual void resetStatistics();
00318 
00319             // ----- Object interface -----------------------------------
00320 
00321             protected:
00322                 /**
00323                  * {@inheritDoc}
00324                  */
00325                 virtual TypedHandle<const String> toString() const;
00326 
00327             protected:
00328                 /**
00329                  * Return a human readable name for the specified status value.
00330                  *
00331                  * @param iStatus  the status value to format
00332                  *
00333                  * @return a human readable status name
00334                  */
00335                 virtual String::View formatStatusName(int32_t iStatus) const;
00336 
00337             // ----- data fields and constants ---------------------------------
00338 
00339             public:
00340                 /**
00341                  * This Bundle is accepting additional items.
00342                  */
00343                 static const int32_t status_open      = 0;
00344 
00345                 /**
00346                  * This Bundle is closed for accepting additional items and is waiting
00347                  * for the execution results.
00348                  */
00349                 static const int32_t status_pending   = 1;
00350 
00351                 /**
00352                  * This Bundle is in process of returning the result of execution
00353                  * back to the client.
00354                  */
00355                 static const int32_t status_processed = 2;
00356 
00357                 /**
00358                  * Attempt to bundle encountered an exception; the execution has to be
00359                  * de-optimized and performed by individual threads.
00360                  */
00361                 static const int32_t status_exception = 3;
00362 
00363             private:
00364                 /**
00365                  * The bundler the operations are performed on.
00366                  */
00367                 FinalHandle<AbstractBundler> f_hBundler;
00368 
00369                 /**
00370                  * This Bundle's status.
00371                  */
00372                 Volatile<int32_t> m_iStatus;
00373 
00374                 /**
00375                  * A count of threads that are using this Bundle.
00376                  */
00377                 int32_t m_cThreads;
00378 
00379                 /**
00380                  * A flag that differentiates the "master" bundle which is responsible
00381                  * for all auto-adjustments. It's set to "true" for one and only one
00382                  * Bundle object.
00383                  */
00384                 bool m_fMaster;
00385 
00386                 // stat fields are declared protected below (the original note calls this
00387                 // "package private" access, presumably a carry-over from a Java version)
00388 
00389             protected:
00390                 /**
00391                  * Statistics: the total number of times this Bundle has been used for
00392                  * bundled request processing.
00393                  */
00394                 Volatile<int64_t> m_cTotalBundles;
00395 
00396                 /**
00397                  * Statistics: the total size of individual requests processed by this
00398                  * Bundle expressed in the same units as values returned by the
00399                  * {@link Bundle::getBundleSize()} method.
00400                  */
00401                 Volatile<int64_t> m_cTotalSize;
00402 
00403                 /**
00404                  * Statistics: a timestamp of the first thread entering the bundle.
00405                  */
00406                 int64_t m_ldtStart;
00407 
00408                 /**
00409                  * Statistics: a total time duration this Bundle has spent in bundled
00410                  * request processing (burst).
00411                  */
00412                 Volatile<int64_t> m_cTotalBurstDuration;
00413 
00414                 /**
00415                  * Statistics: a total time duration this Bundle has spent waiting
00416                  * for bundle to be ready for processing.
00417                  */
00418                 Volatile<int64_t> m_cTotalWaitDuration;
00419             };
00420 
00421     // ----- inner class: Statistics -------------------------------------------
00422 
00423     /**
00424      * Statistics class contains the latest bundler statistics.
00425      */
00426     protected:
00427         class COH_EXPORT Statistics
00428             : public class_spec<Statistics>
00429             {
00430             friend class factory<Statistics>;
00431             friend class AbstractBundler;
00432 
00433             public:
00434                 /**
00435                  * Reset the statistics.
00436                  */
00437                 virtual void reset();
00438 
00439             // ----- Object interface -----------------------------------
00440 
00441             public:
00442                 /**
00443                  * {@inheritDoc}
00444                  */
00445                 virtual TypedHandle<const String> toString() const;
00446 
00447             // ----- running averages ------------------------------------------
00448 
00449             protected:
00450                 /**
00451                  * An average time for bundled request processing (burst).
00452                  */
00453                 int32_t m_cAverageBurstDuration;
00454 
00455                 /**
00456                  * An average bundle size for this Bundler.
00457                  */
00458                 int32_t m_cAverageBundleSize;
00459 
00460                 /**
00461                  * An average thread waiting time caused by under-filled bundle. The
00462                  * wait time includes the time spent in the bundled request processing.
00463                  */
00464                 int32_t m_cAverageThreadWaitDuration;
00465 
00466                 /**
00467                  * An average bundled request throughput in size units per millisecond
00468                  * (total bundle size over total processing time)
00469                  */
00470                 int32_t m_nAverageThroughput;
00471 
00472                 // ----- snapshots --------------------------------------------------
00473 
00474                 /**
00475                  * Snapshot for a total number of processed bundles.
00476                  */
00477                 int64_t m_cBundleCountSnapshot;
00478 
00479                 /**
00480                  * Snapshot for a total size of processed bundles.
00481                  */
00482                 int64_t m_cBundleSizeSnapshot;
00483 
00484                 /**
00485                  * Snapshot for a burst duration.
00486                  */
00487                 int64_t m_cBurstDurationSnapshot;
00488 
00489                 /**
00490                  * Snapshot for a combined thread waiting time.
00491                  */
00492                 int64_t m_cThreadWaitSnapshot;
00493             };
00494 
00495     // ----- subclassing support --------------------------------------------
00496 
00497     protected:
00498         /**
00499          * Initialize the bundler.
00500          *
00501          * @param hBundle  specifies the bundle for this bundler
00502          */
00503         virtual void init(Bundle::Handle hBundle);
00504 
00505         /**
00506          * Retrieve any Bundle that is currently in the open state. This method
00507          * does not assume any external synchronization and as a result, a
00508          * caller must double check the returned bundle open state (after
00509          * synchronizing on it).
00510          *
00511          * @return an open Bundle
00512          */
00513         virtual Bundle::Handle getOpenBundle();
00514 
00515         /**
00516          * Instantiate a new Bundle object.
00517          *
00518          * @return a new Bundle object
00519          */
00520         virtual Bundle::Handle instantiateBundle() = 0;
00521 
00522     // ----- constants and data fields ---------------------------------------
00523 
00524     public:
00525         /**
00526          * Frequency of the adjustment attempts. This number represents a number of
00527          * iterations of the master bundle usage after which an adjustment attempt
00528          * will be performed.
00529          */
00530         static const int32_t adjustment_frequency = 128;
00531 
00532     protected:
00533         /**
00534          * Indicate whether this is a first time adjustment.
00535          */
00536         bool m_fFirstAdjustment;
00537 
00538         /**
00539          * The previous bundle size threshold value.
00540          */
00541         double m_dPreviousSizeThreshold;
00542 
00543         /**
00544          * A pool of Bundle objects. Note that this list never shrinks.
00545          */
00546         FinalHandle<List> f_hListBundle;
00547 
00548         /**
00549          * A counter for the total number of threads that have started any bundle
00550          * related execution. This counter is used by subclasses to reduce an impact
00551          * of bundled execution for lightly loaded environments.
00552          */
00553         FinalHandle<AtomicCounter> f_hCountThreads;
00554 
00555     private:
00556         /**
00557          * The bundle size threshold. We use double for this value to allow for
00558          * fine-tuning of the auto-adjust algorithm.
00559          *
00560          * @see adjust
00561          */
00562         double m_dSizeThreshold;
00563 
00564         /**
00565          * The minimum number of threads that should trigger the bundler to switch
00566          * from a pass through mode to a bundled mode.
00567          */
00568         int32_t m_cThreadThreshold;
00569 
00570         /**
00571          * Specifies whether or not auto-adjustment is on. Default value is "true".
00572          */
00573         bool m_fAllowAuto;
00574 
00575         /**
00576          * The delay timeout in milliseconds. Default value is one millisecond.
00577          */
00578         int64_t m_lDelayMillis;
00579 
00580         /**
00581          * Last active (open) bundle position.
00582          */
00583         Volatile<int32_t> m_iActiveBundle;
00584 
00585         /**
00586          * An instance of the Statistics object containing the latest statistics.
00587          */
00588         FinalHandle<Statistics> f_hStats;
00589     };
00590 
00591 COH_CLOSE_NAMESPACE3
00592 
00593 #endif // COH_ABSTRACT_BUNDLER_HPP
Copyright © 2000, 2025, Oracle and/or its affiliates. Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.