Scaling Subscribers

To add or remove subscribers running on different nodes, NoSQLSubscriptionConfig has to be created with the following additional builder API.
/* step 3: create a subscription configuration */
  final NoSQLSubscriptionConfig subscriptionConfig =
    // Scalable subscriber should set Subscriber Id
    // with 2 as total number of subscribers and
    // 0 as its own SubscriberId within the group of 2 subscribers

    new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
    .setSubscribedTables("usertable")
    .setSubscriberId(new NoSQLSubscriberId(2,0))
    .setStreamMode(streamMode)
    .build();

The API setSubscriberId() takes a single argument NoSQLSubscriberID. NoSQLSubscriberId is an object with both total number of subscribers and subscriber index. Hence, we need the following two arguments to construct a NoSQLubscriberId object.

  • Number of Subscribers

    The total number of subscribers that would be running on different nodes. For example, in the code example above, .setSubscriberId(new NoSQLSubscriberId(2,0)), the NoSQLSubscriberId created has two subscribers in total.

  • Subscriber Index

    A numerical index of the current subscriber among the total number of subscribers. Note that a numerical index begins with 0. For example, two subscriber clients can be identified as 0 and 1.

Configure multiple sharded subscribers as part of a subscriber group

To build a NoSQLSubscriptionConfig object for a sharded subscriber of a subscriber group, you use a constructor of the NoSQLSubscriptionConfig.Builder class that has a single parameter of a mapper function. This function returns a distinct checkpoint table name computed from each subscriber id in the group. A sample constructor of the Builder class for configuring sharded subscribers is shown below.
public void Builder(Function<NoSQLSubscriberId, String> ckptTableMapper);
You can build the NoSQLSubscriptionConfig object for a sharded subscriber as shown below:
/** * Configures a sharded subscriber
* @param subscriberId subscriber id  
* @param tables subscribed tables 
*/
void configureShardedSubscribers(NoSQLSubscriberId subscriberId, Set<String> tables) {
   finalNoSQLSubscriptionConfig conf = 
     newNoSQLSubscriptionConfig.Builder( id -> buildCkptTableName(id))
                                          .setSubscribedTables(tables)
                                          .setSubscriberId(subscriberId)
                                          .build();
}
An example code snippet is shown below where the buildCkptTableName builds the checkpoint name from the subscriber id by concatenating the subscriber id to StreamCheckpointTable. This is one way of using the mapper function. Your application can create its own mapper function, as long as it returns unique checkpoint table name for each subscriber.
/**
 * Builds stream checkpoint table from given subscriber id
 * @param subscriberId subscriber id
 * @return stream checkpoint table name for that subscriber
 */
private String buildCkptTableName(NoSQLSubscriberId subscriberId) {
    return "StreamCheckpointTable" + subscriberId.toString();
}