31 Using Map Events
You can use map event listeners to receive events from Coherence caches and from any class that implements the ObservableMap interface.
This chapter includes the following sections:
- Overview of Map Events
  Coherence provides cache events using the JavaBean Event model.
- Signing Up for All Events
  To sign up for events, pass an object that implements the MapListener interface to an addMapListener method on the ObservableMap interface.
- Using an Inner Class as a MapListener
  You can use the AbstractMapListener base class when creating an inner class to use as a MapListener or when implementing a MapListener that only listens to one or two types of events (inserts, updates or deletes).
- Using Lambda Expressions to Add Map Listeners
  A lambda expression can be used to add MapListener<K, V> implementations.
- Configuring a MapListener For a Cache
  If a listener should always be on a particular cache, then place it into the cache configuration using the <listener> element and the listener is automatically added to the cache.
- Signing Up For Events On Specific Identities
  You can sign up for events that occur against specific identities (keys).
- Filtering Events
  You can use filters to listen to particular events.
- Using Lite Events
  You can use lite events if an event should only include new values.
- Listening to Queries
  The same filters that are used to query a cache can listen to events from a cache.
- Using Synthetic Events
  You can choose to monitor synthetic events.
- Listening to Backing Map Events
  You can listen to events for the map that backs a cache (partitioned, replicated, near, continuous-query, read-through/write-through and write-behind).
- Using Synchronous Event Listeners
  To guarantee that cache API operations and the events are ordered as if the local view of the clustered system were single-threaded, a MapListener must implement the SynchronousListener marker interface.
- Using Durable Events (Experimental)
  Coherence has provided the ability for clients to asynchronously observe data changes for almost two decades. This has proven to be an incredibly powerful feature allowing Coherence to offer components such as NearCaches and ViewCaches/CQCs, in addition to allowing customers to build truly event-driven systems.
Overview of Map Events
Coherence provides cache events using the JavaBean Event model.
Note:
Coherence also includes the live event programming model. Live events provide support for common event types and can be used instead of map events. See Using Live Events.
This section includes the following topics:
- Listener Interface and Event Object
- Understanding Event Guarantees
- Caches and Classes that Support Events
Listener Interface and Event Object
In the JavaBeans Event model, there is an EventListener interface that all listeners must extend. Coherence provides a MapListener interface, which allows application logic to receive events when data in a Coherence cache is added, modified or removed.
An application object that implements the MapListener interface can sign up for events from any Coherence cache or class that implements the ObservableMap interface, simply by passing an instance of the application's MapListener implementation to an addMapListener() method.
The MapEvent object that is passed to the MapListener carries all of the necessary information about the event that has occurred, including the source (ObservableMap) that raised the event, the identity (key) that the event is related to, what the action was against that identity (insert, update or delete), what the old value was and what the new value is.
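The listener and event model described above can be illustrated without any Coherence dependency. The following is a minimal sketch in plain Java, not the Coherence API: the TinyObservableMap class, its Event type, and its addListener method are hypothetical names invented for this illustration. The sketch only shows the kind of information an event carries (source, key, action, old value, new value).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an observable map. Not the Coherence API.
class TinyObservableMap<K, V> {
    enum Action { INSERTED, UPDATED, DELETED }

    // Carries the same kinds of information the text attributes to MapEvent.
    static final class Event<K, V> {
        final TinyObservableMap<K, V> source;
        final K key;
        final Action action;
        final V oldValue;
        final V newValue;

        Event(TinyObservableMap<K, V> source, K key, Action action, V oldValue, V newValue) {
            this.source = source;
            this.key = key;
            this.action = action;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    private final Map<K, V> data = new HashMap<>();
    private final List<Consumer<Event<K, V>>> listeners = new ArrayList<>();

    void addListener(Consumer<Event<K, V>> listener) {
        listeners.add(listener);
    }

    void removeListener(Consumer<Event<K, V>> listener) {
        listeners.remove(listener);
    }

    V put(K key, V value) {
        boolean existed = data.containsKey(key);
        V old = data.put(key, value);
        fire(new Event<>(this, key, existed ? Action.UPDATED : Action.INSERTED, old, value));
        return old;
    }

    V remove(K key) {
        if (!data.containsKey(key)) {
            return null; // nothing changed, so no event is raised
        }
        V old = data.remove(key);
        fire(new Event<>(this, key, Action.DELETED, old, null));
        return old;
    }

    private void fire(Event<K, V> evt) {
        // Iterate over a copy so a listener may unregister itself while being notified.
        for (Consumer<Event<K, V>> listener : new ArrayList<>(listeners)) {
            listener.accept(evt);
        }
    }

    public static void main(String[] args) {
        TinyObservableMap<String, String> map = new TinyObservableMap<>();
        map.addListener(evt -> System.out.println(
                evt.action + " key=" + evt.key + " old=" + evt.oldValue + " new=" + evt.newValue));
        map.put("hello", "world");
        map.put("hello", "again");
        map.remove("hello");
    }
}
```

In this sketch events are delivered on the calling thread; a clustered cache delivers them across the network, so the sketch says nothing about ordering or delivery guarantees.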
Understanding Event Guarantees
The partitioned cache service guarantees that under normal circumstances an event is delivered only once. However, there are two scenarios that could break this guarantee:
- A catastrophic cluster failure that causes data loss (for example, the simultaneous crash of two machines holding data). In this case, the PARTITION_LOST event is emitted to all registered PartitionListener instances on the server side.
- A client disconnect. In this case, the MEMBER_LEFT event is emitted to all registered MemberListener instances on the client side.
Caches and Classes that Support Events
All Coherence caches implement ObservableMap; in fact, the NamedCache interface that is implemented by all Coherence caches extends the ObservableMap interface. That means that an application can sign up to receive events from any cache, regardless of whether that cache is local, partitioned, near, replicated, using read-through, write-through, write-behind, overflow, disk storage, and so on.
Note:
Regardless of the cache topology and the number of servers, and even if the modifications are being made by other servers, the events are delivered to the application's listeners.
In addition to the Coherence caches (those objects obtained through a Coherence cache factory), several other supporting classes in Coherence also implement the ObservableMap interface:
- ObservableHashMap
- LocalCache
- OverflowMap
- NearCache
- ReadWriteBackingMap
- AbstractSerializationCache, SerializationCache, and SerializationPagedCache
- WrapperObservableMap, WrapperConcurrentMap, and WrapperNamedCache
Signing Up for All Events
To sign up for events, pass an object that implements the MapListener interface to an addMapListener method on the ObservableMap interface. The following example illustrates a sample MapListener implementation that prints each event it receives.
    /**
     * A MapListener implementation that prints each event as it
     * receives it.
     */
    public static class EventPrinter extends Base implements MapListener {
        public void entryInserted(MapEvent evt) {
            out(evt);
        }

        public void entryUpdated(MapEvent evt) {
            out(evt);
        }

        public void entryDeleted(MapEvent evt) {
            out(evt);
        }
    }
Using this implementation, you can print all events from any given cache (since all caches implement the ObservableMap interface):
    cache.addMapListener(new EventPrinter());
To be able to later remove the listener, it is necessary to hold on to a reference to the listener:
    MapListener listener = new EventPrinter();
    cache.addMapListener(listener);
    m_listener = listener; // store the listener in a field
The listener can then be removed:
    MapListener listener = m_listener;
    if (listener != null) {
        cache.removeMapListener(listener);
        m_listener = null; // clean up the listener field
    }
Each addMapListener method on the ObservableMap interface has a corresponding removeMapListener method. To remove a listener, use the removeMapListener method that corresponds to the addMapListener method that was used to add the listener.
Using an Inner Class as a MapListener
You can use the AbstractMapListener base class when creating an inner class to use as a MapListener or when implementing a MapListener that only listens to one or two types of events (inserts, updates or deletes). The following example prints out only the insert events for the cache.
    cache.addMapListener(new AbstractMapListener() {
        public void entryInserted(MapEvent evt) {
            out(evt);
        }
    });
Another helpful base class for creating a MapListener implementation is the MultiplexingMapListener, which routes all events to a single method for handling. Since only one method must be implemented to capture all events, the MultiplexingMapListener can also be very useful when creating an inner class to use as a MapListener:
    public static class EventPrinter extends MultiplexingMapListener {
        public void onMapEvent(MapEvent evt) {
            out(evt);
        }
    }
Using Lambda Expressions to Add Map Listeners
A lambda expression can be used to add MapListener<K, V> implementations. The following example uses a lambda expression to add the SimpleMapListener<K, V> implementation that is delivered with Coherence. The implementation delegates to an appropriate event handler based on the event type.
    MapListener<ContactId, Contact> listener = new SimpleMapListener<ContactId, Contact>()
        .addInsertHandler((event) -> System.out.println("\ninserted:\n" + event.getNewValue()));
    cache.addMapListener(listener);
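Configuring a MapListener For a Cache
If a listener should always be on a particular cache, then place it into the cache configuration using the <listener> element and the listener is automatically added to the cache. See listener.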
Signing Up For Events On Specific Identities
You can sign up for events that occur against specific identities (keys). For example, to print all events that occur against the Integer key 5:
    cache.addMapListener(new EventPrinter(), Integer.valueOf(5), false);
Thus, the following code would only trigger an event when the Integer key 5 is inserted or updated:
    for (int i = 0; i < 10; ++i) {
        Integer key = Integer.valueOf(i);
        String value = "test value for key " + i;
        cache.put(key, value);
    }
Filtering Events
You can use filters to listen to particular events. The following example adds a listener to a cache with a filter that allows the listener to receive only delete events:
    // Filters used with partitioned caches must be
    // Serializable, Externalizable or ExternalizableLite
    public class DeletedFilter implements Filter, Serializable {
        public boolean evaluate(Object o) {
            MapEvent evt = (MapEvent) o;
            return evt.getId() == MapEvent.ENTRY_DELETED;
        }
    }

    cache.addMapListener(new EventPrinter(), new DeletedFilter(), false);
Note:
Filtering events versus filtering cached data:
When building a filter for querying, the object that is passed to the evaluate method of the Filter is a value from the cache, or, if the filter implements the EntryFilter interface, the entire Map.Entry from the cache. When building a filter for filtering events for a MapListener, the object that is passed to the evaluate method of the filter is of type MapEvent. See Listening to Queries.
If you then make the following sequence of calls:
    cache.put("hello", "world");
    cache.put("hello", "again");
    cache.remove("hello");
The result would be:
    CacheEvent{LocalCache deleted: key=hello, value=again}
Using Lite Events
You can use lite events if an event should only include new values. By default, an event carries both the old and the new value. Consider the following example:
    MapListener listener = new MultiplexingMapListener() {
        public void onMapEvent(MapEvent evt) {
            out("event has occurred: " + evt);
            out("(the wire-size of the event would have been "
                + ExternalizableHelper.toBinary(evt).length()
                + " bytes.)");
        }
    };
    cache.addMapListener(listener);

    // insert a 1KB value
    cache.put("test", new byte[1024]);

    // update with a 2KB value
    cache.put("test", new byte[2048]);

    // remove the 2KB value
    cache.remove("test");
The output from running the test shows that the first event carries the 1KB inserted value, the second event carries both the replaced 1KB value and the new 2KB value, and the third event carries the removed 2KB value:
    event has occurred: CacheEvent{LocalCache added: key=test, value=[B@a470b8}
    (the wire-size of the event would have been 1283 bytes.)
    event has occurred: CacheEvent{LocalCache updated: key=test, old value=[B@a470b8, new value=[B@1c6f579}
    (the wire-size of the event would have been 3340 bytes.)
    event has occurred: CacheEvent{LocalCache deleted: key=test, value=[B@1c6f579}
    (the wire-size of the event would have been 2307 bytes.)
When an application does not require the old and the new value to be included in the event, it can indicate that by requesting only "lite" events. When adding a listener, you can request lite events by using an addMapListener method that takes an additional boolean fLite parameter:
    cache.addMapListener(listener, (Filter) null, true);
Note:
A lite event's old value and new value may be null. However, even if you request lite events, the old and the new value may be included if there is no additional cost to generate and deliver the event. In other words, requesting that a MapListener receive lite events is simply a hint to the system that the MapListener does not have to know the old and new values for the event.
Listening to Queries
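The same filters that are used to query a cache can listen to events from a cache.
This section includes the following topics:
- Overview of Listening to Queries
- Filtering Events Versus Filtering Cached Data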
Overview of Listening to Queries
All Coherence caches support querying by any criteria. When an application queries for data from a cache, the result is a point-in-time snapshot, either as a set of identities (keySet) or a set of identity/value pairs (entrySet). The mechanism for determining the contents of the resulting set is referred to as filtering, and it allows an application developer to construct queries of arbitrary complexity using a rich set of out-of-the-box filters (for example, equals, less-than, like, between, and so on), or to provide their own custom filters (for example, XPath).
For example, in a trading system it is possible to query for all open Order objects for a particular trader:
    NamedCache mapTrades = ...
    Filter filter = new AndFilter(new EqualsFilter("getTrader", traderid),
                                  new EqualsFilter("getStatus", Status.OPEN));
    Set setOpenTrades = mapTrades.entrySet(filter);
To receive notifications of new trades being opened for that trader, closed by that trader or reassigned to or from another trader, the application can use the same filter:
    // receive events for all trade IDs that this trader is interested in
    mapTrades.addMapListener(listener, new MapEventFilter(filter), true);
The MapEventFilter converts a query filter into an event filter.
The MapEventFilter has several very powerful options, allowing an application listener to receive only the events that it is specifically interested in. More importantly for scalability and performance, only the desired events have to be communicated over the network, and they are communicated only to the servers and clients that have expressed interest in those specific events. For example:
    // receive all events for all trades that this trader is interested in
    int nMask = MapEventFilter.E_ALL;
    mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

    // receive events for all this trader's trades that are closed or
    // re-assigned to a different trader
    nMask = MapEventFilter.E_UPDATED_LEFT | MapEventFilter.E_DELETED;
    mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

    // receive events for all trades as they are assigned to this trader
    nMask = MapEventFilter.E_INSERTED | MapEventFilter.E_UPDATED_ENTERED;
    mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);

    // receive events only for new trades assigned to this trader
    nMask = MapEventFilter.E_INSERTED;
    mapTrades.addMapListener(listener, new MapEventFilter(nMask, filter), true);
Filtering Events Versus Filtering Cached Data
When building a Filter for querying, the object that is passed to the evaluate method of the Filter is a value from the cache, or if the Filter implements the EntryFilter interface, the entire Map.Entry from the cache. When building a Filter for filtering events for a MapListener, the object that is passed to the evaluate method of the Filter is of type MapEvent.
The MapEventFilter converts a Filter that is used to do a query into a Filter that is used to filter events for a MapListener. In other words, the MapEventFilter is constructed from a Filter that queries a cache, and the resulting MapEventFilter is a filter that evaluates MapEvent objects by converting them into the objects that a query Filter would expect.
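The conversion described above can be pictured as applying the query predicate to the old and new values of an event and classifying the transition. The following plain Java sketch is a conceptual model only, not the MapEventFilter implementation: the QueryToEventFilter class, its mask values, and its method names are hypothetical, and a null value is assumed to mean "no value" (an insert has no old value, a delete has no new value).

```java
import java.util.function.Predicate;

// Hypothetical model of wrapping a query predicate so that it can judge events.
// The mask names echo those used in the text; the numeric values are illustrative only.
final class QueryToEventFilter<V> {
    static final int INSERTED = 1;
    static final int DELETED = 2;
    static final int UPDATED_ENTERED = 4; // value did not match before, matches now
    static final int UPDATED_LEFT = 8;    // value matched before, no longer matches
    static final int UPDATED_WITHIN = 16; // value matched before and still matches

    private final int mask;
    private final Predicate<V> query;

    QueryToEventFilter(int mask, Predicate<V> query) {
        this.mask = mask;
        this.query = query;
    }

    // Returns true if the change (oldValue -> newValue) is one the listener asked for.
    boolean accepts(V oldValue, V newValue) {
        return (mask & classify(oldValue, newValue)) != 0;
    }

    // Classifies a change relative to the query; returns 0 if the query is not involved.
    int classify(V oldValue, V newValue) {
        boolean before = oldValue != null && query.test(oldValue);
        boolean after = newValue != null && query.test(newValue);
        if (oldValue == null) {
            return after ? INSERTED : 0;
        }
        if (newValue == null) {
            return before ? DELETED : 0;
        }
        if (!before && after) {
            return UPDATED_ENTERED;
        }
        if (before && !after) {
            return UPDATED_LEFT;
        }
        return before ? UPDATED_WITHIN : 0;
    }

    public static void main(String[] args) {
        // "Trades that are closed or re-assigned": the value leaves the query or is deleted.
        QueryToEventFilter<String> filter = new QueryToEventFilter<String>(
                UPDATED_LEFT | DELETED, status -> status.equals("OPEN"));
        System.out.println(filter.accepts("OPEN", "CLOSED"));
        System.out.println(filter.accepts(null, "OPEN"));
    }
}
```

The design point is that one query predicate serves both purposes: it selects the initial result set, and it decides which later changes enter, leave, or stay within that set.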
Using Synthetic Events
Some events originate from within a cache itself. There are many examples, but the most common cases are:
- When entries automatically expire from a cache;
- When entries are evicted from a cache because the maximum size of the cache has been reached;
- When entries are transparently added to a cache as the result of a Read-Through operation;
- When entries in a cache are transparently updated as the result of a Read-Ahead or Refresh-Ahead operation.
Each of these represents a modification, but the modifications represent natural (and typically automatic) operations from within a cache. These events are referred to as synthetic events.
When necessary, an application can differentiate between client-induced and synthetic events simply by asking the event if it is synthetic. This information is carried on a sub-class of the MapEvent, called CacheEvent. Using the previous EventPrinter example, it is possible to print only the synthetic events:
    public static class EventPrinter extends MultiplexingMapListener {
        public void onMapEvent(MapEvent evt) {
            if (evt instanceof CacheEvent && ((CacheEvent) evt).isSynthetic()) {
                out(evt);
            }
        }
    }
Note:
Not all cache service types support the dispatching of synthetic events. Synthetic events will only be dispatched by a partitioned cache service and its derivatives, such as a federated cache service, or by near, view, or remote caches that are backed by a cache service that supports the dispatching of synthetic events. In all other cases, no event will be dispatched for synthetic events such as expiry.
Listening to Backing Map Events
This section includes the following topics:
- Overview of Listening to Backing Map Events
- Producing Readable Backing MapListener Events from Distributed Caches
Overview of Listening to Backing Map Events
For some advanced use cases, it may be necessary to "listen to" the "map" behind the "service". Replication, partitioning and other approaches to managing data in a distributed environment are all distribution services. The service still has to have something in which to actually manage the data, and that something is called a "backing map".
Backing maps can be configured. If all the data for a particular cache should be kept in object form on the heap, then use an unlimited and non-expiring LocalCache (or a SafeHashMap if statistics are not required). If only a small number of items should be kept in memory, use a LocalCache. If data are to be read on demand from a database, then use a ReadWriteBackingMap (which knows how to read and write through an application's DAO implementation), and in turn give the ReadWriteBackingMap a backing map such as a SafeHashMap or a LocalCache to store its data in.
Some backing maps are observable. The events coming from these backing maps are not usually of direct interest to the application. Instead, Coherence translates them into actions that must be taken (by Coherence) to keep data synchronous and properly backed up, and it also translates them when appropriate into clustered events that are delivered throughout the cluster as requested by application listeners. For example, if a partitioned cache has a LocalCache as its backing map, and the local cache expires an entry, that event causes Coherence to expire all of the backup copies of that entry. Furthermore, if any listeners have been registered on the partitioned cache, and if the event matches their event filter(s), then that event is delivered to those listeners on the servers where those listeners were registered.
In some advanced use cases, an application must process events on the server where the data are being maintained, and it must do so on the structure (backing map) that is actually managing the data. In these cases, if the backing map is an observable map, a listener can be configured on the backing map or one can be programmatically added to the backing map. (If the backing map is not observable, it can be made observable by wrapping it in a WrapperObservableMap.)
Each backing map event is dispatched once and only once. However, multiple backing map events could be generated from a single put. For example, if the entry from put has to be redistributed, then distributed events (deleted from original node, and inserted in a new node) are created. In this case, the backing map listener is called multiple times for the single put.
Lastly, backing map listeners are always synchronous; they are fired on the thread that is doing the modification operation while holding the synchronization monitor for the backing map itself. Often, for internal backing map listeners, events are not processed immediately, but are queued and processed later asynchronously.
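Because a synchronous callback runs on the thread that is modifying the map, a common pattern is to do nothing in the callback except hand the event to another thread. The following plain Java sketch illustrates that queue-and-process-later idea under stated assumptions: the DeferringListener class and its onEvent and shutdownAndWait methods are hypothetical names for this illustration, and the sketch does not show how Coherence queues events internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical hand-off listener: the synchronous callback only enqueues work.
final class DeferringListener<E> {
    // A single worker thread preserves the order in which events were received.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<E> handler;

    DeferringListener(Consumer<E> handler) {
        this.handler = handler;
    }

    // Called on the thread performing the modification: return as quickly as possible.
    void onEvent(E event) {
        worker.execute(() -> handler.accept(event));
    }

    // Stops accepting events and waits for queued ones; true if all finished in time.
    boolean shutdownAndWait(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> handled = new CopyOnWriteArrayList<>();
        DeferringListener<String> listener = new DeferringListener<String>(handled::add);
        listener.onEvent("inserted k1");
        listener.onEvent("updated k1");
        listener.shutdownAndWait(5000);
        System.out.println(handled);
    }
}
```

The trade-off is that the handler no longer runs while the backing map's monitor is held, so it sees the event after the fact and must not assume the map is still in the state the event describes.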
Producing Readable Backing MapListener Events from Distributed Caches
Backing MapListener events are returned from replicated caches in readable Java format. However, backing MapListener events returned from distributed caches are in internal Coherence format. The Coherence Incubator Common project provides an AbstractMultiplexingBackingMapListener class that enables you to obtain readable backing MapListener events from distributed caches. See https://coherence.java.net/ to download the Coherence Common libraries.
To produce readable backing MapListener events from distributed caches:
The AbstractMultiplexingBackingMapListener class provides an onBackingMapEvent method which you can override to specify how you would like the event returned.
The following listing of the VerboseBackingMapListener class is a sample implementation of AbstractMultiplexingBackingMapListener. The onBackingMapEvent method has been overridden to send the results to standard output.
    import com.tangosol.net.BackingMapManagerContext;
    import com.tangosol.util.MapEvent;

    public class VerboseBackingMapListener extends AbstractMultiplexingBackingMapListener {

        public VerboseBackingMapListener(BackingMapManagerContext context) {
            super(context);
        }

        @Override
        protected void onBackingMapEvent(MapEvent mapEvent, Cause cause) {
            System.out.printf("Thread: %s Cause: %s Event: %s\n",
                Thread.currentThread().getName(), cause, mapEvent);
        }
    }
The following example demonstrates setting the <listener> element in a distributed cache scheme. It names the VerboseBackingMapListener implementation and passes it an initialization parameter of type com.tangosol.net.BackingMapManagerContext.
    <distributed-scheme>
      <scheme-name>my-dist-scheme</scheme-name>
      <service-name>DistributedCache</service-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme>
              <high-units>0</high-units>
              <expiry-delay>0</expiry-delay>
            </local-scheme>
          </internal-cache-scheme>
          <cachestore-scheme>
            <class-scheme>
              <class-name>CustomCacheStore</class-name>
              <init-params>
                <init-param>
                  <param-type>java.lang.String</param-type>
                  <param-value>{cache-name}</param-value>
                </init-param>
              </init-params>
            </class-scheme>
          </cachestore-scheme>
          <listener>
            <class-scheme>
              <class-name>VerboseBackingMapListener</class-name>
              <init-params>
                <init-param>
                  <param-type>com.tangosol.net.BackingMapManagerContext</param-type>
                  <param-value>{manager-context}</param-value>
                </init-param>
              </init-params>
            </class-scheme>
          </listener>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
Using Synchronous Event Listeners
To guarantee that cache API operations and the events are ordered as if the local view of the clustered system were single-threaded, a MapListener must implement the SynchronousListener marker interface.
One example in Coherence itself that uses synchronous listeners is the near cache, which can use events to invalidate locally cached data. That is, on a put operation into a near cache, the local copy and the distributed primary and backup copies of the entry are updated as a synchronous operation. Once this update is complete, asynchronous events are sent to all the other listening near caches. This invalidates the local copies so that the entries are retrieved from the back cache on the next get operation.
Using Durable Events (Experimental)
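Coherence has provided the ability for clients to asynchronously observe data changes for almost two decades. This has proven to be an incredibly powerful feature allowing Coherence to offer components such as NearCaches and ViewCaches/CQCs, in addition to allowing customers to build truly event-driven systems.
For a comprehensive overview of MapEvents in terms of the callback interface, in addition to the registration mechanisms, see Using Map Events. However, it is worth drawing attention to some of the guarantees offered by MapEvents to provide context for why the Durable Events feature is useful.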
Guaranteeing a MapEvent
Coherence guarantees that a MapEvent, which represents a change to an entry in the source map, will be delivered exactly once, provided that the client remains a member of the associated service.
Note:
Each NamedMap is associated with a Service and typically this service is a PartitionedService. Therefore, it provides distributed storage and partitioned/sharded access. The remaining description of MapEvent guarantees will assume the use of a PartitionedService, which therefore provides resilience to process, machine, rack, or site failure.
Importantly, this guarantee is maintained regardless of failures in the system that the services are configured to handle. Therefore, a backup-count of '1' results in the service being able to tolerate the loss of a single unit, where a unit can be a node (JVM/member), machine, rack, or site. Upon encountering a fault, Coherence restores data and continues service for the affected partitions. This data redundancy is extended to MapEvent delivery to clients. Therefore, if a member hosting primary partitions were to die, and the said member had sent the backup message for some change but failed to deliver the MapEvent to the client, the new primary member (that was a backup member and went through the automatic promotion protocol) would emit MapEvent messages that had not been confirmed by clients. The client is aware of MapEvent messages it had already processed, and therefore if the MapEvent was received by the client but the primary did not receive the ACK, causing the backup to send a duplicate, the client will not replay the event.
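The duplicate suppression described above can be pictured as the client remembering how far it has processed and ignoring anything at or before that point. The following plain Java sketch is a conceptual model only: the DeliveryTracker class is hypothetical, and it assumes each event carries a partition number and a per-partition, monotonically increasing version. Coherence's internal bookkeeping is not shown here and may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side tracker used to drop events that were already processed.
final class DeliveryTracker {
    // partition id -> highest event version already processed for that partition
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    // Returns true if the event is new and should be processed, false if it is a duplicate.
    boolean shouldProcess(int partition, long version) {
        Long last = lastProcessed.get(partition);
        if (last != null && version <= last) {
            return false; // re-sent after a failover; already handled
        }
        lastProcessed.put(partition, version);
        return true;
    }

    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        System.out.println(tracker.shouldProcess(7, 1)); // first delivery
        System.out.println(tracker.shouldProcess(7, 1)); // duplicate from the new primary
    }
}
```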
The aforementioned guarantees of MapEvent delivery are all valid under the assumption that the member that has registered for MapEvents does not leave the service. If the member leaves the associated service, then these guarantees no longer apply. This can be problematic in a few contexts (described in the sections below) and has led to the need to deliver a more general feature of event replay.
This section includes the following topics:
- Abnormal Service Termination
- Extending Proxy Failover
- Abnormal Process Termination
Abnormal Service Termination
While it is rare, there are some scenarios that result in abnormal service termination. The abnormal termination causes references to Services or NamedMaps to become invalid, and therefore unusable. Instead of having numerous applications and internal call sites be defensive to these invalid handles, Coherence chose to introduce a 'safe' layer between the call site and the raw/internal service. The 'safe' layer remains valid when services abnormally terminate, and additionally may cause the underlying service to restart.
In the case of a 'safe' NamedMap, any MapEvent listeners registered will automatically be re-registered. However, any events that had occurred after the member left the service and before it re-joined will be lost, or more formally, not observed by this member's listener. This has been worked around in many cases by these members/clients re-synchronizing their local state with the remote data; this is the approach taken for ContinuousQueryCaches.
Extending Proxy Failover
Coherence Extend provides a means for a client to connect to a cluster through a conduit referred to as a proxy. An extend client wraps up the intended request and forwards to a proxy which executes the said request with the results either streamed back to the client or sent as a single response. There are many reasons for using 'extend' over being a member of the cluster. For more information about extend, see Introduction to Coherence*Extend in Developing Remote Clients for Oracle Coherence.
The liveness of the proxy is important to the extend clients that are connected to it. If the proxy becomes unresponsive or dies, the extend client transparently reconnects to another proxy. Similar to Abnormal Service Termination, there is a potential of not observing MapEvents that had occurred at the source due to the proxy leaving the associated service or the extend client reconnecting to a different proxy, and therefore re-registering the listener.
There are means to work around this situation by observing the proxy disconnect/re-connect and causing a re-synchronization of the extend client and the source. However, extend proxy failover is significantly more likely to occur than abnormal service termination.
Abnormal Process Termination
A logical client receiving MapEvents may experience a process restart and it may be desirable to continue receiving MapEvents after the last received event, instead of receiving events only after registration. For example, a client may be responsible for updating some auxiliary system by applying the contents of each MapEvent to that system. Additionally, the client may be tracking the last received events, and therefore can inform the source of the last event it received. Thus, a capable source can replay all the events that were missed.
Replaying Generic Events
- Storage Nodes
  - Track by storing the MapEvents as they are generated.
  - Expose a monotonically increasing version per partition within a single cache.
  - Ensure that version semantics are maintained regardless of faults.
  - Ensure that MapEvent storage is redundant.
  - Provide a MapEvent storage retention policy.
- Client Nodes
  - Have a trivial facility to suggest received MapEvent versions are tracked and therefore events replayed when faced with a restart.
  - Provide a more advanced means such that the client can control the tracking of event versions.
With the above features, a MapListener can opt in by indicating that it is version aware, and Coherence will automatically start tracking versions on the client. This does require a complementing server-side configuration in which the storage servers are tracking MapEvents. For example:
NamedMap.addMapListener or indirectly due to a service restart.
These versions are sent to the relevant storage servers and if a version is returned for a partition, Coherence will return all known versions later than or equal to the specified version. Additionally, certain formal constants are defined to allow a client to request storage servers to send:
- all known events
- current head and all future events (previously known as priming events)
- all future events (the current behavior)
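The replay options listed above can be modeled with a small per-partition event log. The following plain Java sketch rests on stated assumptions: the PartitionEventLog class, its sentinel constants (ALL, HEAD, FUTURE), and the choice to number versions from 1 are all hypothetical and are not the Coherence constants or storage format. It shows only the "later than or equal to the specified version" rule and the three special requests for events already stored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical event log for one partition; versions are assumed to start at 1.
final class PartitionEventLog {
    // Illustrative sentinels, not Coherence constants.
    static final long ALL = -1L;    // replay every retained event
    static final long HEAD = -2L;   // replay only the most recent retained event
    static final long FUTURE = -3L; // replay nothing; only events stored later are of interest

    private final List<String> events = new ArrayList<>(); // index i holds version i + 1

    // Stores an event and returns the version assigned to it.
    long append(String event) {
        events.add(event);
        return events.size();
    }

    // Returns the retained events a client should receive for the requested version.
    List<String> replayFrom(long version) {
        if (version == FUTURE || events.isEmpty()) {
            return Collections.emptyList();
        }
        if (version == HEAD) {
            return Collections.singletonList(events.get(events.size() - 1));
        }
        long from = (version == ALL) ? 1 : Math.max(1, version);
        if (from > events.size()) {
            return Collections.emptyList(); // client is already up to date
        }
        // Inclusive of the requested version: "later than or equal to".
        return new ArrayList<>(events.subList((int) from - 1, events.size()));
    }

    public static void main(String[] args) {
        PartitionEventLog log = new PartitionEventLog();
        log.append("insert k1");
        log.append("update k1");
        log.append("delete k1");
        System.out.println(log.replayFrom(2));
    }
}
```

A real store would also need the redundancy and retention policy named in the requirements; this sketch keeps every event in memory and never trims.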
Note:
There is a natural harmony between the registration mechanism and the partitions returned from VersionedPartitions that occurs and is worth noting. For example, when registering a MapListener against a specific key, only MapEvents for the said key will be received by this MapListener, and therefore only versions for the associated partition will be tracked. The VersionedPartitions returned by this VersionAwareMapListener will only return a version for a single partition. However, this is worth noting if you implement your own VersionAwareMapListener or VersionedPartitions data structure.
Availability in Production
The Durable Events feature is not production ready and Oracle does not recommend using it in production at this point. Oracle is making this feature available in its current form to garner feedback prior to locking down the APIs and semantics. The following features are required before it graduates to production-ready status:
- Redundant MapEvent storage
- MapEvent retention policy
- MapEvent delivery flow control
- Extend and gRPC client support
- Snapshots of MapEvent storage
- Allow a logical client to store its VersionedPartitions in Coherence
- Monitoring metrics
To get started with using this feature in its current form, see Durable Events.