Using NoSQLSubscriptionConfig
You configure your subscription by building an oracle.kv.pubsub.NoSQLSubscriptionConfig
object. You then provide this object to your NoSQLSubscriber
implementation, and also implement NoSQLSubscriber.getSubscriptionConfig()
to return this object when it is called. When you construct the publisher, you will provide it with your NoSQLSubscriber
implementation, and the publisher will then call NoSQLSubscriber.getSubscriptionConfig()
in order to understand how to create the subscription. See Implementing Subscribers and Using a Streams Publisher.
Configuring a single subscriber:
To build a NoSQLSubscriptionConfig object, you use NoSQLSubscriptionConfig.Builder as follows:
final NoSQLSubscriptionConfig subConfig =
    new NoSQLSubscriptionConfig.Builder("ChkptTable")
        .setSubscribedTables("UserTable")
        .setStreamMode(NoSQLStreamMode.FROM_NOW)
        .build();
Configuring multiple sharded subscribers as part of a subscriber group:
To build a NoSQLSubscriptionConfig object for a sharded subscriber of a subscriber group, you use the NoSQLSubscriptionConfig.Builder constructor that takes a single parameter, a mapper function. This function returns a distinct checkpoint table name computed from each subscriber id in the group. The constructor of the Builder class used for configuring sharded subscribers is shown below.
public Builder(Function<NoSQLSubscriberId, String> ckptTableMapper);
You can build the NoSQLSubscriptionConfig object for a sharded subscriber as shown below:
/**
 * Configures a sharded subscriber
 * @param subscriberId subscriber id
 * @param tables subscribed tables
 */
void configureShardedSubscribers(NoSQLSubscriberId subscriberId, Set<String> tables) {
    final NoSQLSubscriptionConfig conf =
        new NoSQLSubscriptionConfig.Builder(id -> buildCkptTableName(id))
            .setSubscribedTables(tables)
            .setSubscriberId(subscriberId)
            .build();
}
The snippet below shows one way of using the mapper function: buildCkptTableName builds the checkpoint table name by appending the subscriber id to StreamCheckpointTable. Your application can supply its own mapper function, as long as it returns a unique checkpoint table name for each subscriber.
/**
 * Builds stream checkpoint table from given subscriber id
 * @param subscriberId subscriber id
 * @return stream checkpoint table name for that subscriber
 */
private String buildCkptTableName(NoSQLSubscriberId subscriberId) {
    return "StreamCheckpointTable" + subscriberId.toString();
}
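The uniqueness requirement on the mapper function can be checked in isolation. The following is a minimal sketch that uses only the JDK; StandInSubscriberId is a hypothetical stand-in for NoSQLSubscriberId (it only models a group size and an index), and the table name format is an illustrative choice rather than anything the library prescribes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

/** Sketch of a checkpoint-table mapper. Uses a stand-in id type, not the real NoSQLSubscriberId. */
final class CkptTableMapperSketch {

    /** Hypothetical stand-in for a subscriber id: the group size and this subscriber's index. */
    static final class StandInSubscriberId {
        final int groupSize;
        final int index;

        StandInSubscriberId(int groupSize, int index) {
            this.groupSize = groupSize;
            this.index = index;
        }
    }

    /** Maps each subscriber id to its own checkpoint table name (format chosen for illustration). */
    static final Function<StandInSubscriberId, String> MAPPER =
        id -> "StreamCheckpointTable_" + id.groupSize + "_" + id.index;

    /** Returns true if the mapper yields a distinct name for every member of the group. */
    static boolean namesAreUnique(int groupSize) {
        final Set<String> names = new HashSet<>();
        for (int i = 0; i < groupSize; i++) {
            // Set.add returns false when the name was already produced for another subscriber.
            if (!names.add(MAPPER.apply(new StandInSubscriberId(groupSize, i)))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(MAPPER.apply(new StandInSubscriberId(3, 0)));
        System.out.println(namesAreUnique(3));
    }
}
```

A check like this can run in a unit test before the mapper is handed to the Builder, so that two subscribers in the same group never end up sharing a checkpoint table.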
The single-subscriber example shown earlier configures the subscription to:
- Use the checkpoint table called ChkptTable. For more information about checkpoint tables, see Using Checkpoints. The table name used here is chosen by you and should be unique to your subscription. If you are using multiple subscriptions, each subscription should use a unique checkpoint table name. This table is created automatically. If you are using a secure store, you need read/write access to the checkpoint table. If the checkpoint table does not exist, you also need the CREATE TABLE privilege. For information about:
  - Connecting to a secure store, see Authenticating to a Secure Store.
  - Configuring authorization for a secure store, see Privileges in the Security Guide.
- Subscribe to all write activity performed on the user table called UserTable. Subscriptions can be created for user-defined tables; updates to system tables are not streamed. You can use the same method to subscribe to multiple tables:
  new NoSQLSubscriptionConfig.Builder("ChkptTable")
      .setSubscribedTables("UserTable", "PriceTable", "InventoryTable")
      ....
  If you do not call setSubscribedTables(), the subscription subscribes to all tables in the store. If a subscription covers every table in the store and a new table is created after the subscription is established (using the DDL CREATE TABLE operation), the stream will include all put events for every row created in the new table.
- Set the stream mode to NoSQLStreamMode.FROM_NOW. The stream mode indicates from where in the stream the publisher starts retrieving events. For more information, see NoSQLStreamMode.
Once you have created your subscription configuration, you provide it to your NoSQLSubscriber
implementation, which then must make it available via the NoSQLSubscriber.getSubscriptionConfig()
method:
class mySubscriber implements NoSQLSubscriber {
    ...
    private final NoSQLSubscriptionConfig config;
    ...
    // Generally the constructor will require more than just
    // the subscription configuration. The point here is that you
    // must somehow provide the configuration object to
    // your subscriber implementation because that is how
    // your publisher will get it.
    mySubscriber(NoSQLSubscriptionConfig config, ....) {
        ...
        this.config = config;
        ...
    }

    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    }
    ...
}
When you implement your streams application, you will use your subscriber implementation. The getSubscriptionConfig()
method on the subscriber is how your publisher finds out what tables to follow, and so forth. See Using a Streams Publisher.
You can specify the expiration time for an empty stream using the NoSQLSubscriptionConfig.setEmptyStreamDuration() method. The expiration timer starts only when a stream becomes empty; once the expiration time elapses, the publisher shuts down the empty stream. The default empty stream expiration time is 60 seconds.
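The timing rule above can be illustrated with a small model. This is only a sketch of the described behavior, assuming the timer is reset if the stream stops being empty; EmptyStreamTimerSketch and its methods are hypothetical names and are not part of the Oracle NoSQL API, and the publisher's actual implementation may differ.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of empty-stream expiration. Hypothetical; not the publisher's actual implementation. */
final class EmptyStreamTimerSketch {

    private final Duration expiry;
    /** Time at which the stream became empty, or null while the stream is not empty. */
    private Instant emptySince;

    EmptyStreamTimerSketch(Duration expiry) {
        this.expiry = expiry;
    }

    /** Starts the expiration timer, but only on the transition to empty. */
    void onStreamEmpty(Instant now) {
        if (emptySince == null) {
            emptySince = now;
        }
    }

    /** Assumption of this sketch: the timer is cleared when the stream is no longer empty. */
    void onStreamNonEmpty() {
        emptySince = null;
    }

    /** True once the stream has been empty for at least the expiration time. */
    boolean shouldShutDown(Instant now) {
        return emptySince != null && !now.isBefore(emptySince.plus(expiry));
    }

    public static void main(String[] args) {
        final Instant start = Instant.parse("2024-01-01T00:00:00Z");
        // 60 seconds mirrors the default expiration time stated above.
        final EmptyStreamTimerSketch timer = new EmptyStreamTimerSketch(Duration.ofSeconds(60));
        timer.onStreamEmpty(start);
        System.out.println(timer.shouldShutDown(start.plusSeconds(59)));
        System.out.println(timer.shouldShutDown(start.plusSeconds(60)));
    }
}
```

In this model a stream that never becomes empty never expires, which matches the statement that the timer begins only once the stream is empty.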
In this section, we have shown only a few options that you can set using NoSQLSubscriptionConfig. For a complete list of configuration options, see NoSQLSubscriptionConfig and NoSQLSubscriptionConfig.Builder in the Java Direct Driver API Reference.