NoSQLSubscriber Example with Transaction Event Streaming

Learn how to use the Oracle NoSQL Database Streams API to retrieve information about transaction events.

If transaction event streaming is enabled, you can retrieve all the committed writes and related information for a transaction event, as shown in the following code sample. For your code to receive and display transaction events, your subscription must be configured to stream transaction events. For details, see Configuring Transaction Event Streaming.

The TransactionEvent object provides the transaction ID, the transaction type, the number of writes in the transaction, and the list of committed writes that make up the transaction.

    @Override
    public void onNext(StreamOperation t) {
        switch (t.getType()) {
            case PUT:
                streamOps++;
                System.out.println("\nFound a put. Row is:");
                StreamOperation.PutEvent pe = t.asPut();
                Row row = pe.getRow();
                break;

            case DELETE:
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;

            case TRANSACTION:
                streamOps++;
                System.out.println("\nFound a transaction. Details are:");
                StreamOperation.TransactionEvent te = t.asTransaction();
                TransactionIdImpl TxnID = te.getTransactionId();
                List<StreamOperation> TxnOps = te.getOperations();
                System.out.println("\ntransactionID="  + TxnID.toString() + "\ntransactionType" + te.getTransactionType() + "\nTotalWrites" + te.getNumOperations());
                System.out.println("\nPrinting writes in the transaction:" );
                int opsNum = 1;
                for (ListIterator<StreamOperation> iterator = TxnOps.listIterator(); iterator.hasNext(); ) {
                System.out.println("\nWrite operation ["+ opsNum +"]: " + iterator.next());

                opsNum++;
                }
                break;



            default:
                throw new 
                    IllegalStateException("Receive unsupported " +
                        "stream operation from shard " +
                                t.getRepGroupId() +
                                ", seq: " + t.getSequenceId());
        }