Scaling Subscribers
NoSQLSubscriptionConfig
has to be created with the following additional builder API. /* step 3: create a subscription configuration */
final NoSQLSubscriptionConfig subscriptionConfig =
// Scalable subscriber should set Subscriber Id
// with 2 as total number of subscribers and
// 0 as its own SubscriberId within the group of 2 subscribers
new NoSQLSubscriptionConfig.Builder(CKPT_TABLE_NAME)
.setSubscribedTables("usertable")
.setSubscriberId(new NoSQLSubscriberId(2,0))
.setStreamMode(streamMode)
.build();
The API setSubscriberId()
takes a single argument NoSQLSubscriberID
. NoSQLSubscriberId
is an object with both total number of subscribers and subscriber index. Hence, we need the following two arguments to construct a NoSQLubscriberId
object.
-
Number of Subscribers
The total number of subscribers that would be running on different nodes. For example, in the code example above,
.setSubscriberId(new NoSQLSubscriberId(2,0))
, theNoSQLSubscriberId
created has two subscribers in total. -
Subscriber Index
A numerical index of the current subscriber among the total number of subscribers. Note that a numerical index begins with 0. For example, two subscriber clients can be identified as 0 and 1.
Configure multiple sharded subscribers as part of a subscriber group
NoSQLSubscriptionConfig
object for a sharded
subscriber of a subscriber group, you use a constructor of the
NoSQLSubscriptionConfig.Builder
class that has a single parameter of a
mapper function. This function returns a distinct checkpoint table name computed from each
subscriber id in the group. A sample constructor of the Builder class for configuring
sharded subscribers is shown
below.public void Builder(Function<NoSQLSubscriberId, String> ckptTableMapper);
You
can build the NoSQLSubscriptionConfig
object for a sharded subscriber as
shown
below:/** * Configures a sharded subscriber
* @param subscriberId subscriber id
* @param tables subscribed tables
*/
void configureShardedSubscribers(NoSQLSubscriberId subscriberId, Set<String> tables) {
finalNoSQLSubscriptionConfig conf =
newNoSQLSubscriptionConfig.Builder( id -> buildCkptTableName(id))
.setSubscribedTables(tables)
.setSubscriberId(subscriberId)
.build();
}
buildCkptTableName
builds the checkpoint name from the subscriber id by
concatenating the subscriber id to StreamCheckpointTable. This is one way of using
the mapper function. Your application can create its own mapper function, as long as it
returns unique checkpoint table name for each
subscriber./**
* Builds stream checkpoint table from given subscriber id
* @param subscriberId subscriber id
* @return stream checkpoint table name for that subscriber
*/
private String buildCkptTableName(NoSQLSubscriberId subscriberId) {
return "StreamCheckpointTable" + subscriberId.toString();
}