Using NoSQLSubscriptionConfig

You configure your subscription by building an oracle.kv.pubsub.NoSQLSubscriptionConfig object. You then provide this object to your NoSQLSubscriber implementation, and also implement NoSQLSubscriber.getSubscriptionConfig() to return this object when it is called. When you construct the publisher, you will provide it with your NoSQLSubscriber implementation, and the publisher will then call NoSQLSubscriber.getSubscriptionConfig() in order to understand how to create the subscription. See Implementing Subscribers and Using a Streams Publisher.

Configuring a single subscriber:

To build your NoSQLSubscriptionConfig object, you use NoSQLSubscriptionConfig.Builder as follows:
final NoSQLSubscriptionConfig subConfig =
	new NoSQLSubscriptionConfig.Builder("ChkptTable")
    .setSubscribedTables("UserTable")
    .setStreamMode(NoSQLStreamMode.FROM_NOW)
    .build(); 

Configuring multiple sharded subscribers as part of a subscriber group:

To build a NoSQLSubscriptionConfig object for a sharded subscriber of a subscriber group, you use a constructor of the NoSQLSubscriptionConfig.Builder class that has a single parameter of a mapper function. This function returns a distinct checkpoint table name computed from each subscriber id in the group. A sample constructor of the Builder class for configuring sharded subscribers is shown below.
public void Builder(Function<NoSQLSubscriberId, String> ckptTableMapper);
You can build the NoSQLSubscriptionConfig object for a sharded subscriber as shown below:
/** * Configures a sharded subscriber
* @param subscriberId subscriber id  
* @param tables subscribed tables 
*/
void configureShardedSubscribers(NoSQLSubscriberId subscriberId, Set<String> tables) {
   finalNoSQLSubscriptionConfig conf = 
     newNoSQLSubscriptionConfig.Builder( id -> buildCkptTableName(id))
                                          .setSubscribedTables(tables)
                                          .setSubscriberId(subscriberId)
                                          .build();
}

An example code snippet is shown below where the buildCkptTableName builds the checkpoint name from the subscriber id by concatenating the subscriber id to StreamCheckpointTable. This is one way of using the mapper function. Your application can create its own mapper function, as long as it returns unique checkpoint table name for each subscriber.

The mapping has to be static throughout the lifetime of the XRegion Agent group, not for a specific data store instance. That is, for a data store, you can create a group of 3 agents with a mapping function. Later on, you can shut the agents down and create another group of agents with another mapping function. These two groups use different set of checkpoint tables and thus there will be no conflict.
/**
 * Builds stream checkpoint table from given subscriber id
 * @param subscriberId subscriber id
 * @return stream checkpoint table name for that subscriber
 */
private String buildCkptTableName(NoSQLSubscriberId subscriberId) {
    return "StreamCheckpointTable" + subscriberId.toString();
}
This configuration causes the subscription to:
  • Use the checkpoint table called ChkptTable. For more information about checkpoint table, see Using Checkpoints. Be aware that the table name used here is chosen by you, and should be unique to your subscription. If you are using multiple subscriptions, then each subscription should use a unique name for the checkpoint table. This table is created automatically.

    If you are using a secure store, you need read/write access to the checkpoint table. If a checkpoint table does not exist, you also need the CREATE TABLE privilege. For information about:
  • Subscribe to all write activity performed on the user table called UserTable. Subscriptions can be created for user-defined tables; updates to system tables would not be streamed. You can use this to subscribe to multiple tables:

    new NoSQLSubscriptionConfig.Builder("ChkptTable")
     .setSubscribedTables("UserTable", "PriceTable", "InventoryTable") .... 

    If you do not call setSubscribedTables(), then the subscription will subscribe to all tables in the store. If a subscription is for every table in the store, and a new a table is created after subscription is established (using the DDL CREATE TABLE operation), the stream will include all put events for every row created in the new table.

  • Set the stream mode to NoSQLStreamMode.FROM_NOW. The stream mode indicates from where in the stream the Publisher will start retrieving events. For more information, see NoSQLStreamMode.

Once you have created your subscription configuration, you provide it to your NoSQLSubscriber implementation, which then must make it available via the NoSQLSubscriber.getSubscriptionConfig() method:

class mySubscriber implements NoSQLSubscriber {
    ...
    private final NoSQLSubscriptionConfig config;
    ...
    // Generally the constructor will require more than just
    // the subscription configuration. The point here is that you
    // must somehow provide the configuration object to
    // your subscriber implemention because that is how
    // your publisher will get it.
    mySubscriber(NoSQLSubscriptionConfig config, ....) {
        ...
        this.config = config;
        ...
    }
    @Override
    public NoSQLSubscriptionConfig getSubscriptionConfig() {
        return config;
    }  
    ...

When you implement your streams application, you will use your subscriber implementation. The getSubscriptionConfig() method on the subscriber is how your publisher finds out what tables to follow, and so forth. See Using a Streams Publisher.

The expiration time for an empty stream can be specified using the NoSQLSubscriptionConfig.setEmptyStreamDuration() method. The expiration time will begin only when a stream becomes empty after which the publisher would shut down the empty stream. The default empty stream expiration time is 60 seconds. The user can override the default empty stream expiration time by the setEmptyStreamDuration() method.

In this section, we have shown only a few options that you can set using NoSQLSubscriptionConfig. For a complete list of configuration options, see NoSQLSubscriptionConfig and NoSQLSubscriptionConfig.Builder in the Java Direct Driver API Reference.