30 Processing Data In a Cache
This chapter includes the following sections:
- Overview of Processing Data In a Cache
  Coherence provides the ideal infrastructure for building data grid services and the client and server-based applications that use a data grid.
- Using Agents for Targeted, Parallel and Query-Based Processing
  You can process data in a cache using agents that are commonly referred to as entry processors.
- Performing Data Grid Aggregation
  The InvocableMap interface supports entry aggregators that perform operations against a subset of entries to obtain a single result.
- Performing Data Grid Aggregation Using Streams
  The use of streams provides a simplified programming model especially when combined with Java lambda expressions.
- Performing Node-Based Processing
  Coherence provides an invocation service which allows execution of single-pass agents (called invocable objects) anywhere within the grid.
Parent topic: Performing Data Grid Operations
Overview of Processing Data In a Cache
This section includes the following topics:
- Performing Targeted Processing
- Performing Parallel Processing
- Performing Query-Based Processing
- Performing Data-Grid-Wide Processing
Parent topic: Processing Data In a Cache
Performing Targeted Processing
Coherence provides for the ability to execute an agent against an entry in any map of data managed by a data grid:
map.invoke(key, agent);
In the case of partitioned data, the agent executes on the grid node that owns the data. The queuing, concurrency management, agent execution, data access by the agent, and data modification by the agent all occur on that grid node. (Only the synchronous backup of the resultant data modification, if any, requires additional network traffic.) For many processing purposes, it is much more efficient to move the serialized form of the agent (at most a few hundred bytes) than to handle distributed concurrency control, coherency and data updates.
For request and response processing, the agent returns a result:
Object oResult = map.invoke(key, agent);
Coherence, as a data grid, determines the location to execute the agent based on the configuration for the data topology. It moves the agent to the determined location, executes the agent (automatically handling concurrency control for the item while executing the agent), backs up the modifications (if any), and returns a result.
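For illustration, here is a minimal sketch of a targeted invocation using the predefined NumberIncrementor processor (described in Overview of Entry Processor Agents); the "trades" cache name, the tradeId variable, and the Quantity property are assumptions:
NamedCache map = CacheFactory.getCache("trades");

// increment the Quantity property of a single entry in place;
// the agent executes on the member that owns the key
Object oResult = map.invoke(tradeId, new NumberIncrementor("Quantity", 1, false));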
Parent topic: Overview of Processing Data In a Cache
Performing Parallel Processing
Coherence provides map-reduce functionality which allows agents to be executed in parallel against a collection of entries across all nodes in the grid. Parallel execution allows large amounts of data to be processed by balancing the work across the grid. The invokeAll method is used as follows:
map.invokeAll(collectionKeys, agent);
For request and response processing, the agent returns one result for each key processed:
Map mapResults = map.invokeAll(collectionKeys, agent);
Coherence determines the optimal location(s) to execute the agent based on the configuration for the data topology. It then moves the agent to the determined locations, executes the agent (automatically handling concurrency control for the item(s) while executing the agent), backs up the modifications (if any), and returns the coalesced results. See Performing Data Grid Aggregation.
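As an illustrative sketch, the predefined ConditionalPut processor can insert a default value, in parallel, for any of the given keys that are not already present; the collectionKeys and defaultValue variables are assumptions:
// PresentFilter evaluates to true when an entry exists, so its negation
// makes ConditionalPut act only on keys that are absent
Filter filterAbsent = new NotFilter(PresentFilter.INSTANCE);
map.invokeAll(collectionKeys, new ConditionalPut(filterAbsent, defaultValue));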
Parent topic: Overview of Processing Data In a Cache
Performing Query-Based Processing
Coherence supports the ability to query across the entire data grid. See Querying Data in a Cache. For example, in a trading system it is possible to query for all open Order objects for a particular trader:
NamedCache mapTrades = CacheFactory.getCache("trades");
Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                              new EqualsFilter("getStatus", Status.OPEN));
Set setOpenTradeIds = mapTrades.keySet(filter);
By combining this feature with the use of parallel executions in the data grid, Coherence provides the ability to execute an agent against a query. As in the previous section, the execution occurs in parallel, and instead of returning the identities or entries that match the query, Coherence executes the agent against the entries:
map.invokeAll(filter, agent);
For request and response processing, the agent returns one result for each key processed:
Map mapResults = map.invokeAll(filter, agent);
Coherence combines parallel query and parallel execution to achieve query-based agent invocation against a data grid.
Parent topic: Overview of Processing Data In a Cache
Performing Data-Grid-Wide Processing
Passing an instance of AlwaysFilter (or null) to the invokeAll method causes the passed agent to be executed against all entries in the InvocableMap:
map.invokeAll((Filter) null, agent);
As with the other types of agent invocation, request and response processing is supported:
Map mapResults = map.invokeAll((Filter) null, agent);
An application can process all the data spread across a particular map in the data grid with a single line of code.
Parent topic: Overview of Processing Data In a Cache
Using Agents for Targeted, Parallel and Query-Based Processing
This section includes the following topics:
- Overview of Entry Processor Agents
- Processing Entries Using Lambda Expressions
- Processing Entries in Multiple Caches
- Ignoring the Results of an Entry Processor
- Performing Synthetic Operations
- Processing Entries Asynchronously
Parent topic: Processing Data In a Cache
Overview of Entry Processor Agents
Agents implement the EntryProcessor interface, typically by extending the AbstractProcessor class. Coherence includes the following predefined EntryProcessor implementations in the com.tangosol.util.processor package:
- AbstractProcessor - an abstract base class for building an EntryProcessor
- AsynchronousProcessor - a wrapper class that allows for an asynchronous invocation of an underlying entry processor. See Processing Entries Asynchronously.
- CompositeProcessor - bundles a collection of EntryProcessor objects that are invoked sequentially against the same entry
- ConditionalProcessor - conditionally invokes an EntryProcessor if a Filter against the entry-to-process evaluates to true
- ConditionalPut - performs an Entry.setValue operation if the specified condition is satisfied
- ConditionalPutAll - performs an Entry.setValue operation for multiple entries that satisfy the specified condition
- ConditionalRemove - performs an Entry.remove operation if the specified condition is satisfied
- ExtractorProcessor - extracts and returns a value (such as a property value) from an object stored in an InvocableMap
- NumberIncrementor - pre- or post-increments any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, and BigDecimal
- NumberMultiplier - multiplies any property of a primitive integral type, as well as Byte, Short, Integer, Long, Float, Double, BigInteger, and BigDecimal, and returns either the previous or new value
- PreloadRequest - performs an Entry.getValue call. No results are reported back to the caller. The processor provides a means to load an entry or a collection of entries into the cache using a cache loader without incurring the cost of sending the value(s) over the network. If the corresponding entry (or entries) already exists in the cache, or if the cache does not have a loader, then invoking this processor has no effect.
- PriorityProcessor - explicitly controls the scheduling priority and timeouts for execution of EntryProcessor methods
- PropertyProcessor - an abstract base class for EntryProcessor implementations that depend on a PropertyManipulator. The NumberIncrementor and NumberMultiplier entry processors extend this processor.
- UpdaterProcessor - updates an attribute of an object cached in an InvocableMap
- VersionedPut - performs an Entry.setValue operation if the version of the specified value matches the version of the current value. For a match, the processor increments the version indicator before the value is updated. Entry values must implement the Versionable interface.
- VersionedPutAll - performs an Entry.setValue operation only for entries whose versions match the versions of the corresponding current values. For a match, the processor increments the version indicator before each value is updated. Entry values must implement the Versionable interface.
The EntryProcessor interface (contained within the InvocableMap interface) contains only two methods: process and processAll. Historically, the AbstractProcessor class provided the default implementation of the processAll method. However, as of Coherence 14c (14.1.2.0.0), processAll delegates to the default implementation provided by the EntryProcessor interface. Therefore, custom entry processor implementations should implement only the EntryProcessor interface (there is no need to extend AbstractProcessor).
Note:
If the processAll call throws an exception, changes are only made to the underlying Map for entries that were removed from the setEntries. Changes that are made to the remaining entries are not processed.
The InvocableMap.Entry that is passed to an EntryProcessor is an extension of the Map.Entry interface that allows an EntryProcessor implementation to obtain the necessary information about the entry and to make the necessary modifications in the most efficient manner possible.
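As an illustrative sketch, the following hypothetical processor implements the EntryProcessor interface directly; the String key and Integer value types are assumptions:
import com.tangosol.util.InvocableMap;

/**
 * Hypothetical processor that increments an Integer value in place and
 * returns the new value. processAll is inherited from the default
 * implementation on the EntryProcessor interface.
 */
public class IncrementProcessor
        implements InvocableMap.EntryProcessor<String, Integer, Integer>
    {
    public Integer process(InvocableMap.Entry<String, Integer> entry)
        {
        Integer nValue = entry.getValue();
        int nNew = (nValue == null ? 0 : nValue) + 1;
        entry.setValue(nNew);
        return nNew;
        }
    }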
Processing Entries Using Lambda Expressions
Lambda expressions can be used as entry processors and can result in more concise client code that does not require the processor to be serialized or registered in a POF configuration file. The following example creates an entry processor as a lambda expression and uses the entry processor within the invokeAll method:
InvocableMap.EntryProcessor<ContactId, Contact, Void> processor = (entry) ->
{
Contact contact = entry.getValue();
contact.setFirstName(contact.getFirstName().toUpperCase());
entry.setValue(contact);
return null;
};
cache.invokeAll(processor);
The following example creates an entry processor as a lambda expression directly within the invokeAll method:
Address addrWork = new Address("201 Newbury St.", "Yoyodyne, Ltd.",
"Boston", "MA", "02116", "US");
ValueExtractor extractor =
Lambda.extractor(Contact::getHomeAddress).andThen(Address::getState);
Filter filter = equal(extractor, "MA");
addrWork.setStreet1("200 Newbury St.");
cache.invokeAll(filter, entry ->
{
Contact contact = entry.getValue();
contact.setWorkAddress(addrWork);
entry.setValue(contact);
return null;
});
Alternatively, you can invoke lambda expressions by using the functional-style methods, such as compute(), computeIfAbsent(), and so on, that are provided through java.util.Map.
Using these functional-style methods leads to slightly more concise code. For example, the compute() method implicitly updates the value in the cache to whatever you return, whereas with invoke() you must call entry.setValue() explicitly. A possible drawback of using compute() is that it returns the entire object, whereas with invoke() you can specify what the return value should be.
Comparing the two options: functional-style methods such as compute() are more focused and part of the standard Java API, while invoke() is the most generic, yet slightly more verbose, option.
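The following sketch contrasts the two styles for a hypothetical NamedCache<String, Integer> of counters; the cache contents and the "counter" key are assumptions:
// compute() implicitly stores whatever the function returns
cache.compute("counter", (k, v) -> v == null ? 1 : v + 1);

// invoke() requires an explicit entry.setValue(), but lets you
// choose exactly what is returned to the caller
Integer nNew = cache.invoke("counter", entry ->
    {
    Integer v = entry.getValue();
    int n = (v == null ? 0 : v) + 1;
    entry.setValue(n);
    return n;
    });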
Note:
Lambda expressions cannot be nested. For example, the following is not supported:
cache.invoke(filter, entry ->
    {
    Runnable r = () -> System.out.println("");
    r.run();
    });
This section includes the following topics:
- About Lambdas in a Distributed Environment
- Configuring Lambda Serialization Mode
- Considerations for a Rolling Upgrade
About Lambdas in a Distributed Environment
Executing lambda expressions in distributed environments can be problematic due to the static nature of lambdas. Only the metadata that describes the lambda is sent across the wire; therefore, the same compiled code is required on both the client and server classpath/modulepath. Any changes to existing lambda expressions, or additions of new ones, on the client require a redeployment and restart of both the client and the server. In addition, synthetic lambda method names are not stable across class versions; therefore, all cluster members, including extend clients, must have the exact same version of a class and must be upgraded at the same time.
To overcome these limitations, a dynamic implementation for lambdas is provided. The dynamic implementation sends both the lambda metadata and the actual byte code to be executed: the client-side byte code is parsed, and a new lambda class is generated from it. On the server, a lambda class is created based on the byte code received from the client and then executed. The dynamic implementation:
- allows modification of existing (or the introduction of new) behavior without the need for redeployment or server restart.
- eliminates the issues related to lambda naming instability.
- allows multiple different versions of a class throughout the cluster.
To ensure that the dynamic implementation works correctly, do not refer to named methods and constructors in a lambda expression, because method and constructor references are always treated as static lambdas. In addition, the dynamic implementation captures only enclosing method arguments and local variables.
Coherence has used the dynamic lambda implementation exclusively in distributed environments since Coherence release 12.2.1. To let users choose between the convenience and flexibility of dynamic lambdas and the enhanced security of static lambdas, a configuration option is provided for selecting either dynamic or static lambda serialization mode.
Parent topic: Processing Entries Using Lambda Expressions
Configuring Lambda Serialization Mode
Dynamic lambdas present a security vulnerability due to their reliance on remote code deployment across the distributed environment.
Set the system property coherence.lambdas to explicitly configure static or dynamic lambda serialization mode, as shown below:
-Dcoherence.lambdas=static
Alternatively, set the operational configuration cluster-config subelement lambdas-serialization to static or dynamic lambda serialization mode, as shown below:
<cluster-config>
  …
  <lambdas-serialization>static</lambdas-serialization>
  …
</cluster-config>
Nothing more is required if you configure dynamic lambda serialization mode; dynamic lambdas send byte code between Coherence members in the distributed environment.
An additional step is needed when using static lambda serialization mode. Static lambda serialization sends only lambda metadata references between members, so the same class files containing the lambda expressions must be on the Java classpath/modulepath of all JVMs in the distributed environment; otherwise, the static lambda metadata reference will not resolve on the receiving Coherence member.
Example: When the class defining the serialized lambda is missing from the server's Java classpath/modulepath
Here is an example stack trace of an exception handled by the client invoking the lambda:
Failed request execution for DistributedCache service on Member(Id=3,…))
java.lang.ClassNotFoundException: lambda.AbstractRemoteFunctionTests
…
Caused by: java.lang.RuntimeException: Failed to deserialize static lambda
lambda/RemoteTests$lambda$testLambdaInvoke$393ba5d0$1(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;
due to missing context class lambda.RemoteTests.
at com.tangosol.internal.util.invoke.lambda.StaticLambdaInfo.toSerializedLambda(StaticLambdaInfo.java:430)
The above failure is resolved by ensuring that the missing class lambda.RemoteTests is in all server classpaths/modulepaths so that the static lambda reference sent by the invoking client can be resolved on the server side.
Example: When static lambda names are different due to different versions of the Java class in the client and server
Client version of the Java class:
public void testSimple() {
    NamedCache cache = …;
    cache.invoke(1, entry -> entry.setValue(entry.getValue() + 1));
    cache.invoke(2, entry -> entry.setValue(entry.getValue() + 1));
}
Server version of the Java class:
public void testSimple() {
    NamedCache cache = …;
    cache.invoke(1, entry -> entry.setValue(entry.getValue() + 1));
}
(Wrapped: Failed request execution for DistributedCache service on Member(Id=1,…))
java.lang.IllegalArgumentException: Invalid lambda deserialization
…
Caused by: java.lang.RuntimeException: Exception resolving static lambda
SerializedLambda[capturingClass=class lambda.AbstractRemoteFunctionTests,
functionalInterfaceMethod=com/tangosol/util/InvocableMap$EntryProcessor.process:(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;,
implementation=invokeStatic lambda/AbstractRemoteFunctionTests.lambda$testSimple$393ba5d0$2:(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;,
instantiatedMethodType=(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;,
numCaptured=0]
Resolution is to configure the server with the same version of the Java class containing the lambda as the client is using. For this error scenario, the client had two static lambda invocations in the AbstractRemoteFunctionTests#testSimple method and the server's version of the Java class had only one invocation; thus, the invocation target lambda/AbstractRemoteFunctionTests.lambda$testSimple$393ba5d0$2 was not found on the server classpath/modulepath.
Example: When a static lambda expression has a different number of capture arguments on the client and server version of the Java class
Client version of the Java class:
public void testSimple() {
    NamedCache cache = …;
    Integer nInc = 5;
    cache.invoke(1, entry -> entry.setValue(entry.getValue() + nInc));
}
Server version of the Java class:
public void testSimple() {
    NamedCache cache = …;
    cache.invoke(1, entry -> entry.setValue(entry.getValue() + 1));
}
The client lambda expression captures one argument, nInc, and it cannot be resolved on the server side, where the lambda expression has no capture arguments:
<Info> (thread=main, member=2): (Wrapped: Failed request execution for DistributedCache service on Member(Id=1, Timestamp=2021-08-06 09:37:57.057, Address=127.0.0.1:8888, MachineId=10131, Location=machine:localhost,process:19159, Role=RemoteFunctionJavaStaticLambda_Simple))
java.lang.IllegalArgumentException: Invalid lambda deserialization
…
Caused by: java.lang.RuntimeException: Exception resolving static lambda
SerializedLambda[capturingClass=class lambda.AbstractRemoteFunctionTests,
functionalInterfaceMethod=com/tangosol/util/InvocableMap$EntryProcessor.process:(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;,
implementation=invokeStatic lambda/AbstractRemoteFunctionTests.lambda$testSimple$6572cde9$1:(Ljava/lang/Integer;Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;,
instantiatedMethodType=(Lcom/tangosol/util/InvocableMap$Entry;)Ljava/lang/Object;,
numCaptured=1]
at com.tangosol.internal.util.invoke.lambda.StaticLambdaInfo.createLambda(StaticLambdaInfo.java:398)
Resolution is to configure the server with the same version of the Java class containing the lambda as the client is using. For this error scenario, the client lambda expression in the AbstractRemoteFunctionTests#testSimple method captured one argument while the server's version of the class captured none; thus, the invocation target lambda/AbstractRemoteFunctionTests.lambda$testSimple$6572cde9$1 could not be resolved on the server side.
Example: Failed dynamic lambda invocation to a static lambda configured server
(Wrapped: Failed request execution for DistributedCache service on Member(Id=3,…))
java.io.NotSerializableException: com.tangosol.internal.util.invoke.RemoteConstructor
…
at com.tangosol.coherence.component.util.daemon.queueProcessor.service.grid.partitionedService.PartitionedCache.onInvokeRequest(PartitionedCache.CDB:93)
Resolution is to configure the client to the static lambda serialization mode and to ensure the server classpath/modulepath contains the client side Java class containing the lambda expression. If the server was incorrectly in the static lambda serialization mode, this issue can be addressed by configuring the server to the dynamic lambda serialization mode.
Parent topic: Processing Entries Using Lambda Expressions
Considerations for a Rolling Upgrade
The failures described in Configuring Lambda Serialization Mode can occur when performing a rolling upgrade in the static lambda serialization mode. For the synthetic lambda method names to be stable across the Java class files, both the application code and Coherence versions have to be the same in the Java clients and servers in the distributed environment. The Java application code containing the lambda expressions sent from one JVM to another in a Coherence distributed environment must be on all JVM classpaths/modulepaths. Oracle recommends that you perform a rolling upgrade with the dynamic lambda serialization mode enabled if you observe any of these failures while testing the rolling upgrade.
Parent topic: Processing Entries Using Lambda Expressions
Processing Entries in Multiple Caches
Entry processors can update cache entries in multiple caches within a single process or processAll operation. The caches must be managed by the same service and the entries must be located in the same partition. See Specifying Data Affinity.
The process and processAll operations are performed in a transaction-like manner that uses implicit locks when accessing, inserting, updating, modifying, or removing cache entries. If an exception is thrown during the processing of the entries, the entries are rolled back leaving all underlying values unchanged. The processAll operation is atomic with respect to all entries within a single partition (or member if no service threads are configured) rather than the individual entry or entire request.
Note:
The implicit lock may create a deadlock if entries are locked in conflicting orders on different threads. The application is responsible for ensuring that cache entries are accessed (locked) in a deadlock-free manner. In the case where a deadlock is detected, an exception is thrown but the underlying service is not stopped.
The com.tangosol.net.BackingMapContext API is used to process entries in multiple caches and provides a way to directly access entries in a cache's backing map. The backing map is the actual Map implementation where entries are stored (as opposed to the logical representation of a cache typically used by an application). Entries in a backing map are stored in binary format and therefore require an application to handle the serialized form of an entry. See Implementing Storage and Backing Maps.
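For illustration, a minimal sketch of converting between the binary form stored in a backing map and the object form, using the context converters; the binEntry variable is assumed to be a BinaryEntry available inside an entry processor:
// both the serialized and object forms of the value are accessible
Binary binValue = binEntry.getBinaryValue();
BackingMapManagerContext ctx = binEntry.getContext();
Object oValue = ctx.getValueFromInternalConverter().convert(binValue);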
The com.tangosol.util.BinaryEntry API provides easy access to a backing map context and is typically used by an application. The following sample code demonstrates how to update entries in two different caches within the process method of an entry processor using the BinaryEntry API:
public Object process(Entry entry)
    {
    BinaryEntry binEntry = (BinaryEntry) entry;
    Binary binKey = binEntry.getBinaryKey();
    Trade trade = (Trade) binEntry.getValue();

    // update a Trade object in cache1
    trade.setPrice(trade.getPrice() + factor);
    binEntry.setValue(trade);

    // update a Trade object in cache2
    BackingMapManagerContext ctx = binEntry.getContext();
    BinaryEntry binEntry2 = (BinaryEntry)
        ctx.getBackingMapContext("cache2").getBackingMapEntry(binKey);
    Trade trade2 = (Trade) binEntry2.getValue();
    trade2.setPrice(trade2.getPrice() + factor);
    binEntry2.setValue(trade2);
    return null;
    }
Note:
The getBackingMapEntry method may only be called within the context of an entry processor invocation. Any changes made to the entry are persisted with the same lifecycle as those made by the enclosing invocation. The returned entry is only valid for the duration of the enclosing invocation, and multiple calls to this method within the same invocation context return the same entry object.
Ignoring the Results of an Entry Processor
The processAll method of the AbstractProcessor class returns a map of results to a client application. The map contains the keys and values for every entry that was processed. Most often, the entry processor returns results that the client uses. However, there may be situations where some results are not usable by the client. More importantly, there may be situations where the processor must evaluate all the entries in a cache, in which case the returned map contains every key in the cache. In both situations, the agent should be designed to ignore results that are not wanted.
Designing an agent that only returns wanted results is a good pattern and best practice, because it:
- Makes the client's memory footprint independent of the size of the cache.
- Avoids transferring all affected keys to the client, which could result in an OutOfMemoryError exception on the client if the cache is too large.
- Avoids deserialization costs in the client for all keys.
- Avoids transferring the map and all keys through a proxy node (for Extend clients).
To ignore entry processor results, override the processor's processAll method to return an empty Map or null. The following example demonstrates a simple entry processor agent that always returns null after processing an entry. The example is not very realistic but does show how to override the processAll method.
public static class Agent implements InvocableMap.EntryProcessor
    {
    private static final long serialVersionUID = 1L;

    @Override
    public Object process(Entry entry)
        {
        return null;
        }

    @Override
    public Map processAll(Set set)
        {
        for (Entry entry : (Set<Entry>) set)
            {
            process(entry);
            }
        return null;
        }
    }
Performing Synthetic Operations
Entry processors can perform synthetic operations on entries. Both the remove and setValue methods for an entry can be declared synthetic by including a true parameter in the method call. For example:
entry.setValue(value, true)
The setValue method in a synthetic operation does not return the previous value. In addition, synthetic operations are not propagated to cache stores or binary entry stores; applications are able to commit changes after all processing is complete.
Applications typically use synthetic operations to prepare a cache and perform operations (state changes) that do not require listeners and cache stores to be notified, especially when doing so may be expensive in terms of network, memory, and CPU usage. Applications also use synthetic operations when pre-warming a cache; applications that load cache data may want to avoid having a cache store called, especially if the entries that are being loaded into the cache are coming from the back-end data source.
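As an illustrative sketch, an entry processor can pre-warm an entry without notifying cache stores; the valueFromBackEnd variable is a hypothetical value already loaded from a back-end source:
public Object process(InvocableMap.Entry entry)
    {
    // the second argument (true) marks the update as synthetic, so
    // cache stores and binary entry stores are not called
    entry.setValue(valueFromBackEnd, true);
    return null;
    }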
Applications can differentiate between client driven (natural) events and cache internal (synthetic) events using the isSynthetic method on the CacheEvent class. See Using Synthetic Events.
Processing Entries Asynchronously
Entry processors can be invoked asynchronously using the AsynchronousProcessor class. The class implements the standard Java Future interface and also includes a Coherence-specific flow control mechanism to guard against excessive backlogs.
Note:
- Entries can also be processed asynchronously using the AsyncNamedCache<K, V> interface. See Performing NamedMap Operations Asynchronously.
- This feature is not available on Coherence*Extend clients.
The AsynchronousProcessor class is used to wrap an entry processor implementation. For example:
UpdaterProcessor up = new UpdaterProcessor(null, value);
AsynchronousProcessor ap = new AsynchronousProcessor(up);
cache.invokeAll(filter, ap);
ap.getResult();
The above example invokes the underlying entry processor and uses automatic flow control (as defined by the underlying service's flow control logic) and a default unit-of-order ID (assigned to the calling thread's hashCode, as a thread's requests are naturally expected to execute in order). Ordering is guaranteed for each partition even during failover. Additional constructors are available to manually control request flow and assign the unit-of-order ID as required, as shown below.
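A minimal sketch that assigns an explicit unit-of-order ID; the nOrderId value is an illustrative assumption:
UpdaterProcessor up = new UpdaterProcessor(null, value);

// requests that share a unit-of-order ID execute in order relative to each other
int nOrderId = 42;
AsynchronousProcessor ap = new AsynchronousProcessor(up, nOrderId);
cache.invokeAll(filter, ap);
ap.getResult();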
For advanced use cases, the AsynchronousProcessor class can be extended to define custom asynchronous functionality. The following example extends the AsynchronousProcessor class and overrides the onResult, onComplete, and onException methods.
Note:
Overriding implementations of the onComplete, onResult, and onException methods must be non-blocking and short-lived, because this call is made on the service thread of the client and blocks processing for responses on other threads.
AsynchronousProcessor processor = new AsynchronousProcessor(null)
    {
    public synchronized void onResult(Entry entry)
        {
        super.onResult(entry);
        // process the result
        }

    public void onComplete()
        {
        super.onComplete();
        if (m_eReason == null)
            {
            // process the result
            }
        else
            {
            // process the (potentially partial) failure
            }
        }

    public void onException(Throwable eReason)
        {
        super.onException(eReason);
        // process the observed exception
        }
    };
Performing Data Grid Aggregation
The InvocableMap interface supports entry aggregators that perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data. For details, see the aggregate method.
In addition, the EntryAggregator interface can be used to process a set of InvocableMap.Entry objects to achieve an aggregated result.
For efficient execution in a data grid, an aggregation process must be designed to operate in a parallel manner. The StreamingAggregator interface is an advanced extension to the EntryAggregator interface that is explicitly capable of being run in parallel in a distributed environment.
Note:
The ParallelAwareAggregator interface has been deprecated and should no longer be used. Applications should use the StreamingAggregator interface to implement custom aggregators. See Performing Data Grid Aggregation Using Streams.
Coherence includes many natural aggregator functions. The functions include:
- Count
- DistinctValues
- DoubleAverage
- DoubleMax
- DoubleMin
- DoubleSum
- LongMax
- LongMin
- LongSum
See the com.tangosol.util.aggregator package for a list of Coherence aggregators. To implement your own aggregator, see the StreamingAggregator interface.
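For illustration, a minimal sketch that computes an average using the DoubleAverage aggregator; the "people" cache and the Person.getAge accessor are assumptions:
NamedCache people = CacheFactory.getCache("people");

// aggregate the average age across all entries; AlwaysFilter selects every entry
Double dAvgAge = (Double) people.aggregate(new AlwaysFilter(), new DoubleAverage("getAge"));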
Parent topic: Processing Data In a Cache
Performing Data Grid Aggregation Using Streams
The use of streams provides a simplified programming model, especially when combined with Java lambda expressions. For example, the average age of all entries in a cache of Person objects can be computed as follows:
ValueExtractor<Person, Integer> ageExtractor = Person::getAge;
double avgAge = cache.stream()
    .mapToInt(entry -> entry.extract(ageExtractor))
    .average()
    .getAsDouble();
When using Coherence filters, pass a filter object as the source of the stream. For example:
ValueExtractor<Person, Integer> ageExtractor = Person::getAge;
int max = personCache.stream(filter)
    .mapToInt(entry -> entry.extract(ageExtractor))
    .max()
    .getAsInt();
As an alternative, use the Coherence Stream API extension to specify the extractor when creating a stream and rely on the extension for any optimizations. For example:
int max = personCache.stream(filter, Person::getAge)
    .mapToInt(Number::intValue)
    .max()
    .getAsInt();
Note that in this case you must use mapToInt(Number::intValue) to convert Stream<Integer> into IntStream. This conversion can also be done internally. For example:
int max = RemoteStream.toIntStream(personCache.stream(filter, Person::getAge))
    .max()
    .getAsInt();
The Java streams implementation has been extended in Coherence to allow aggregation across the cluster. The API defines a set of aggregators that support streams by implementing the InvocableMap.StreamingAggregator interface. In addition, Coherence includes many useful collectors that can be executed in parallel in a distributed environment. The collectors are called using the RemoteCollectors class. For example:
avgAge = cache.stream()
    .map(Map.Entry::getValue)
    .collect(RemoteCollectors.averagingInt(Contact::getAge));
System.out.println("\nThe average age of all contacts using collect() is: " + avgAge);
To define custom aggregators that also support streams, you can extend the CollectorAggregator class.
Parent topic: Processing Data In a Cache
Performing Node-Based Processing
Coherence provides an invocation service which allows execution of single-pass agents (called invocable objects) anywhere within the grid. An invocation service is configured using the <invocation-scheme> element in the cache configuration file. See invocation-scheme. Using the name of the service, the application can easily obtain a reference to the service:
InvocationService service = (InvocationService) CacheFactory.getService("MyService");
Agents are simply runnable classes that are part of the application. An example of a simple agent is one designed to request a GC from the JVM:
/**
 * Agent that issues a garbage collection.
 */
public class GCAgent
        extends AbstractInvocable
    {
    public void run()
        {
        System.gc();
        }
    }
To execute that agent across the entire cluster, it takes one line of code:
service.execute(new GCAgent(), null, null);
Here is an example of an agent that supports a grid-wide request/response model:
/**
 * Agent that determines how much free memory a grid node has.
 */
public class FreeMemAgent
        extends AbstractInvocable
    {
    public void run()
        {
        Runtime runtime = Runtime.getRuntime();
        int cbFree  = (int) runtime.freeMemory();
        int cbTotal = (int) runtime.totalMemory();
        setResult(new int[] {cbFree, cbTotal});
        }
    }
To execute that agent across the entire grid and retrieve all the results from it only requires a single line of code:
Map map = service.query(new FreeMemAgent(), null);
While it is easy to do a grid-wide request/response, it takes a bit more code to print the results:
Iterator iter = map.entrySet().iterator();
while (iter.hasNext())
    {
    Map.Entry entry = (Map.Entry) iter.next();
    Member member = (Member) entry.getKey();
    int[] anInfo = (int[]) entry.getValue();
    if (anInfo != null) // null if the member died
        {
        System.out.println("Member " + member + " has " + anInfo[0]
            + " bytes free out of " + anInfo[1] + " bytes total");
        }
    }
The agent operations can be stateful, which means that their invocation state is serialized and transmitted to the grid nodes on which the agent is to be run.
/**
 * Agent that carries some state with it.
 */
public class StatefulAgent
        extends AbstractInvocable
    {
    public StatefulAgent(String sKey)
        {
        m_sKey = sKey;
        }

    public void run()
        {
        // the agent has the key that it was constructed with
        String sKey = m_sKey;
        // ...
        }

    private String m_sKey;
    }
Parent topic: Processing Data In a Cache