2 Using the StreamOperation Class

A streams application works by implementing Subscribers. Subscribers receive a stream of events that consist of write operations to a table of interest.

For more information, see Implementing Subscribers.

Every event your application receives in a subscription stream is represented as an oracle.kv.pubsub.StreamOperation. Each of these events represents a put, delete, or transaction operation on the table that your application subscribes to.

Note:

A put represents either a put API call or a SQL DML statement that inserts or updates one of more records.

For more information, see oracle.kv.pubsub.StreamOperation class summary in the — Java Direct Driver API Reference

The StreamOperation interface provides the following methods:

  • StreamOperation.getType()

    Returns a StreamOperation.Type object. This is an enum constant that is either delete or put. For example:

    // so is a StreamOperation object. It is obtained using
    // NoSQLSubscriber.onNext().
    switch (so.getType()) {
        case PUT:
            {
                // Process the put operation here.
            }
            break;
        case DELETE:
            {
                // Process the delete operation here.
            }
            break;
        case TRANSACTION:
            {
                // Process the transaction operation here.
            }
            break;
        default:
            // Received an unknown and therefore illegal operation type.
            throw new IllegalStateException("... exception message ...");
    } 
  • StreamOperation.asDelete()

    Returns the operation as a StreamOperation.DeleteEvent object. The object contains only the Primary Key associated with the delete operation:

    // so is a StreamOperation object. It is obtained using
    // NoSQLSubscriber.onNext().
    StreamOperation.DeleteEvent de = so.asDelete();
    PrimaryKey pk = de.getPrimaryKey();
  • StreamOperation.asPut()

    Returns the operation as a StreamOperation.PutEvent object. This object allows you to obtain the row that was changed by the put operation.

    Note:

    • If before-images are not enabled, the events returned represent the state of the rows after the put operations are performed.
    • If before-images are enabled and the stream is configured to include before-images, the events returned represent the state of the rows after and before the put operations are performed.
    // so is a StreamOperation object. It is obtained using
    // NoSQLSubscriber.onNext().
    StreamOperation.PutEvent pe = so.asPut();
    Row row = pe.getRow();
  • StreamOperation.asTransaction()

    Returns the operation as a StreamOperation.TransactionEvent object.

    The operations in the transaction event stream will be in the same order they were performed on a shard. The StreamOperation.TransactionEvent object allows you to obtain the following internals for the transaction event:

    • getTransactionId(): Returns the transaction ID that helps uniquely identify a transaction in the data store. The transaction ID is a store-wide unique value and consists of the shard ID, internal transaction ID, and timestamp when the transaction is committed. This is useful in various scenarios like monitoring, debugging, and so on.

      Note:

      The timestamp is in the time zone of the data store where the transaction event is generated and in the UNIX epoch format.
    • getTransactionType(): Returns the transaction event as COMMIT type.
    • getNumOperations(): Returns the number of write operations in the transaction.
    • getOperations(): Returns an ordered list of put and delete operations in the transaction event.
    // 'so' is a StreamOperation object. It is obtained using
    // NoSQLSubscriber.onNext().
    StreamOperation.TransactionEvent te = so.asTransaction();
    trace("Transaction event=" + te.toJsonString());
  • StreamOperation.getRepGroupId()

    Returns the Shard ID (as an int) where this write operation was performed.

  • StreamOperation.getSequenceId()

    Returns the unique sequence ID associated with this operation. This ID uniquely identifies a stream operation associated with a given Publisher.

    These IDs can be used to sequence operations seen for a given key. The Subscription API guarantees that the order of events for a particular key is the same as the order in which these operations were applied in Oracle NoSQL Database. The subscription API provides no guarantees about the order of operations beyond the single key.

You can use the following methods from the StreamOperation interface to verify and process events with before-images for a table.

  • StreamOperation.isBeforeImageEnabled(): Returns true if you have enabled before-images for the subscribed table when the current Streamoperation event is made to the server. Otherwise, returns false.

  • StreamOperation.includeBeforeImage(): Returns true if you have configured the subscription to include before-images. Otherwise, returns false.

  • StreamOperation.isBeforeImageExpired(): Returns true if the before-image has expired from the persistent storage. Otherwise, returns false.

    Note:

    This method also relies on the table being enabled to generate before-images prior to the delivery of this StreamOperation event.

    For example, consider a scenario when you enable before-images with a 24 hours TTL. Streaming starts 36 hours after before-images were generated. The expiry check returns true, and before-images will not appear in the stream.

  • StreamOperation.getBeforeImage(): Returns the before-image associated with the stream event represented by the current StreamOperation event for the row. This method returns null in the following cases:
    • You have not configured the stream to include before-images.
    • You have not enabled before-images for the table.
    • You have enabled before-images, however, it is not generated for write operations such as INSERT.
    • You have enabled before-images and configured stream to include it, however, the before-image has expired.