41 Implementing Concurrency in a Distributed Environment

The Coherence Concurrent module provides distributed implementations of the concurrency primitives from the java.util.concurrent package such as executors, atomics, locks, semaphores, and latches.

You can implement concurrent applications using the constructs you are already familiar with and also expand the "scope" of concurrency from a single process to potentially hundreds of processes within a Coherence cluster. You can use executors to submit tasks to be executed somewhere in the cluster; you can use locks, latches, and semaphores to synchronize execution across many cluster members; you can use atomics to implement global counters across many processes, and so on.

While these features are extremely powerful and enable you to reuse the knowledge you already have, they may have a detrimental effect on scalability and/or performance. Whenever you synchronize execution through locks, latches, or semaphores, you introduce a potential bottleneck into the architecture. Whenever you use a distributed atomic to implement a global counter, you turn very simple operations that take mere nanoseconds locally, such as increment and decrement, into fairly expensive network calls that could take milliseconds (and potentially block even longer under heavy load).

So, use these features sparingly. In many cases, there is a better, faster, and more scalable way to accomplish the same goal by using Coherence primitives such as entry processors, aggregators, and events, which are designed to perform and scale well in a distributed environment.

Note:

  • To use the concurrency features, Oracle recommends using the Bootstrap API to start the Coherence cluster members. See Using the Bootstrap API.
  • Coherence concurrent features do not support, and cannot be configured to use, federation. Coherence Federation is asynchronous, so it would not make sense to federate data that is inherently atomic in nature.


Using Factory Classes

Each feature (executors, atomics, locks, semaphores, and latches) is backed by one or more Coherence caches, possibly with preconfigured interceptors. All interaction with lower level Coherence primitives is hidden behind various factory classes that allow you to get the instances of the classes you need.

For example, you use the factory methods in the Atomics class to get instances of the various atomic types, in the Locks class to get lock instances, and in the Latches and Semaphores classes to get latches and semaphores.

Using Local and Remote Instances

In many cases, the factory classes allow you to get both the local and the remote instances of various constructs. For example, Locks.localLock will give you an instance of a standard java.util.concurrent.locks.ReentrantLock, while Locks.remoteLock will return an instance of a RemoteLock.

In cases where the JDK does not provide a standard interface, which is the case with atomics, latches, and semaphores, the interface has been extracted from the existing JDK class, and a thin wrapper around the corresponding JDK implementation implements it. For example, Coherence Concurrent provides a Semaphore interface and a LocalSemaphore class that wraps java.util.concurrent.Semaphore. The same is true for CountDownLatch and all atomic types.

The main advantage of using factory classes to construct both the local and the remote instances is that it allows you to name local locks the same way you name the remote locks: calling Locks.localLock("foo") always returns the same Lock instance because the Locks class internally caches both the local and the remote instances it creates. In the case of remote locks, every locally cached remote lock instance is ultimately backed by a shared lock instance somewhere in the cluster, which is used to synchronize lock state across the processes.
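The name-based caching described above can be illustrated with plain JDK types. The NamedLocalLocks class below is a hypothetical sketch of the local half of that behavior, not the implementation of the Locks class: the same name always maps to the same ReentrantLock, which is created on first use. Remote instances would additionally delegate to the shared lock state in the cluster, which this sketch does not attempt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of a name-based local lock factory; not the Coherence Locks class.
final class NamedLocalLocks {
    // One lock per name, created lazily and then reused for every caller.
    private static final ConcurrentMap<String, Lock> LOCKS = new ConcurrentHashMap<>();

    private NamedLocalLocks() {
    }

    // Returns the same ReentrantLock instance for the same name, creating it on first use.
    static Lock localLock(String name) {
        return LOCKS.computeIfAbsent(name, n -> new ReentrantLock());
    }

    public static void main(String[] args) {
        Lock first = localLock("foo");
        Lock second = localLock("foo");
        System.out.println(first == second);           // true: same name, same instance
        System.out.println(first == localLock("bar")); // false: different name
    }
}
```

Because computeIfAbsent is atomic on a ConcurrentHashMap, two threads asking for the same name concurrently still end up sharing a single lock.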

Using Serialization

Coherence Concurrent supports both Java serialization and POF out of the box, with Java serialization being the default.

If you want to use POF instead, set the coherence.concurrent.serializer system property to pof. You should also include the coherence-concurrent-pof-config.xml file in your own POF configuration file to register the built-in Coherence Concurrent types.

Using Persistence

Coherence Concurrent supports both active and on-demand persistence. As in the rest of Coherence, on-demand persistence is the default.

To use active persistence, you should set the coherence.concurrent.persistence.environment system property to default-active, or use another persistence environment that has active persistence enabled.
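Both the serializer property from the previous section and the persistence property above are ordinary Java system properties, so they are most commonly passed as -D options on the java command line. The sketch below sets them programmatically instead. The ConcurrentSettings class is hypothetical, and the sketch assumes the properties are set before any Coherence class reads its configuration, for example before the cluster member is started through the Bootstrap API. Switching to POF still requires including coherence-concurrent-pof-config.xml in your POF configuration.

```java
// Hypothetical helper that sets the Coherence Concurrent system properties.
// Assumption: it runs before Coherence reads its configuration; passing
// -Dcoherence.concurrent.serializer=pof and
// -Dcoherence.concurrent.persistence.environment=default-active
// on the command line is the usual alternative.
final class ConcurrentSettings {
    static final String SERIALIZER = "coherence.concurrent.serializer";
    static final String PERSISTENCE_ENV = "coherence.concurrent.persistence.environment";

    private ConcurrentSettings() {
    }

    // Switches Coherence Concurrent from Java serialization to POF.
    static void usePof() {
        System.setProperty(SERIALIZER, "pof");
    }

    // Selects the default-active persistence environment.
    static void useActivePersistence() {
        System.setProperty(PERSISTENCE_ENV, "default-active");
    }

    public static void main(String[] args) {
        usePof();
        useActivePersistence();
        // ... start the Coherence cluster member here (see Using the Bootstrap API)
    }
}
```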

Note:

The caches that store lock and semaphore data are configured as transient, and are not persisted when you use active or on-demand persistence.

Using the Coherence Concurrent Features

To use the Coherence Concurrent features, declare the coherence-concurrent module as a dependency in your pom.xml file:

<dependency>
    <groupId>${coherence.groupId}</groupId>
    <artifactId>coherence-concurrent</artifactId>
    <version>${coherence.version}</version>
</dependency>

Using Executors

Coherence Concurrent provides a facility to dispatch tasks (a Runnable, Callable, or Task) to a Coherence cluster for execution. The executors that run the submitted tasks are configured on each cluster member by defining one or more named executors within a cache configuration resource.


Using Executors - Examples

By default, each Coherence cluster with the coherence-concurrent module on the classpath includes a single-threaded executor that may be used to run the dispatched tasks.

Given this, the simplest example would be:
RemoteExecutor remoteExecutor = RemoteExecutor.getDefault();

Future<?> result = remoteExecutor.submit(() -> System.out.println("Executed"));

result.get(); // block until completion
If an executor has been configured with the name Fixed5, you can obtain a reference to it with:
RemoteExecutor remoteExecutor = RemoteExecutor.get("Fixed5");
If no executor has been configured with the given name, the RemoteExecutor throws a RejectedExecutionException.

Each RemoteExecutor instance may hold local resources that should be released when the RemoteExecutor is no longer required. Like an ExecutorService, a RemoteExecutor provides methods to shut it down. Calling these methods has no impact on the executors registered within the cluster.
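Because a RemoteExecutor follows the ExecutorService model, the submit, wait, and shut-down lifecycle looks the same as with a local JDK executor. The following sketch uses only a local single-thread ExecutorService so that it runs without a cluster; the ExecutorLifecycle class is hypothetical. With Coherence you would obtain the executor from RemoteExecutor.getDefault() or RemoteExecutor.get(name) instead, and shutting it down would release only the local resources.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical, JDK-only illustration of the submit/get/shutdown lifecycle.
final class ExecutorLifecycle {
    private ExecutorLifecycle() {
    }

    // Submits one task, blocks for its result, and always shuts the executor down.
    static String runOnce(String message) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = executor.submit(() -> "Executed: " + message);
            return result.get(); // block until completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdown(); // stop accepting new tasks
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // give up on tasks that did not finish in time
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("hello")); // Executed: hello
    }
}
```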

Advanced Orchestration

While the RemoteExecutor does provide functionality similar to the standard ExecutorService included in the JDK, this may not be enough in the context of Coherence. A task might need to run across multiple Coherence members, produce intermediate results, and remain durable in case a cluster member running the task fails.

In such cases, you can use task orchestration. Before looking at orchestration in detail, you should understand the following concepts.

Table 41-1 Task Orchestration Interfaces

  • Task: Tasks are like Callable and Runnable classes in that they are designed to be potentially run by one or more threads. Unlike Callable and Runnable classes, their execution may occur in different Java Virtual Machines, and may fail and/or recover between different Java Virtual Machine processes.
  • Task.Context: Provides contextual information for a Task as it is executed, including the ability to access and update intermediate results for the Executor executing the Task.
  • Task.Orchestration: Defines information concerning the orchestration of a Task across a set of executors, defined across multiple Coherence members for a given RemoteExecutor.
  • Task.Coordinator: A publisher of collected Task results that additionally permits coordination of the submitted Task.
  • Task.Subscriber: A receiver of items produced by a Task.Coordinator.
  • Task.Properties: A state-sharing mechanism for tasks.
  • Task.Collector: A mutable reduction operation that accumulates results into a mutable result container, optionally transforming the accumulated result into a final representation after all results have been processed.

Tasks

Task implementations define a single method called execute(Context) that performs the task, possibly yielding execution to some later point. After the method has completed execution, by returning a result or throwing an exception (but not a Yield exception), the task is considered completed for the assigned Executor.

A Task may yield execution for a given time by throwing a Yield exception. This exception type signals that the execution of a Task by an Executor is to be suspended and resumed at some later point in time, typically by the same Executor.

Task Context

When a Task is executed, a Context instance will be passed as an execution argument.

The Context provides access to task properties allowing shared state between tasks running in multiple Java Virtual Machines.

The Context provides details on overall execution status.

Table 41-2 Execution Status

  • Complete (Context.isDone()): Allows a Task to determine if the task is complete. Completion may be due to normal termination, an exception, or cancellation. In all of these cases, this method returns true.
  • Cancelled (Context.isCancelled()): Allows a Task to determine if the task is effectively cancelled.
  • Resuming (Context.isResuming()): Determines if a Task execution by an Executor is resuming after being recovered (for example, after failover) or after the task previously threw a Yield exception.
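To make the completion, Yield, and isResuming() semantics concrete, here is a JDK-only model. Every type in it (YieldModel, SimpleTask, Context, Yield) is hypothetical and far simpler than the Coherence Task API. In particular, the driver loop simply re-invokes a task that yields, whereas a real Executor suspends it and resumes it later, possibly on another member after failover.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of the Task/Context/Yield contract.
// These are NOT the Coherence interfaces; they only illustrate the semantics.
final class YieldModel {
    // Thrown by a task to ask the executor to suspend it and resume it later.
    static final class Yield extends RuntimeException {
        Yield() {
            super(null, null, false, false); // control-flow signal: no stack trace needed
        }
    }

    interface Context {
        boolean isResuming();
    }

    interface SimpleTask<T> {
        T execute(Context context);
    }

    // Runs a task to completion, re-invoking it each time it yields.
    // Records the isResuming() value passed to each execution in resumingLog.
    static <T> T runToCompletion(SimpleTask<T> task, List<Boolean> resumingLog) {
        boolean resuming = false;
        while (true) {
            final boolean resumingNow = resuming;
            resumingLog.add(resumingNow);
            try {
                // Returning normally (or throwing anything other than Yield) completes the task.
                return task.execute(() -> resumingNow);
            } catch (Yield signal) {
                resuming = true; // the next execution is a resumption
            }
        }
    }

    public static void main(String[] args) {
        int[] executions = {0};
        // A task that yields twice before producing its result.
        SimpleTask<String> task = context -> {
            if (++executions[0] < 3) {
                throw new Yield();
            }
            return "done after " + executions[0] + " executions, resuming=" + context.isResuming();
        };
        List<Boolean> log = new ArrayList<>();
        System.out.println(runToCompletion(task, log)); // done after 3 executions, resuming=true
        System.out.println(log);                        // [false, true, true]
    }
}
```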

Task Orchestration

Orchestrations begin by calling RemoteExecutor.orchestrate(Task), which returns a Task.Orchestration instance for the given Task. The Task.Orchestration lets you configure where and how the task runs.

Table 41-3 Task Orchestration Methods

  • concurrently(): Runs the task concurrently on all Java Virtual Machines where the named executor is defined or configured. This is the default.
  • sequentially(): Runs the task in sequence on all Java Virtual Machines where the named executor is defined or configured.
  • limit(int): Limits the task to n executors. Use this to limit the number of executors that are considered for task execution. If not set, the default behavior is to run the task on all Java Virtual Machines where the named executor is defined or configured.
  • filter(Predicate): Provides an additional way to constrain where a task may run. The predicates are applied against metadata associated with each executor on each Java Virtual Machine, such as the member on which the executor is running or the role of that member. Predicates may be chained to provide Boolean logic in determining an appropriate executor.
  • define(String, <V>): Defines initial state that is available to all tasks, no matter which Java Virtual Machine a task is running on.
  • retain(Duration): When specified, the task is retained, allowing new subscribers to be notified of the final result of a task computation after it has completed.
  • collect(Collector): The terminal operation of the orchestration builder. It returns a Task.Collectable, which defines how results are collected and ultimately submits the task to the grid.

Task Collector and Collectable

The Task.Collector passed to the orchestration collects results from tasks and optionally transforms the collected results into a final format. Collectors are best illustrated by the examples available in the TaskCollectors class.

Table 41-4 Task Collector Methods

  • count(): Returns the count of non-null results that have been collected from the executing tasks.
  • firstOf(): Collects and returns the first result provided by the executing tasks.
  • lastOf(): Collects and returns the last result returned by the executing tasks.
  • setOf(): Collects and returns all non-null results as a Set.
  • listOf(): Collects and returns all non-null results as a List.

The Task.Collectable instance returned by calling collect on the orchestration allows, among other things, setting the condition under which no more results will be collected or published by any registered subscribers. Calling submit() on the Task.Collectable will begin the orchestration of the task.
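Task.Collector is described as a mutable reduction: a result container is created, each result is accumulated into it, and an optional finishing step produces the final form. The JDK's java.util.stream.Collector follows the same model, so the sketch below uses it to approximate the count() and listOf() semantics from Table 41-4. ResultCollectors and its methods are hypothetical; this is an analogy, not the Coherence TaskCollectors code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical analogues of count() and listOf(), built on java.util.stream.Collector.
final class ResultCollectors {
    private ResultCollectors() {
    }

    // Counts non-null results (compare TaskCollectors.count()).
    static <T> Collector<T, ?, Long> countNonNull() {
        return Collectors.filtering(Objects::nonNull, Collectors.counting());
    }

    // Collects non-null results into a read-only List (compare TaskCollectors.listOf()).
    static <T> Collector<T, ?, List<T>> listOfNonNull() {
        return Collector.<T, List<T>, List<T>>of(
                ArrayList::new,                                   // supplier: empty mutable container
                (list, r) -> { if (r != null) { list.add(r); } }, // accumulator: keep non-null results
                (a, b) -> { a.addAll(b); return a; },             // combiner: merge partial containers
                Collections::unmodifiableList);                   // finisher: final representation
    }

    public static void main(String[] args) {
        // Simulated task results; null stands for a task that produced no value.
        long count = Stream.of("a", null, "b").collect(ResultCollectors.<String>countNonNull());
        List<String> list = Stream.of("a", null, "b").collect(ResultCollectors.<String>listOfNonNull());
        System.out.println(count); // 2
        System.out.println(list);  // [a, b]
    }
}
```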

Task Coordinator

Calling submit() on the orchestration's Task.Collectable returns a Task.Coordinator. Like the Task.Collectable, the Task.Coordinator allows subscribers to be registered. Additionally, it provides the ability to cancel the orchestration or check its completion status.

Task Subscriber

The Task.Subscriber receives various events pertaining to the execution status of the orchestration.

Table 41-5 Task Subscriber Events

  • onComplete(): Signals the completion of the orchestration.
  • onError(Throwable): Called when an unrecoverable error (given as the argument) has occurred.
  • onNext(<T>): Called when the Task.Coordinator has produced a result.
  • onSubscribe(Task.Subscription): Called before any calls to onComplete(), onError(Throwable), or onNext(<T>). The Task.Subscription provided can be used to cancel the subscription or to obtain a reference to the Task.Coordinator.
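The examples in the next section use a WaitingSubscriber that blocks until onComplete() is called and then returns the results received through onNext(). The class below is a hypothetical, JDK-only sketch of that idea built on a CountDownLatch. It mirrors the events in Table 41-5 but does not implement the Coherence Task.Subscriber interface, and it omits onSubscribe(Task.Subscription).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

// Hypothetical blocking subscriber in the spirit of WaitingSubscriber; not Coherence code.
final class BlockingResultSubscriber<T> {
    private final List<T> results = new ArrayList<>(); // guarded by 'this'
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable error;

    // Called for each result produced (compare onNext(T)).
    synchronized void onNext(T item) {
        results.add(item);
    }

    // Called on an unrecoverable error (compare onError(Throwable)).
    void onError(Throwable t) {
        error = t;
        done.countDown();
    }

    // Signals that no more results will arrive (compare onComplete()).
    void onComplete() {
        done.countDown();
    }

    // Blocks until onComplete() or onError() is called, then returns a snapshot of the results.
    Collection<T> get() throws InterruptedException, ExecutionException {
        done.await();
        if (error != null) {
            throw new ExecutionException(error);
        }
        synchronized (this) {
            return new ArrayList<>(results);
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingResultSubscriber<String> subscriber = new BlockingResultSubscriber<>();
        Thread publisher = new Thread(() -> {
            subscriber.onNext("Hello World");
            subscriber.onNext("Hello World");
            subscriber.onComplete();
        });
        publisher.start();
        System.out.println(subscriber.get()); // [Hello World, Hello World]
    }
}
```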

Advanced Orchestration - Examples

To begin, consider the following code, which is common to the orchestration examples:

// demonstrate orchestration using the default RemoteExecutor
RemoteExecutor executor = RemoteExecutor.getDefault();

// WaitingSubscriber is an implementation of the
// com.oracle.coherence.concurrent.executor.Task.Subscriber interface
// that has a get() method that blocks until Subscriber.onComplete() is
// called and will return the results received by onNext()
WaitingSubscriber subscriber = new WaitingSubscriber();

// ValueTask is an implementation of the
// com.oracle.coherence.concurrent.executor.Task interface
// that returns the value provided at construction time
ValueTask task = new ValueTask("Hello World");

Given these definitions, the simplest example of an orchestration is:

// orchestrate the task, subscribe, and submit
executor.orchestrate(task)
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// if this was run on four cluster members running the default executor service,
// the returned Collection will have four results
Collection<String> results = subscriber.get();

Building on the previous example, assume a cluster with two storage members and two proxy members, configured with the roles storage and proxy, respectively. If the task needs to run on the storage members only, the orchestration could look like this:

// orchestrate the task, filtering by a role, subscribe, and submit
executor.orchestrate(task)
        .filter(Predicates.role("storage"))
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// as there are only two storage members in this hypothetical, only two
// results will be returned
Collection<String> results = subscriber.get();

Several predicates are available in com.oracle.coherence.concurrent.executor.function.Predicates. If none of them fits your use case, implement the Remote.Predicate interface.
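Conceptually, filter(Predicate) and limit(int) narrow the candidate executors the same way a stream filter and limit narrow a list. The following JDK-only sketch models the two-storage, two-proxy cluster from the example above. ExecutorInfo, role(), and select() are hypothetical stand-ins for the executor metadata and Predicates that Coherence evaluates inside the cluster, and the candidate ordering is an assumption of the model. Plain java.util.function.Predicate is used here, whereas Coherence predicates must also be serializable (Remote.Predicate).

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical model of how filter(Predicate) and limit(int) select executors.
final class ExecutorSelection {
    // Metadata describing one executor on one cluster member (illustrative shape only).
    static final class ExecutorInfo {
        final int memberId;
        final String role;

        ExecutorInfo(int memberId, String role) {
            this.memberId = memberId;
            this.role = role;
        }

        @Override
        public String toString() {
            return "member " + memberId + " (" + role + ")";
        }
    }

    private ExecutorSelection() {
    }

    // Matches executors on members with the given role, in the spirit of Predicates.role(String).
    static Predicate<ExecutorInfo> role(String role) {
        return info -> role.equals(info.role);
    }

    // Keeps the executors that match the filter, then applies the limit.
    static List<ExecutorInfo> select(List<ExecutorInfo> all, Predicate<ExecutorInfo> filter, int limit) {
        return all.stream().filter(filter).limit(limit).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ExecutorInfo> cluster = List.of(
                new ExecutorInfo(1, "storage"), new ExecutorInfo(2, "storage"),
                new ExecutorInfo(3, "proxy"), new ExecutorInfo(4, "proxy"));

        // Only the two storage members qualify.
        System.out.println(select(cluster, role("storage"), Integer.MAX_VALUE));
        // Chained predicates (Boolean OR) plus a limit of three executors.
        System.out.println(select(cluster, role("storage").or(role("proxy")), 3));
    }
}
```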

You can customize the collection of results and how they are presented to the subscriber by using collect(Collector) and until(Predicate):

// orchestrate the task, collecting the first non-null result,
// subscribe, and submit
executor.orchestrate(new MayReturnNullTask())
        .collect(TaskCollectors.firstOf())
        .until(Predicates.nonNullValue())
        .subscribe(subscriber)
        .submit();

// wait for the task to complete
// the first non-null result returned will be the one provided to the
// subscriber
Collection<String> results = subscriber.get();

Several collectors are provided in com.oracle.coherence.concurrent.executor.TaskCollectors. If none of them fits your use case, implement the Task.Collector interface.

Configuring Executors

Several executor types are available for configuration.

Table 41-6 Types of Executors

  • Single thread: Creates an ExecutorService with a single thread.
  • Fixed thread: Creates an ExecutorService with a fixed number of threads.
  • Cached: Creates an ExecutorService that creates new threads as needed and reuses existing threads when possible.
  • Work stealing: Creates a work-stealing thread pool by using the number of available processors as its target parallelism level.
  • Custom: Allows the creation of non-standard executors.
  • VirtualThread: Creates a VirtualThread-per-task ExecutorService. Requires JDK 21 or later.
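The descriptions in Table 41-6 match the factory methods of the JDK's java.util.concurrent.Executors class. As a point of reference only, the sketch below creates the JDK equivalents of the first four types; it assumes, but does not confirm, that the configured Coherence executors behave like them. The ExecutorTypes class is hypothetical, and the virtual-thread factory is left as a comment because it requires JDK 21.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// JDK executors whose documented behavior matches the types in Table 41-6 (not Coherence code).
final class ExecutorTypes {
    private ExecutorTypes() {
    }

    static Map<String, ExecutorService> create() {
        Map<String, ExecutorService> executors = new LinkedHashMap<>();
        executors.put("single", Executors.newSingleThreadExecutor());    // one thread
        executors.put("fixed", Executors.newFixedThreadPool(5));         // fixed number of threads
        executors.put("cached", Executors.newCachedThreadPool());        // grows as needed, reuses idle threads
        executors.put("work-stealing", Executors.newWorkStealingPool()); // parallelism = available processors
        // On JDK 21 or later: Executors.newVirtualThreadPerTaskExecutor()
        return executors;
    }

    // Runs a trivial task on every executor, shuts them all down, and returns how many succeeded.
    static int smokeTest() {
        Map<String, ExecutorService> executors = create();
        int ok = 0;
        try {
            for (Map.Entry<String, ExecutorService> entry : executors.entrySet()) {
                String name = entry.getKey();
                Future<String> result = entry.getValue().submit(() -> name); // a Callable<String>
                if (name.equals(result.get())) {
                    ok++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executors.values().forEach(ExecutorService::shutdown);
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println(smokeTest()); // 4
    }
}
```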

Table 41-7 Configuration Elements

  • single (required: no; type: N/A): Defines a single-thread executor.
  • fixed (required: no; type: N/A): Defines a fixed-thread pool executor.
  • cached (required: no; type: N/A): Defines a cached-thread-pool executor.
  • work-stealing (required: no; type: N/A): Defines a work-stealing pool executor.
  • custom-executor (required: no; type: java.util.concurrent.ExecutorService): Defines a custom executor.
  • virtual-per-task (required: no; type: N/A): Defines a VirtualThread-per-task executor.
  • name (required: yes; type: java.lang.String): Defines the logical name of the executor.
  • thread-count (required: yes; type: java.lang.Integer): Defines the thread count for a fixed-thread pool executor.
  • parallelism (required: no; type: java.lang.Integer): Defines the parallelism of a work-stealing pool executor. If not defined, it defaults to the number of processors available on the system.
  • thread-factory (required: no; type: N/A): Defines a java.util.concurrent.ThreadFactory. Used by single, fixed, and cached executors.
  • instance (required: yes; type: java.util.concurrent.ThreadFactory): Defines how the ThreadFactory is instantiated. For information about the instance element, see instance. This element must be a child of the thread-factory element.

For complete details, see the concurrent.xsd schema.

Configuration Examples

To define executors, the cache-config root element should declare the coherence-concurrent NamespaceHandler so that the executor configuration elements are recognized:
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
               xmlns:c="class://com.oracle.coherence.concurrent.config.NamespaceHandler"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd class://com.oracle.coherence.concurrent.config.NamespaceHandler concurrent.xsd">
  ...
</cache-config>

Note:

Executors defined in the configuration must precede any other elements in the document. Otherwise, the document fails validation.
The following examples assume that the XML namespace prefix defined for the NamespaceHandler is c:
<!-- creates a single-threaded executor named Single -->
<c:single>
  <c:name>Single</c:name>
</c:single>
<!-- creates a single-threaded executor named SingleTF with a custom thread factory -->
<c:single>
  <c:name>SingleTF</c:name>
  <c:thread-factory>
    <c:instance>
      <c:class-name>my.custom.ThreadFactory</c:class-name>
    </c:instance>
  </c:thread-factory>
</c:single>
<!-- creates a fixed-thread executor named Fixed5 with five threads -->
<c:fixed>
  <c:name>Fixed5</c:name>
  <c:thread-count>5</c:thread-count>
</c:fixed>

Managing Executors

There are various ways to manage and monitor executors, as described in the following topics.

Managing Executors Over REST

Coherence Management over REST exposes endpoints to query and invoke actions against ExecutorMBean instances.

Table 41-8 REST Endpoints

  • View all Executors: GET /management/coherence/cluster/executors (produces JSON)
  • View all Executors with a matching name: GET /management/coherence/cluster/executors/{name} (produces JSON)
  • Reset Executor statistics by name: POST /management/coherence/cluster/executors/{name}/resetStatistics (produces JSON)
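The endpoints in Table 41-8 can be called from any HTTP client. The sketch below builds the three requests with the JDK's java.net.http client. The ExecutorRestRequests class is hypothetical, the base URL (http://localhost:8080) is a placeholder for wherever Management over REST is exposed in your deployment, not a documented default, and executor names are assumed to need no URL escaping.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that builds requests for the executor endpoints in Table 41-8.
final class ExecutorRestRequests {
    private static final String EXECUTORS_PATH = "/management/coherence/cluster/executors";
    private final String baseUrl;

    ExecutorRestRequests(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // GET /management/coherence/cluster/executors
    HttpRequest viewAll() {
        return get(EXECUTORS_PATH);
    }

    // GET /management/coherence/cluster/executors/{name}
    HttpRequest viewByName(String name) {
        return get(EXECUTORS_PATH + "/" + name);
    }

    // POST /management/coherence/cluster/executors/{name}/resetStatistics
    HttpRequest resetStatistics(String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + EXECUTORS_PATH + "/" + name + "/resetStatistics"))
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    private HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        ExecutorRestRequests requests = new ExecutorRestRequests("http://localhost:8080"); // placeholder address
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(requests.viewAll(), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```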

Using CDI

You can inject RemoteExecutors through CDI.

For example:
@Inject
private RemoteExecutor single;    // injects a RemoteExecutor named 'single'.

@Inject
@Name("Fixed5")
private RemoteExecutor fixedPoolRemoteExecutor;    // injects a RemoteExecutor named 'Fixed5'. 

Using Atomics

Coherence Concurrent provides distributed implementations of atomic types, such as AtomicInteger, AtomicLong, and AtomicReference. It also provides local implementations of the same types. The local implementations are thin wrappers around the existing java.util.concurrent.atomic types that implement the same interfaces as their distributed variants, so the two are interchangeable.
To create instances of atomic types, you should call the appropriate factory method on the Atomics class:
// Creates a local, in-process 'AtomicInteger' named 'foo', with an implicit initial value of 0.
AtomicInteger localFoo  = Atomics.localAtomicInteger("foo");

// Creates a remote, distributed 'AtomicInteger' named 'foo', distinct from the local instance 'foo',
// with an implicit initial value of 0.
AtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");

// Creates a remote, distributed 'AtomicLong' named 'bar', with an initial value of 5.
AtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

Note:

The AtomicInteger and AtomicLong types used in the code above are not types from the java.util.concurrent.atomic package. They are actually interfaces defined within the com.oracle.coherence.concurrent.atomic package that both LocalAtomicXyz and RemoteAtomicXyz classes implement, which are the instances that are actually returned by the above methods.
Therefore, you can rewrite the above code as:
LocalAtomicInteger  localFoo  = Atomics.localAtomicInteger("foo");
RemoteAtomicInteger remoteFoo = Atomics.remoteAtomicInteger("foo");
RemoteAtomicLong    remoteBar = Atomics.remoteAtomicLong("bar", 5L);

However, Oracle strongly recommends that you use interfaces instead of concrete types because interfaces make it easy to switch between local and distributed implementations when necessary.

After the instances are created, you can use them the same way you would use any of the corresponding java.util.concurrent.atomic types:
int  counter1 = remoteFoo.incrementAndGet();
long counter5 = remoteBar.addAndGet(5L);
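Because the local types wrap the JDK atomics, the guarantees are the familiar ones: concurrent increments are never lost. The sketch below uses java.util.concurrent.atomic.AtomicInteger directly so that it runs without Coherence; the AtomicCounterDemo class is hypothetical. A distributed atomic keeps the same guarantee across processes, but, as noted at the beginning of this chapter, each increment then becomes a network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, JDK-only illustration of lock-free, lossless concurrent increments.
final class AtomicCounterDemo {
    private AtomicCounterDemo() {
    }

    // Starts 'threads' threads that each increment a shared counter 'perThread' times.
    static int countConcurrently(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get(); // always threads * perThread
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 10_000)); // 40000
    }
}
```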


Asynchronous Implementations of Atomic Types

The instances of numeric atomic types, such as AtomicInteger and AtomicLong, are frequently used to represent various counters in the application where a client may need to increment the value, but does not necessarily need to know what the new value is.

When working with the local atomics, you can use the same API shown earlier (see Using Atomics) and simply ignore the return value. With distributed atomics, however, that introduces unnecessary blocking on the client while it waits for a response from the server, only to discard the result. This has a negative impact on both the performance and the throughput of the atomics.

To reduce the impact of remote calls in those situations, Coherence Concurrent also provides non-blocking, asynchronous implementations of all atomic types it supports.

To obtain a non-blocking instance of any supported atomic type, simply call the async method on the blocking instance of that type:
// Creates a non-blocking 'AsyncAtomicInteger' for the remote, distributed atomic named 'foo', with an implicit initial value of 0.
AsyncAtomicInteger asyncFoo = Atomics.remoteAtomicInteger("foo").async();

// Creates a non-blocking 'AsyncAtomicLong' for the remote, distributed atomic named 'bar', with an initial value of 5.
AsyncAtomicLong    asyncBar = Atomics.remoteAtomicLong("bar", 5L).async();
After you create these instances, you can use them the same way you would use any of the corresponding blocking types. The only difference is that the non-blocking instances will simply return a CompletableFuture for the result, and will not block:
CompletableFuture<Integer> futureCounter1 = asyncFoo.incrementAndGet();
CompletableFuture<Long>    futureCounter5 = asyncBar.addAndGet(5L);
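The benefit of the non-blocking style is that a caller can issue many updates without waiting for each one, and wait once at the end, or not at all if it does not need the values. The sketch below models this with JDK types only: AsyncCounter is a hypothetical stand-in for AsyncAtomicLong, not its implementation, and a local worker thread plays the role of the remote server.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, JDK-only non-blocking counter: every call returns a CompletableFuture at once.
final class AsyncCounter implements AutoCloseable {
    private final AtomicLong value;
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    AsyncCounter(long initialValue) {
        this.value = new AtomicLong(initialValue);
    }

    // Returns immediately; the update completes later on the worker thread.
    CompletableFuture<Long> addAndGet(long delta) {
        return CompletableFuture.supplyAsync(() -> value.addAndGet(delta), worker);
    }

    @Override
    public void close() {
        worker.shutdown();
    }

    // Issues n increments without waiting on any of them, then waits once for all of them.
    static long incrementMany(int n) {
        try (AsyncCounter counter = new AsyncCounter(0L)) {
            List<CompletableFuture<Long>> pending = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                pending.add(counter.addAndGet(1L)); // does not block
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
            return counter.addAndGet(0L).join(); // read the final value
        }
    }

    public static void main(String[] args) {
        System.out.println(incrementMany(1_000)); // 1000
    }
}
```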

Both the blocking and non-blocking instances of any distributed atomic type, with the same name, are backed by the same cluster-side atomic instance state, so they can be used interchangeably.

Using CDI

Atomic types from Coherence Concurrent can also be injected using CDI, which eliminates the need for explicit factory method calls on the Atomics class.

// Injects a local, in-process instance of an 'AtomicInteger' named 'foo', with an implicit initial value of '0'.
@Inject
@Name("foo")
private AtomicInteger localFoo;   

// Injects a remote, distributed instance of an 'AtomicInteger' named 'foo', distinct from 
// the local instance 'foo', with an implicit initial value of '0'.
@Inject
@Remote
@Name("foo")
private AtomicInteger remoteFoo;  

// Injects a remote, distributed, non-blocking 'AsyncAtomicLong', with an implicit name of 'asyncBar'.
@Inject
@Remote
private AsyncAtomicLong asyncBar;

After you obtain an instance of an atomic type through a CDI injection, you can use it the same way you would use an instance obtained directly from the Atomics factory class.

Using Locks

Coherence Concurrent provides distributed implementations of Lock and ReadWriteLock interfaces from the java.util.concurrent.locks package, enabling you to implement lock-based concurrency control across cluster members when necessary.

Unlike the local JDK implementations, the classes in this package use the cluster member (process) ID and the thread ID to identify the lock owner, and they store shared lock state within a Coherence NamedMap. This also means that the calls to acquire and release locks are remote network calls, because they need to update shared state that is likely stored on a different cluster member, which can affect the performance of lock and unlock operations.


Using Exclusive Locks

The RemoteLock class provides an implementation of the Lock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the lock at any given time.

To obtain an instance of a RemoteLock, call the Locks.remoteLock factory method:
Lock foo = Locks.remoteLock("foo");
As seen with Atomics, you can obtain a local Lock instance from the Locks class, which will simply return an instance of a standard java.util.concurrent.locks.ReentrantLock, by calling the localLock factory method:
Lock foo = Locks.localLock("foo");
After you create a Lock instance, you can use it as you normally would:
foo.lock();
try {
    // critical section guarded by the exclusive lock `foo`
}
finally {
    foo.unlock();
}
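The lock, try, finally idiom above guarantees that the lock is released even if the critical section throws. Because RemoteLock implements the Lock interface, it also exposes tryLock(long, TimeUnit), which bounds how long a caller waits; that may be worth considering when acquisition involves network calls. The following sketch exercises both on a local ReentrantLock (the type returned by Locks.localLock); the GuardedCounter class is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical counter guarded by any Lock implementation.
final class GuardedCounter {
    private final Lock lock;
    private long count; // guarded by 'lock'

    GuardedCounter(Lock lock) {
        this.lock = lock;
    }

    void increment() {
        lock.lock();
        try {
            count++; // critical section
        } finally {
            lock.unlock(); // always release, even if the critical section throws
        }
    }

    // Gives up instead of blocking indefinitely if the lock is not acquired in time.
    boolean tryIncrement(long timeout, TimeUnit unit) throws InterruptedException {
        if (!lock.tryLock(timeout, unit)) {
            return false;
        }
        try {
            count++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    long get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedCounter counter = new GuardedCounter(new ReentrantLock());
        Thread a = new Thread(() -> { for (int i = 0; i < 10_000; i++) counter.increment(); });
        Thread b = new Thread(() -> { for (int i = 0; i < 10_000; i++) counter.increment(); });
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter.get()); // 20000
    }
}
```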

Using Read/Write Locks

The RemoteReadWriteLock class provides an implementation of the ReadWriteLock interface and enables you to ensure that only one thread on one member is running a critical section guarded by the write lock at any given time, while allowing multiple concurrent readers.

To obtain an instance of a RemoteReadWriteLock, call the Locks.remoteReadWriteLock factory method:
ReadWriteLock bar = Locks.remoteReadWriteLock("bar");
As seen with Atomics, you can obtain a local ReadWriteLock instance from the Locks class, which will simply return an instance of a standard java.util.concurrent.locks.ReentrantReadWriteLock, by calling the localReadWriteLock factory method:
ReadWriteLock bar = Locks.localReadWriteLock("bar");
After you create a ReadWriteLock instance, you can use it as you normally would:
bar.writeLock().lock();
try {
    // critical section guarded by the exclusive write lock `bar`
}
finally {
    bar.writeLock().unlock();
}

Or:

bar.readLock().lock();
try {
    // critical section guarded by the shared read lock `bar`
}
finally {
    bar.readLock().unlock();
}
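The following JDK-only sketch shows the read/write idiom on a ReentrantReadWriteLock, the type returned by Locks.localReadWriteLock: reads take the shared read lock, and writes take the exclusive write lock. The ReadMostlyRegistry class is hypothetical, and its HashMap is local state used only for illustration; with a distributed lock, the guarded state would itself need to be shared across members.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical read-mostly registry guarded by a ReadWriteLock.
final class ReadMostlyRegistry {
    private final ReadWriteLock rw;
    private final Map<String, String> entries = new HashMap<>(); // guarded by rw

    ReadMostlyRegistry(ReadWriteLock rw) {
        this.rw = rw;
    }

    String get(String key) {
        rw.readLock().lock(); // shared: many readers may hold it at once
        try {
            return entries.get(key);
        } finally {
            rw.readLock().unlock();
        }
    }

    void put(String key, String value) {
        rw.writeLock().lock(); // exclusive: excludes readers and other writers
        try {
            entries.put(key, value);
        } finally {
            rw.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        ReadMostlyRegistry registry = new ReadMostlyRegistry(new ReentrantReadWriteLock());
        registry.put("region", "emea");
        System.out.println(registry.get("region")); // emea
    }
}
```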

Using CDI

You can also use CDI to inject both the exclusive and read/write lock instances into objects that need them:

// Injects distributed exclusive lock named 'foo' into the 'lock' field.
@Inject
@Remote
@Name("foo")
private Lock lock;           

// Injects distributed read/write lock named 'bar' into the 'bar' field.
@Inject
@Remote
@Name("bar")
private ReadWriteLock bar; 

After you obtain an instance of lock through a CDI injection, you can use it the same way you would use an instance obtained directly from the Locks factory class.

Using Latches and Semaphores

Coherence Concurrent also provides distributed implementations of the CountDownLatch and Semaphore classes from the java.util.concurrent package, enabling you to synchronize execution across multiple Coherence cluster members as easily as you can within a single process by using the two JDK classes. It also provides interfaces for these two concurrency primitives, which both the remote and the local implementations conform to.

As seen with Atomics, the local implementations are nothing more than thin wrappers around the corresponding JDK classes.


Using the Count Down Latch

The RemoteCountDownLatch class provides a distributed implementation of a CountDownLatch and enables you to ensure that code on any cluster member that is waiting for the latch proceeds only when the latch count reaches zero. Any cluster member can both wait for a latch and count it down.

To obtain an instance of RemoteCountDownLatch, call the Latches.remoteCountDownLatch factory method:
// Creates an instance of a 'RemoteCountDownLatch' with the initial count of '5'.
CountDownLatch foo = Latches.remoteCountDownLatch("foo", 5);
As seen with Atomics and Locks, you can obtain a local CountDownLatch instance from the Latches class by calling the localCountDownLatch factory method:
// Creates an instance of a 'LocalCountDownLatch' with the initial count of '10'.
CountDownLatch foo = Latches.localCountDownLatch("foo", 10);

After you have a CountDownLatch instance, you can use it as you normally would, by calling the countDown and await methods on it.
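The countDown and await pattern is the same as with the JDK class that the local latch wraps. The sketch below uses java.util.concurrent.CountDownLatch directly: several worker threads each count down once, and the caller waits, with a timeout, until the count reaches zero. The LatchDemo class is hypothetical; with a remote latch, the workers could be on different cluster members.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, JDK-only illustration of waiting for a group of workers.
final class LatchDemo {
    private LatchDemo() {
    }

    // Returns true if all workers counted down before the timeout expired.
    static boolean awaitWorkers(int workers, long timeoutMillis) {
        CountDownLatch ready = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            new Thread(ready::countDown).start(); // each worker signals exactly once
        }
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS); // false on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitWorkers(5, 1_000));
    }
}
```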

Using a Semaphore

The RemoteSemaphore class provides a distributed implementation of a Semaphore, and enables any cluster member to acquire and release permits from the same semaphore instance.

To obtain an instance of RemoteSemaphore, call the Semaphores.remoteSemaphore factory method:
// Creates an instance of a 'RemoteSemaphore' with '5' permits.
Semaphore foo = Semaphores.remoteSemaphore("foo", 5);
As seen with Atomics and Locks, you can obtain a local Semaphore instance from the Semaphores class by calling the localSemaphore factory method:
// Creates an instance of a 'LocalSemaphore' with '0' permits.  
Semaphore foo = Semaphores.localSemaphore("foo");

After you create a Semaphore instance, you can use it as you normally would, by calling the release and acquire methods on it.
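The acquire and release calls follow the JDK Semaphore semantics that the local implementation wraps. The sketch below uses java.util.concurrent.Semaphore directly to show two things: a semaphore created with zero permits admits nobody until a permit is released, and a semaphore with N permits caps how many tasks run the guarded section at once. The SemaphoreDemo class is hypothetical; a remote semaphore would apply the same cap across cluster members.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, JDK-only illustration of bounding concurrency with permits.
final class SemaphoreDemo {
    private SemaphoreDemo() {
    }

    // Runs 'tasks' tasks that each hold a permit briefly; returns the peak number
    // of tasks observed inside the guarded section at the same time.
    static int maxConcurrent(int permits, int tasks) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire(); // blocks while no permit is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                    Thread.sleep(10); // simulate work while holding the permit
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release(); // return the permit
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        Semaphore gate = new Semaphore(0);
        System.out.println(gate.tryAcquire()); // false: no permits yet
        gate.release();
        System.out.println(gate.tryAcquire()); // true: the released permit is taken
        System.out.println(maxConcurrent(2, 8)); // never more than 2
    }
}
```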

Using CDI

You can also use CDI to inject both the CountDownLatch and Semaphore instances into objects that need them:

// Injects an instance of 'LocalCountDownLatch' with the initial count of '5'.
@Inject
@Name("foo")
@Count(5)
private CountDownLatch localLatchFoo;

// Injects an instance of 'RemoteCountDownLatch' with the initial count of '10'.
@Inject
@Name("foo")
@Remote
@Count(10)
private CountDownLatch remoteLatchFoo;          

// Injects an instance of 'LocalSemaphore' with '0' (zero) permits available.
@Inject
@Name("bar")
private Semaphore localSemaphoreBar;

// Inject an instance of 'RemoteSemaphore' with '1' permit available.
@Inject
@Name("bar")
@Remote
@Permits(1)
private Semaphore remoteSemaphoreBar;

After you obtain a latch or a semaphore instance through a CDI injection, you can use it the same way you would use an instance obtained directly from the Latches or Semaphores factory classes.

The @Name annotation is optional in both cases as long as the member name (in the examples above, the field name) can be obtained from the injection point, but is required otherwise (such as when you use a constructor injection).

The @Count annotation specifies the initial latch count, and if omitted, will default to one. The @Permits annotation specifies the number of available permits for a semaphore, and if omitted, will default to zero, which means that the first acquire call will block until another thread releases one or more permits.
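With the default of zero permits, a semaphore acts as a signal: acquiring blocks until some other thread releases a permit. The following sketch illustrates that behavior with the JDK java.util.concurrent.Semaphore created with zero permits to mirror the @Permits default. It uses tryAcquire() with a timeout so that the demonstration cannot hang; the class name is hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ZeroPermitSketch {
    // Returns {acquiredBeforeRelease, acquiredAfterRelease}.
    static boolean[] demo() throws InterruptedException {
        Semaphore signal = new Semaphore(0); // zero permits, like the @Permits default

        // No permit is available yet, so this times out and returns false.
        boolean before = signal.tryAcquire(100, TimeUnit.MILLISECONDS);

        // Another thread releases a permit, which lets the waiting acquirer proceed.
        Thread releaser = new Thread(signal::release);
        releaser.start();
        boolean after = signal.tryAcquire(5, TimeUnit.SECONDS);
        releaser.join();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        // prints "before release: false, after release: true"
        System.out.println("before release: " + r[0] + ", after release: " + r[1]);
    }
}
```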

Using Blocking Queues

Starting with Coherence CE 24.03, Coherence supports queues as a data structure. The Coherence NamedQueue is an implementation of java.util.Queue, and NamedDeque is an implementation of java.util.Deque.

Coherence has two implementations of BlockingQueue: a simple size-limited queue, and a distributed paged queue that has a much larger capacity. The simple queue is available as both a BlockingQueue and a double-ended BlockingDeque. The distributed paged queue is available only as a BlockingQueue implementation.

Note:

Coherence queues are mapped to caches, which take the same name as the queue. If a cache is being used for a queue, then the same cache must not be used as a normal data cache.

Blocking Queue

The Coherence Concurrent module contains an implementation of java.util.concurrent.BlockingQueue called NamedBlockingQueue and an implementation of java.util.concurrent.BlockingDeque called NamedBlockingDeque.

To use a Coherence blocking queue in your application, you must add a dependency on the coherence-concurrent module as follows:


    <dependency>
        <groupId>com.oracle.coherence</groupId>
        <artifactId>coherence-concurrent</artifactId>
        <version>14.1.2-0-0</version>
    </dependency>

To obtain an instance of a blocking queue, use the com.oracle.coherence.concurrent.Queues factory class.

To obtain a simple size-limited BlockingQueue named "my-queue", see the following example:

NamedBlockingQueue<String> queue = Queues.queue("my-queue");

To obtain a simple size-limited BlockingDeque named "my-deque", see the following example:

NamedBlockingDeque<String> deque = Queues.deque("my-deque");

To obtain a distributed paged BlockingQueue named "my-queue", see the following example:

NamedBlockingQueue<String> queue = Queues.pagedQueue("my-queue");
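Because NamedBlockingQueue implements java.util.concurrent.BlockingQueue, application code can be written against the JDK interface and given either a Coherence queue or a local one. The following minimal producer/consumer sketch is written that way; the class and method names are hypothetical, and the demo passes an ArrayBlockingQueue so that it runs without a cluster. In a cluster member, you could pass the queue returned by Queues.queue("my-queue") or Queues.pagedQueue("my-queue") instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueSketch {
    // Producer: put() blocks while the queue is full.
    static void produce(BlockingQueue<String> queue, int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            queue.put("item-" + i);
        }
    }

    // Consumer: take() blocks while the queue is empty.
    static List<String> consume(BlockingQueue<String> queue, int count) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            received.add(queue.take());
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        // Local stand-in; a NamedBlockingQueue<String> could be passed instead.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                produce(queue, 5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(consume(queue, 5)); // prints [item-0, item-1, item-2, item-3, item-4]
        producer.join();
    }
}
```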

The blocking queue implementations rely on Coherence events. When application code calls a blocking method, only the calling thread is blocked; nothing blocks on the server. The calling thread is unblocked when it receives an event from the server.

For example, if the application code calls the NamedBlockingQueue take() method and the queue is empty, this method blocks the calling thread. When another thread (possibly in another JVM) puts an element into the queue, the calling application receives an event and retries the take(). If the retry succeeds, take() returns the element. If the retry is unsuccessful, the calling thread remains blocked. This can happen, for example, when another thread or JVM was also blocked taking from the same queue and its retry succeeded first.

Another example is an application calling the NamedBlockingQueue put() method when the queue is full (the 2 GB size limit has been reached). In this case, the calling thread is blocked until a delete event is received, signaling that there is now space in the queue. The put() is then retried and, if successful, control is returned to the calling thread. If the retry is unsuccessful, the thread remains blocked. This can happen, for example, when another thread or JVM was also blocked on a put(), and its retry succeeded and filled the queue again.
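To make the retry behavior described above concrete, the following is a hypothetical single-JVM model, not Coherence's implementation. The "server" is a local ArrayDeque whose operations never block, and server events are simulated by method calls. Callers block only on client-side conditions and retry after each insert or delete event. The event counters are a design choice of this sketch: they ensure that an event arriving between a failed attempt and the wait is not missed. The class name EventDrivenQueueSketch is illustrative.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-JVM model of event-driven, client-side blocking.
// NOT Coherence's implementation: the "server" is a local ArrayDeque whose
// operations never block, and server events are simulated by method calls.
public class EventDrivenQueueSketch<E> {
    private final ArrayDeque<E> server = new ArrayDeque<>();
    private final int capacity;

    // Client-side state, used only to park waiting threads. The counters are
    // written under the lock and are volatile so they can be read without it.
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition inserted = lock.newCondition();
    private final Condition deleted = lock.newCondition();
    private volatile long insertEvents;
    private volatile long deleteEvents;

    public EventDrivenQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    // Simulated server call: rejects the element instead of blocking when full.
    private boolean serverOffer(E e) {
        synchronized (server) {
            if (server.size() >= capacity) {
                return false;
            }
            server.addLast(e);
        }
        deliverEvent(true);
        return true;
    }

    // Simulated server call: returns null instead of blocking when empty.
    private E serverPoll() {
        E e;
        synchronized (server) { e = server.pollFirst(); }
        if (e != null) {
            deliverEvent(false);
        }
        return e;
    }

    // Simulated delivery of an insert or delete event to the client.
    private void deliverEvent(boolean insert) {
        lock.lock();
        try {
            if (insert) { insertEvents++; inserted.signalAll(); }
            else { deleteEvents++; deleted.signalAll(); }
        } finally {
            lock.unlock();
        }
    }

    private long eventCount(boolean insert) {
        return insert ? insertEvents : deleteEvents;
    }

    // Parks the caller until an event newer than 'seen' has been delivered.
    private void awaitEvent(boolean insert, long seen) throws InterruptedException {
        lock.lock();
        try {
            while (eventCount(insert) == seen) {
                (insert ? inserted : deleted).await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks only the calling thread; retries the poll after each insert event.
    public E take() throws InterruptedException {
        while (true) {
            long seen = eventCount(true);
            E e = serverPoll();
            if (e != null) {
                return e;
            }
            awaitEvent(true, seen); // another taker may win the retry; then wait again
        }
    }

    // Blocks only the calling thread; retries the offer after each delete event.
    public void put(E e) throws InterruptedException {
        while (true) {
            long seen = eventCount(false);
            if (serverOffer(e)) {
                return;
            }
            awaitEvent(false, seen);
        }
    }
}
```

The loops in take() and put() correspond to the two scenarios above: a failed retry simply parks the caller again, which is what happens when another thread or JVM wins the race for the element or for the free space.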

Sizing Queues

It is important to understand how the two Coherence queue implementations store data and how this limits the size of a queue.

  • Simple Coherence Queue - The simple queue (and deque) implementation stores data in a single Coherence cache partition. This enforces a size limit of 2 GB, because a Coherence cache partition should not exceed 2 GB in size; in reality, a partition should be much smaller than this. Large partitions slow down recovery when a storage-enabled member leaves the cluster. With a modern fast network, 300 MB to 500 MB should be a suitable maximum partition size; on a 10 Gbps network, this could even go as high as 1 GB.

  • Distributed Paged Queue - The distributed paged queue stores data in pages that are distributed around the Coherence cluster over multiple partitions, the same as normal cache data. This means that the paged queue can store far more than 2 GB. It is still important to be aware of how partition sizes limit the total queue size.

With the default partition count of 257, the absolute hard limit of 2 GB per partition gives the following maximum queue size:

2 GB x 257 = 514 GB

However, partitions this large are not reliable in production use. With a more realistic limit of 500 MB per partition and the default partition count of 257, the maximum queue size becomes:

500 MB x 257 = 128,500 MB (approximately 128 GB)

So, by default a realistic limit for a paged queue is around 128 GB. If the partition count is increased to 1087, then the queue size becomes:

500 MB x 1087 = 543,500 MB (approximately 543 GB)

Of course, all these examples assume that there are enough JVMs with big enough heap sizes in the cluster to store the queue data in memory.
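The arithmetic above can be captured in a small capacity-planning helper. This is a hypothetical sketch: it multiplies a per-partition size budget by the partition count, and its inverse computes the smallest partition count that covers a target queue size. Like the examples above, it ignores whether the cluster has enough heap to hold the data, as well as any other factors that affect the choice of partition count. Sizes are in megabytes, treating 1 GB as 1000 MB to match the rounded figures above.

```java
public class QueueSizing {
    // Upper bound on paged-queue capacity in MB: per-partition budget x partition count.
    static long maxQueueSizeMb(long partitionLimitMb, int partitionCount) {
        if (partitionLimitMb <= 0 || partitionCount <= 0) {
            throw new IllegalArgumentException("values must be positive");
        }
        return partitionLimitMb * partitionCount;
    }

    // Smallest partition count whose combined budget covers the target queue size.
    static int partitionsNeeded(long targetQueueMb, long partitionLimitMb) {
        if (targetQueueMb <= 0 || partitionLimitMb <= 0) {
            throw new IllegalArgumentException("values must be positive");
        }
        // Ceiling division, so a partially filled last partition is still counted.
        return Math.toIntExact((targetQueueMb + partitionLimitMb - 1) / partitionLimitMb);
    }

    public static void main(String[] args) {
        System.out.println(maxQueueSizeMb(2_000, 257));      // 514000 (the 2 GB hard limit)
        System.out.println(maxQueueSizeMb(500, 257));        // 128500 (about 128 GB)
        System.out.println(maxQueueSizeMb(500, 1087));       // 543500 (about 543 GB)
        System.out.println(partitionsNeeded(200_000, 500));  // 400
    }
}
```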

Limitations

The current queue implementation in Coherence has the following limitations:

  • As previously mentioned, the simple queue has a hard size limit of 2 GB. When using a simple queue or deque, the Coherence server refuses to accept offers to the queue if its size exceeds 2 GB. The java.util.Queue contract allows queues to reject offers, so this size limitation conforms to the queue contract. Application developers should check the response from offering data to the queue to determine whether the offer has succeeded. We use the term "offer" here to cover all queue and deque methods that add data to the queue. An alternative to checking the boolean returned from an offer() call is to use a NamedBlockingQueue, whose put() method blocks if the queue is full.

  • In normal operation, queues should not get huge, because a very large queue usually means that the processes reading from the queue are not keeping up with the processes writing to it. Application developers should load test their applications that use queues to ensure that they are not going to have issues with capacity.

  • Queue operations such as offering and polling contend on certain data structures, which limits the number of parallel requests and the speed at which requests are processed. To maintain ordering, polling contends on either the head or the tail entry, depending on which end of the queue is being polled. This means that poll methods can only be processed sequentially, so even though a poll is efficient and fast, many concurrent poll requests will queue and be processed one at a time. Offer methods do not contend on the head or tail, but they do contend on the atomic counters used to maintain the head and tail identifiers. Coherence can process multiple offer requests on different worker threads, but there is minor contention on the AtomicLong updates.

  • Queue operations that work on the head and tail, such as offering and polling, are efficient. Some of the other methods in java.util.Queue and java.util.Deque are less efficient, for example, the iterator methods and contains(). These methods are not frequently used by applications that require only basic queue functionality. Some optional methods on the java.util.Queue API that mutate the queue throw UnsupportedOperationException (this is allowed by the Java Queue contract), for example, retainAll(), removeAll(), and removal using an iterator.
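Because a simple queue can reject offers once it reaches its size limit, callers should not ignore the return value of offer(). The following sketch shows the approaches mentioned in the first limitation above, written against the JDK interfaces: a non-blocking offer() whose result is checked, and a timed offer(e, timeout, unit) that waits for space for a bounded time instead of blocking indefinitely as put() would. The timed variant is part of the java.util.concurrent.BlockingQueue interface, so it is assumed to be available on NamedBlockingQueue as well. A one-element ArrayBlockingQueue stands in for a full Coherence queue, and the class name is hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferSketch {
    // Non-blocking offer: the caller decides what to do when the queue rejects it.
    static boolean tryEnqueue(Queue<String> queue, String item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            // For example: log, retry later, or apply back-pressure upstream.
            System.out.println("rejected: " + item);
        }
        return accepted;
    }

    // Timed offer: waits up to 'millis' for space instead of blocking forever like put().
    static boolean enqueueWithTimeout(BlockingQueue<String> queue, String item, long millis)
            throws InterruptedException {
        return queue.offer(item, millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // stand-in for a queue at its limit
        System.out.println(tryEnqueue(queue, "a"));              // true
        System.out.println(tryEnqueue(queue, "b"));              // prints "rejected: b", then false
        System.out.println(enqueueWithTimeout(queue, "c", 100)); // false after about 100 ms
    }
}
```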