41 Implementing Concurrency in a Distributed Environment
The Coherence Concurrent module provides distributed implementations of the concurrency primitives from the java.util.concurrent package, such as executors, atomics, locks, semaphores, and latches.
You can implement concurrent applications using the constructs you are already familiar with and also expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches, and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, and so on.
While these features are extremely powerful and enable you to reuse the knowledge you already have, they may have a detrimental effect on scalability and performance. Whenever you synchronize execution through locks, latches, or semaphores, you are introducing a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you are turning very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).
So, use these features sparingly. In many cases, there is a better, faster, and more scalable way to accomplish the same goal using Coherence primitives such as entry processors, aggregators, and events. These primitives are designed to perform and scale well in a distributed environment.
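For example, a cluster-wide counter can often be implemented with an entry processor against a regular cache entry instead of a distributed atomic. The following is a minimal sketch, assuming a NamedMap<String, Long> named counters is already available (the map and key names are purely illustrative):

// increment the counter atomically on the member that owns the "page-views" entry;
// the update runs server-side in a single network call, without any global locking
Long updated = counters.invoke("page-views", entry -> {
    Long current = entry.getValue();
    long next = (current == null ? 0L : current) + 1;
    entry.setValue(next);
    return next;
});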
Note:
- To use the concurrency features, Oracle recommends using the Bootstrap API to start the Coherence cluster members. See Using the Bootstrap API.
- Coherence Concurrent features do not support, and cannot be configured to use, federation. Coherence federation is asynchronous; therefore, it would not make sense to federate data that is inherently atomic in nature.
This chapter includes the following sections:
- Using Factory Classes
  Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower-level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.
- Using Local and Remote Instances
  In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock.
- Using Serialization
  Coherence Concurrent supports both Java serialization and POF serialization out of the box, with Java serialization being the default.
- Using Persistence
  Coherence Concurrent supports both active and on-demand persistence, but just like the rest of Coherence, it is set to on-demand by default.
- Using the Coherence Concurrent Features
  You can use the Coherence Concurrent features after declaring them as a dependency in the pom.xml file.
- Using Executors
  Coherence Concurrent provides a facility to dispatch tasks, either a Runnable, Callable, or Task, to a Coherence cluster for execution.
- Using Atomics
  Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong, and AtomicReference. It also provides local implementations of the same types.
- Using Locks
  Coherence Concurrent provides distributed implementations of the Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, enabling you to implement lock-based concurrency control across cluster members when necessary.
- Using Latches and Semaphores
- Using Blocking Queues
Parent topic: Performing Data Grid Operations
Using Factory Classes
Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.
For example, you will use factory methods within the Atomics class to get instances of various atomic types, the Locks class to get lock instances, and the Latches and Semaphores classes to get latches and semaphores.
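For example, a quick sketch of the factory-style API (the names "counter", "gate", "start", and "permits" are illustrative; all of the factory methods shown are described later in this chapter, and the AtomicLong, CountDownLatch, and Semaphore types here are the Coherence Concurrent interfaces, not the JDK classes):

AtomicLong counter   = Atomics.remoteAtomicLong("counter");       // distributed atomic
Lock gate            = Locks.remoteLock("gate");                   // distributed exclusive lock
CountDownLatch start = Latches.remoteCountDownLatch("start", 3);   // distributed latch with a count of 3
Semaphore permits    = Semaphores.remoteSemaphore("permits", 5);   // distributed semaphore with 5 permits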
Parent topic: Implementing Concurrency in a Distributed Environment
Using Local and Remote Instances
In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock.
In cases where the JDK does not provide a standard interface, which is the case with atomics, latches, and semaphores, the interface of the existing JDK class has been extracted to create a thin wrapper around the corresponding JDK implementation. For example, Coherence Concurrent provides a Semaphore interface and a LocalSemaphore class that wraps java.util.concurrent.Semaphore. The same is true for CountDownLatch and all atomic types.
The main advantage of using factory classes to construct both the local and the remote instances is that it allows you to name local locks the same way you name the remote locks: calling Locks.localLock("foo") always returns the same Lock instance because the Locks class internally caches both the local and the remote instances it creates. In the case of remote locks, every locally cached remote lock instance is ultimately backed by a shared lock instance somewhere in the cluster, which is used to synchronize lock state across the processes.
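A short sketch of that naming behavior (the lock name "foo" is illustrative):

Lock localA = Locks.localLock("foo");
Lock localB = Locks.localLock("foo");
// the Locks class caches instances by name, so both references point to the same ReentrantLock
assert localA == localB;

// the remote lock with the same name is a separate, cluster-backed instance
Lock remoteC = Locks.remoteLock("foo");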
Parent topic: Implementing Concurrency in a Distributed Environment
Using Serialization
Coherence Concurrent supports both Java serialization and POF serialization out of the box, with Java serialization being the default.
If you want to use POF instead, set the coherence.concurrent.serializer system property to pof. You should also include the coherence-concurrent-pof-config.xml file in your own POF configuration file to register the built-in Coherence Concurrent types.
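For example, the property can be passed on the command line or set programmatically before the cluster member is bootstrapped; a minimal sketch:

// equivalent to passing -Dcoherence.concurrent.serializer=pof on the command line;
// the coherence-concurrent-pof-config.xml file must still be included in your own POF configuration file
System.setProperty("coherence.concurrent.serializer", "pof");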
Parent topic: Implementing Concurrency in a Distributed Environment
Using Persistence
Coherence Concurrent supports both active and on-demand persistence, but just like the rest of Coherence, it is set to on-demand by default.
To use active persistence, set the coherence.concurrent.persistence.environment system property to default-active, or use another persistence environment that has active persistence enabled.
Note:
The caches that store lock and semaphore data are configured as transient, and are not persisted when you use active or on-demand persistence.
Parent topic: Implementing Concurrency in a Distributed Environment
Using the Coherence Concurrent Features
You can use the Coherence Concurrent features after declaring them as a dependency in the pom.xml file.
To declare the dependency, add the following entry to the pom.xml file:
<dependency>
<groupId>${coherence.groupId}</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>${coherence.version}</version>
</dependency>
Parent topic: Implementing Concurrency in a Distributed Environment
Using Executors
Coherence Concurrent provides a facility to dispatch tasks, either a Runnable, Callable, or Task, to a Coherence cluster for execution. Executors that will run the submitted tasks are configured on each cluster member by defining one or more named executors within a cache configuration resource.
This section includes the following topics:
- Using Executors - Examples
- Advanced Orchestration
- Advanced Orchestration - Examples
- Configuring Executors
- Managing Executors
- Managing Executors Over REST
- Using CDI
Parent topic: Implementing Concurrency in a Distributed Environment
Using Executors - Examples
By default, each Coherence cluster with the coherence-concurrent module on the classpath includes a single-threaded executor that may be used to run the dispatched tasks. For example:
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();
Future<Void> result = remoteExecutor.submit(() -> System.out.println("Executed"));
result.get(); // block until completion
If an executor has been configured with the name Fixed5, then a reference to the executor may be obtained with:

RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");

If no executor has been configured with the given name, the RemoteExecutor will throw the following exception: RejectedExecutionException
Each RemoteExecutor instance may hold local resources that should be released when the RemoteExecutor is no longer required. Like an ExecutorService, a RemoteExecutor has similar methods to shut down the executor. Calling these methods has no impact on the executors registered within the cluster.
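A hedged sketch of releasing those local resources when a named executor is no longer needed (the shutdown method name mirrors ExecutorService, as the text above implies, and is an assumption here):

RemoteExecutor fixed5 = RemoteExecutor.get("Fixed5");
try {
    fixed5.submit(() -> System.out.println("Executed")).get();
}
finally {
    // releases client-side resources only; the executors registered within the cluster are unaffected
    fixed5.shutdown();
}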
Parent topic: Using Executors
Advanced Orchestration
While the RemoteExecutor does provide functionality similar to the standard ExecutorService included in the JDK, this may not be enough in the context of Coherence. A task might need to run across multiple Coherence members, produce intermediate results, and remain durable in case a cluster member running the task fails.
In such cases, task orchestration can be used. Before diving into the details of orchestration, the following concepts should be understood.
Table 41-1 Task Orchestration Interfaces

Interface | Description
---|---
Task | Tasks are like Runnable and Callable objects in that they are designed to be executed by an executor; unlike them, a Task may run across multiple Coherence members, yield execution, and produce intermediate results.
Task.Context | Provides contextual information for a Task as it executes, including shared properties and the overall execution status.
Task.Orchestration | Defines information concerning the orchestration of a Task across the cluster.
Task.Coordinator | A publisher of collected Task results.
Task.Subscriber | A receiver of items produced by a Task.Coordinator.
Task.Properties | State sharing mechanism for tasks.
Task.Collector | A mutable reduction operation that accumulates results into a mutable result container, optionally transforming the accumulated result into a final representation after all results have been processed.
- Tasks
- Task Context
- Task Orchestration
- Task Collector and Collectable
- Task Coordinator
- Task Subscriber
Parent topic: Using Executors
Tasks
Task implementations define a single method called execute(Context) that performs the task, possibly yielding execution to some later point. After the method has completed execution, by returning a result or throwing an exception (but not a Yield exception), the task is considered completed for the assigned Executor.
A Task may yield execution for a given time by throwing a Yield exception. This exception type signals that the execution of a Task by an Executor is to be suspended and resumed at some later point in time, typically by the same Executor.
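A minimal sketch of a Task implementation along the lines of the ValueTask used in the examples later in this chapter (the class name is illustrative, and the generic signature is an assumption based on how results are handled in those examples):

public class GreetingTask implements Task<String> {

    private final String name;

    public GreetingTask(String name) {
        this.name = name;
    }

    @Override
    public String execute(Context<String> context) {
        // runs on whichever cluster member's executor the task was assigned to;
        // returning normally (without throwing Yield) marks the task as completed for that executor
        return "Hello, " + name;
    }
}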
Parent topic: Advanced Orchestration
Task Context
When a Task is executed, a Context instance is passed as an execution argument.
The Context provides access to task properties, allowing state to be shared between tasks running in multiple Java Virtual Machines.
The Context also provides details on the overall execution status, allowing a Task to determine whether it has been cancelled, whether it has completed, and whether its execution is resuming after having previously yielded.
Parent topic: Advanced Orchestration
Task Orchestration
Orchestrations begin by calling RemoteExecutor.orchestrate(Task), which returns a Task.Orchestration instance for the given Task. With the Task.Orchestration, it is possible to configure aspects of where the task will be run.
Table 41-3 Task Orchestration Methods

Method | Description
---|---
concurrently() | Tasks will be run, concurrently, across all Java Virtual Machines where the named executor is defined/configured. This is the default.
sequentially() | Tasks will be run, in sequence, across all Java Virtual Machines where the named executor is defined/configured.
limit(int) | Limit the task to a given number of executors.
filter(Predicate) | Filtering provides an additional way to constrain where a task may be run. The predicates will be applied against metadata associated with each executor on each Java Virtual Machine. Some examples of metadata would be the member in which the executor is running, or the role of a member. Predicates may be chained to provide Boolean logic in determining an appropriate executor.
 | Define initial state that will be available to all tasks no matter which Java Virtual Machine that task is running on.
 | When specified, the task will be retained, allowing new subscribers to be notified of the final result of a task computation after it has completed.
collect(Collector) | This is the terminal of the orchestration builder, returning a Task.Collectable.
Parent topic: Advanced Orchestration
Task Collector and Collectable
The Task.Collector passed to the orchestration will collect results from tasks and optionally transform the collected results into a final format. Collectors are best illustrated by using examples of the collectors that are available in the TaskCollectors class.
Table 41-4 Task Collector Methods

Method | Description
---|---
count() | The count of non-null results that have been collected from the executing tasks.
firstOf() | Collects and returns the first result provided by the executing tasks.
lastOf() | Collects and returns the last result returned by the executing tasks.
setOf() | Collects and returns all non-null results as a Set.
listOf() | Collects and returns all non-null results as a List.
The Task.Collectable instance returned by calling collect on the orchestration allows, among other things, setting the condition under which no more results will be collected or published to any registered subscribers. Calling submit() on the Task.Collectable will begin the orchestration of the task.
Parent topic: Advanced Orchestration
Task Coordinator
Upon calling submit() on the orchestration Collectable, a Task.Coordinator is returned. Like the Task.Collectable, the Task.Coordinator allows for the registration of subscribers. Additionally, it provides the ability to cancel or check the completion status of the orchestration.
Parent topic: Advanced Orchestration
Task Subscriber
The Task.Subscriber
receives various events pertaining to the execution status of the orchestration.
Table 41-5 Task Subscriber Events

Method | Description
---|---
onComplete() | Signals the completion of the orchestration.
onError() | Called when an unrecoverable error (given as the argument) has occurred.
onNext() | Called when the Task produces a result to be passed to this subscriber.
onSubscribe() | Called prior to any calls to the other Subscriber methods.
Parent topic: Advanced Orchestration
Advanced Orchestration - Examples
To begin, consider the following code common to orchestration examples:
// demonstrate orchestration using the default RemoteExecutor
RemoteExecutor executor = RemoteExecutor.getDefault();

// WaitingSubscriber is an implementation of the
// com.oracle.coherence.concurrent.executor.Task.Subscriber interface
// that has a get() method that blocks until Subscriber.onComplete() is
// called and will return the results received by onNext()
WaitingSubscriber subscriber = new WaitingSubscriber();

// ValueTask is an implementation of the
// com.oracle.coherence.concurrent.executor.Task interface
// that returns the value provided at construction time
ValueTask task = new ValueTask("Hello World");
Given the previous example, the simplest example of an orchestration is:
// orchestrate the task, subscribe, and submit
executor.orchestrate(task)
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// if this was run on four cluster members running the default executor service,
// the returned Collection will have four results
Collection<String> results = subscriber.get();
Building on the previous example, assume a cluster with two storage and two proxy members. The cluster members are configured with the roles of storage and proxy, respectively. If the task needs to run on storage members only, then the orchestration could look like:
// orchestrate the task, filtering by a role, subscribe, and submit
executor.orchestrate(task)
        .filter(Predicates.role("storage"))
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// as there are only two storage members in this hypothetical, only two
// results will be returned
Collection<String> results = subscriber.get();
There are several predicates available for use in com.oracle.coherence.concurrent.executor.function.Predicates; however, if none apply to the target use case, simply implement the Remote.Predicate interface.
You can customize the collection of results and how they are presented to the subscriber by using collect(Collector) and until(Predicate):
// orchestrate the task, collecting the first non-null result,
// subscribe, and submit
executor.orchestrate(new MayReturnNullTask())
        .collect(TaskCollectors.firstOf())
        .until(Predicates.nonNullValue())
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// the first non-null result returned will be the one provided to the
// subscriber
Collection<String> results = subscriber.get();
Several collectors are provided in com.oracle.coherence.concurrent.executor.TaskCollectors; however, if none apply to the target use case, implement the Task.Collector interface.
Parent topic: Using Executors
Configuring Executors
Several executor types are available for configuration.
Table 41-6 Types of Executors

ExecutorService Type | Description
---|---
Single thread | Creates an ExecutorService that uses a single worker thread.
Fixed thread | Creates an ExecutorService that uses a fixed number of threads.
Cached | Creates an ExecutorService that creates new threads as needed and reuses previously constructed threads when they are available.
Work stealing | Creates a work-stealing thread pool by using the number of available processors as its target parallelism level.
Custom | Allows the creation of non-standard executors.
VirtualThread | Creates a VirtualThread-per-task ExecutorService. Requires JDK 21 or later.
Table 41-7 Configuration Elements

Element Name | Required | Expected Type | Description
---|---|---|---
single | no | N/A | Defines a single-thread executor.
fixed | no | N/A | Defines a fixed-thread pool executor.
cached | no | N/A | Defines a cached-thread-pool executor.
work-stealing | no | N/A | Defines a work-stealing pool executor.
custom-executor | no | | Defines a custom executor.
virtual-per-task | no | N/A | Defines a VirtualThread-per-task executor.
name | yes | String | Defines the logical name of the executor.
thread-count | yes | Integer | Defines the thread count for a fixed-thread pool executor.
parallelism | no | Integer | Defines the parallelism of a work-stealing-thread pool executor. If not defined, it defaults to the number of processors available on the system.
thread-factory | no | N/A | Defines a ThreadFactory for the executor.
instance | yes | | Defines how the ThreadFactory or custom executor will be instantiated.
For complete details, see schema.
Configuration Examples
The cache-config root element should include the coherence-concurrent NamespaceHandler to recognize the configuration elements:

<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd">
  .
  .
  .
</cache-config>
Note:
Executors defined by the configuration must precede any other elements in the document. Failing to do so will prevent the document from being validated.
In the following examples, the xml namespace defined for the NamespaceHandler is c:

<!-- creates a single-threaded executor named Single -->
<c:single>
  <c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named SingleTF with a thread factory -->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named Fixed5 -->
<c:fixed>
  <c:name>Fixed5</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>
Parent topic: Using Executors
Managing Executors
There are various ways to manage and monitor executors. You can use one of the following options:
- MBeans: See ExecutorMBean.
- Reporter: See Understanding the Executor Report.
- Coherence VisualVM plug-in: See coherence-visualvm.
- REST API: See Managing Executors Over REST.
- Distributed Tracing: See Distributed Tracing.
Parent topic: Using Executors
Managing Executors Over REST
Coherence Management over REST exposes endpoints to query and invoke actions against ExecutorMBean instances.
Table 41-8 REST Endpoints

Description | Method | Path | Produces
---|---|---|---
View all Executors | GET | /management/coherence/cluster/executors | JSON
View all Executors with matching name | GET | /management/coherence/cluster/executors/{name} | JSON
Reset Executor statistics by name | POST | /management/coherence/cluster/executors/{name}/resetStatistics | JSON
Parent topic: Using Executors
Using CDI
You can inject RemoteExecutors through CDI.
@Inject
private RemoteExecutor single; // injects a RemoteExecutor named 'single'.
@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor; // injects a RemoteExecutor named 'Fixed5'.
Parent topic: Using Executors
Using Atomics
Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong, and AtomicReference. It also provides local implementations of the same types. The local implementations are just thin wrappers around the existing java.util.concurrent.atomic types, which implement the same interface as their distributed variants, making them interchangeable.
To create instances of these types, use the factory methods on the Atomics class:

// Creates a local, in-process instance of a named 'AtomicInteger' with an implicit initial value of 0.
AtomicInteger localFoo = Atomics.localAtomicInteger("foo");

// Creates a remote, distributed instance of a named 'AtomicInteger', distinct from the local instance 'foo',
// with an implicit initial value of '0'.
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");

// Creates a remote, distributed instance of a named 'AtomicLong', with an initial value of '5'.
AtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
Note:
The AtomicInteger and AtomicLong types used in the code above are not the types from the java.util.concurrent.atomic package. They are interfaces defined within the com.oracle.coherence.concurrent.atomic package, which both the LocalAtomicXyz and RemoteAtomicXyz classes implement, and those are the instances actually returned by the above methods.
This means the code above could also be written using the concrete types:

LocalAtomicInteger localFoo = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong remoteBar = Atomics.remoteAtomicLong("bar", 5L);
However, Oracle strongly recommends that you use interfaces instead of concrete types because interfaces make it easy to switch between local and distributed implementations when necessary.
After you have an atomic instance, you can use it the same way you would use any of the java.util.concurrent.atomic types:

int counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
This section includes the following topics:
- Asynchronous Implementations of Atomic Types
- Using CDI
Parent topic: Implementing Concurrency in a Distributed Environment
Asynchronous Implementations of Atomic Types
The instances of numeric atomic types, such as AtomicInteger and AtomicLong, are frequently used to represent various counters in the application, where a client may need to increment the value but does not necessarily need to know what the new value is.
When working with the local atomics, you can use the same API shown earlier (see Using Atomics) and simply ignore the return value. However, with distributed atomics, doing so would introduce unnecessary blocking on the client while it waits for a response from the server that is then simply discarded, which negatively impacts both the performance and the throughput of the atomics.
To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.
To obtain a non-blocking instance of an atomic type, call the async method on the blocking instance of that type:

// Creates a remote, distributed instance of a named, non-blocking 'AsyncAtomicInteger', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();

// Creates a remote, distributed instance of a named, non-blocking 'AsyncAtomicLong', with an initial value of 5.
AsyncAtomicLong asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
Each operation on a non-blocking instance returns a CompletableFuture for the result and will not block:

CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long> futureCounter5 = asyncBar.addAndGet(5L);
Both the blocking and non-blocking instances of any distributed atomic type, with the same name, are backed by the same cluster-side atomic instance state, so they can be used interchangeably.
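Because the async variants return CompletableFuture, updates can be chained or simply fired and forgotten without blocking the caller; a short sketch using the instances created above:

// fire-and-forget increment; no client thread blocks waiting for the new value
asyncFoo.incrementAndGet();

// or attach a callback if the new value is eventually needed
asyncBar.addAndGet(5L)
        .thenAccept(newValue -> System.out.println("bar is now " + newValue));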
Parent topic: Using Atomics
Using CDI
Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.
// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;
// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;
// Injects a remote, distributed instance of non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar;
After you obtain an instance of an atomic type through CDI injection, you can use it the same way you would use an instance obtained directly from the Atomics factory class.
Parent topic: Using Atomics
Using Locks
Coherence Concurrent provides distributed implementations of the Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, enabling you to implement lock-based concurrency control across cluster members when necessary.
Unlike the local JDK implementations, the classes in this package use the cluster member/process ID and thread ID to identify the lock owner, and store shared lock state within a Coherence NamedMap. However, this also implies that the calls to acquire and release locks are remote network calls, because they need to update shared state that is likely stored on a different cluster member. This may impact the performance of lock and unlock operations.
This section includes the following topics:
- Using Exclusive Locks
- Using Read/Write Locks
- Using CDI
Parent topic: Implementing Concurrency in a Distributed Environment
Using Exclusive Locks
The RemoteLock class provides an implementation of the Lock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the lock at any given time.
To get a RemoteLock, call the Locks.remoteLock factory method:

Lock foo = Locks.remoteLock("foo");
As with Atomics, you can obtain a local Lock instance from the Locks class, which simply returns an instance of a standard java.util.concurrent.locks.ReentrantLock, by calling the localLock factory method:

Lock foo = Locks.localLock("foo");
After you have a Lock instance, you can use it as you normally would:

foo.lock();
try {
    // critical section guarded by the exclusive lock `foo`
}
finally {
    foo.unlock();
}
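Because RemoteLock implements the standard Lock interface, the usual non-blocking idioms also work; for example, a sketch using tryLock with a timeout (TimeUnit is java.util.concurrent.TimeUnit, and tryLock may throw InterruptedException):

if (foo.tryLock(5, TimeUnit.SECONDS)) {
    try {
        // critical section entered only if the lock was acquired within 5 seconds
    }
    finally {
        foo.unlock();
    }
}
else {
    // the lock could not be acquired in time; back off, retry, or report the contention
}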
Parent topic: Using Locks
Using Read/Write Locks
The RemoteReadWriteLock class provides an implementation of the ReadWriteLock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the write lock at any given time, while allowing multiple concurrent readers.
To get a RemoteReadWriteLock, call the Locks.remoteReadWriteLock factory method:

ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
As with Atomics, you can obtain a local ReadWriteLock instance from the Locks class, which simply returns an instance of a standard java.util.concurrent.locks.ReentrantReadWriteLock, by calling the localReadWriteLock factory method:

ReadWriteLock bar = Locks.localReadWriteLock("bar");
After you have a ReadWriteLock instance, you can use it as you normally would:

bar.writeLock().lock();
try {
    // critical section guarded by the exclusive write lock `bar`
}
finally {
    bar.writeLock().unlock();
}
Or:
bar.readLock().lock();
try {
    // critical section guarded by the shared read lock `bar`
}
finally {
    bar.readLock().unlock();
}
Parent topic: Using Locks
Using CDI
You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:
// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;
// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar;
After you obtain an instance of a lock through CDI injection, you can use it the same way you would use an instance obtained directly from the Locks factory class.
Parent topic: Using Locks
Using Latches and Semaphores
Coherence Concurrent also provides distributed implementations of the CountDownLatch and Semaphore classes from the java.util.concurrent package, enabling you to implement synchronization of execution across multiple Coherence cluster members as easily as you can implement it within a single process using the two JDK classes. It also provides interfaces for those two concurrency primitives, which both the remote and the local implementations conform to.
As seen with Atomics, the local implementations are nothing more than thin wrappers around the corresponding JDK classes.
This section includes the following topics:
- Using the Count Down Latch
- Using a Semaphore
- Using CDI
Parent topic: Implementing Concurrency in a Distributed Environment
Using the Count Down Latch
The RemoteCountDownLatch class provides a distributed implementation of a CountDownLatch, and enables you to ensure that the execution of code on any cluster member that is waiting for the latch proceeds only when the latch reaches zero. Any cluster member can both wait for a latch and count down.
To get a RemoteCountDownLatch, call the Latches.remoteCountDownLatch factory method:

// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CountDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
As with Atomics and Locks, you can obtain a local CountDownLatch instance from the Latches class by calling the localCountDownLatch factory method:

// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CountDownLatch foo = Latches.localCountDownLatch("foo", 10);
After you have a RemoteCountDownLatch instance, you can use it as you normally would, by calling the countDown and await methods on it.
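For example, a sketch of coordinating work across members with a remote latch (the latch name, count, and scenario are illustrative; await may throw InterruptedException):

// on the member that waits for the workers (blocks until the count reaches zero)
Latches.remoteCountDownLatch("workers-ready", 3).await();

// on each of the three worker members, once its part of the work is done
Latches.remoteCountDownLatch("workers-ready", 3).countDown();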
Parent topic: Using Latches and Semaphores
Using a Semaphore
The RemoteSemaphore class provides a distributed implementation of a Semaphore, and enables any cluster member to acquire and release permits from the same Semaphore instance.
To get a RemoteSemaphore, call the Semaphores.remoteSemaphore factory method:

// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
As with Atomics and Locks, you can obtain a local Semaphore instance from the Semaphores class by calling the localSemaphore factory method:

// Creates an instance of a 'LocalSemaphore' with '0' permits.
Semaphore foo = Semaphores.localSemaphore("foo");
After you create a Semaphore instance, you can use it as you normally would, by calling the release and acquire methods on it.
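For example, a sketch that limits a critical section to five concurrent holders across the whole cluster (the semaphore name is illustrative; acquire may throw InterruptedException):

Semaphore permits = Semaphores.remoteSemaphore("db-connections", 5);

permits.acquire();            // blocks until one of the five permits is available
try {
    // at most five holders, across all cluster members, are in this section at once
}
finally {
    permits.release();        // return the permit so that other members can proceed
}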
Parent topic: Using Latches and Semaphores
Using CDI
You can also use CDI to inject both CountDownLatch and Semaphore instances into objects that need them:
// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;
// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;
// Injects an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
private Semaphore localSemaphoreBar;
// Injects an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;
After you obtain a latch or a semaphore instance through CDI injection, you can use it the same way you would use an instance obtained directly from the Latches or Semaphores factory classes.
The @Name annotation is optional in both cases, as long as the member name (in the examples above, the field name) can be obtained from the injection point; it is required otherwise (such as when you use constructor injection).
The @Count annotation specifies the initial latch count; if omitted, it defaults to one. The @Permits annotation specifies the number of available permits for a semaphore; if omitted, it defaults to zero, which means that the first acquire call will block until another thread releases one or more permits.
Parent topic: Using Latches and Semaphores
Using Blocking Queues
Coherence supports queues as a data structure from Coherence CE 24.03. The Coherence NamedQueue is an implementation of java.util.Queue, and NamedDeque is an implementation of java.util.Deque.
Coherence has two implementations of BlockingQueue: one is a simple size limited queue; the second is a distributed paged queue that has a much larger capacity. The simple queue is available as both a BlockingQueue and a double-ended BlockingDeque. The distributed paged queue is available only as a BlockingQueue implementation.
Note:
Coherence queues are mapped to caches, which take the same name as the queue. If a cache is being used for a queue, then the same cache must not be used as a normal data cache.
Blocking Queue
The Coherence Concurrent module contains an implementation of java.util.concurrent.BlockingQueue called NamedBlockingQueue and an implementation of java.util.concurrent.BlockingDeque called NamedBlockingDeque.
To use a Coherence blocking queue in your application, you must add a dependency on
the coherence-concurrent
module as follows:
<dependency>
<groupId>com.oracle.coherence</groupId>
<artifactId>coherence-concurrent</artifactId>
<version>14.1.2-0-0</version>
</dependency>
To obtain an instance of a blocking queue, use the com.oracle.coherence.concurrent.Queues factory class.
To obtain a simple size limited BlockingQueue
named
"my-queue", see the following example:
NamedBlockingQueue<String> queue = Queues.queue("my-queue");
To obtain a simple size limited BlockingDeque
named
"my-deque", see the following example:
NamedBlockingDeque<String> queue = Queues.deque("my-deque");
To obtain a distributed paged BlockingQueue
named
"my-queue", see the following example:
NamedBlockingQueue<String> queue = Queues.pagedQueue("my-queue");
The blocking queue implementations work by using Coherence events. When application code calls a blocking method, the calling thread is blocked but the blocking is not on the server. The application code is unblocked when it receives an event from the server.
For example, if the application code calls the NamedBlockingQueue take() method and the queue is empty, this method blocks the calling thread. When an element is put into the queue by another thread (maybe on another JVM), the calling application receives an event, retries the take(), and, if successful, returns. If the retry of the take() is unsuccessful, the calling thread remains blocked; for example, another thread or another JVM that was also blocked taking from the same queue may have had its retry succeed first.
Another example is an application calling the NamedBlockingQueue put() method, which blocks when the queue is full (2 GB size limit). In this case, the calling thread is blocked until a delete event is received to signal that there is now space in the queue. The put() is then retried and, if successful, control is returned to the calling thread. If the retry is unsuccessful, the thread remains blocked; for example, another thread or JVM that was also blocked on a put() may have had its retry succeed first and refill the queue.
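A short producer/consumer sketch of that blocking behavior (the queue name and element values are illustrative; put and take may throw InterruptedException):

NamedBlockingQueue<String> queue = Queues.queue("orders");

// producer (any thread, on any cluster member): blocks while the queue is full
queue.put("order-1001");

// consumer (possibly in a different JVM): blocks until an element is available
String next = queue.take();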
Sizing Queues
It is important to understand how the two Coherence queue implementations store data and how this limits the size of a queue.
- Simple Coherence Queue – The simple queue (and deque) implementation stores data in a single Coherence cache partition. This enforces a size limit of 2 GB because a Coherence cache partition should not exceed 2 GB in size, and in reality, a partition should be a lot smaller than this. Large partitions slow down recovery when a storage enabled member leaves the cluster. With a modern fast network, 300 MB – 500 MB should be a suitable maximum partition size; on a 10 GB network, this could even go as high as 1 GB.
- Distributed Paged Queue – The distributed paged queue stores data in pages that are distributed around the Coherence cluster over multiple partitions, the same as normal cache data. This means that the paged queue can store far more than 2 GB. It is still important to be aware of how partition sizes limit the total queue size.
The absolute hard limit of 2 GB per partition gives the following size:
2 GB x 257 = 514 GB
But this is far too big to be reliable in production use. If you use a size limit of 500 MB and the default partition count of 257, then you can see how this affects queue size.
500 MB x 257 = 128 GB
So, by default a realistic limit for a paged queue is around 128 GB. If the partition count is increased to 1087, then the queue size becomes:
500 MB x 1087 = 543 GB
Of course, all these examples assume that there are enough JVMs with big enough heap sizes in the cluster to store the queue data in memory.
Limitations
The current queue implementation in Coherence has the following limitations:
- As previously mentioned, the simple queue has a hard size limit of 2 GB. When using a simple queue or deque, the Coherence server refuses to accept offers to the queue if its size exceeds 2 GB. The java.util.Queue contract allows queues to reject offers, so this size limitation conforms to the queue contract. Application developers should check the response from offering data to the queue to determine whether the offer has succeeded or not (see the sketch after this list). We use the term "offer" here to cover all queue and deque methods that add data to the queue. An alternative to checking the return Boolean from an offer() call would be to use a NamedBlockingQueue, where the put() method blocks if the queue is full.
- In normal operation, queues should not get huge, as this would usually mean that the processes reading from the queue are not keeping up with the processes writing to the queue. Application developers should obviously load test their applications using queues to ensure that they are not going to have issues with capacity.
- Queue operations such as offering and polling will contend on certain data structures, and this limits the number of parallel requests and the speed at which requests get processed. To maintain ordering, polling contends on either the head or tail entry, depending on which end of the queue is being polled. This means that poll methods can only be processed sequentially, so even though a poll is efficient and fast, many concurrent poll requests will queue and be processed one at a time. Offer methods do not contend on the head or tail but will contend on the atomic counters used to maintain the head and tail identifiers. Coherence can process multiple offer requests on different worker threads, but there are minor contentions on the AtomicLong updates.
- Queue operations that work on the head and tail, such as offering and polling, are efficient. Some of the other methods in java.util.Queue and java.util.Deque are less efficient; for example, iterator methods, contains(), and so on. These are not frequently used by applications that require basic queue functionality. Some optional methods on the java.util.Queue API that mutate the queue will throw UnsupportedOperationException (this is allowed by the Java Queue contract), for example, retainAll(), removeAll(), and removal using an iterator.
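A small sketch of checking the offer result, as recommended in the first limitation above (the queue and element values are illustrative):

boolean accepted = queue.offer("order-1002");
if (!accepted) {
    // the queue rejected the offer (for example, the 2 GB size limit was reached);
    // decide whether to retry later, drop the element, or fail the operation
}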
Parent topic: Implementing Concurrency in a Distributed Environment