NoSQLSubscriber Example with Before-images Streaming

Learn to use the Oracle NoSQL Database Streams API to retrieve before-images and related field information.

If before-images are enabled, you can retrieve before-images and related information and print it to stdout as shown in the following code sample. To display before-images successfully, you must check that before-images are enabled on the table of interest, that your subscription is configured to stream before-images, and that the before-images have not passed their expiry dates.

The row objects include the state of the row after and before the operation.

    /**
     * Handles one operation delivered by the stream and prints it to stdout.
     *
     * <p>For a put, the after-image is printed, followed by the before-image
     * flags reported by the operation and, when available, the before-image
     * itself. For a delete, the operation is printed as-is. Both kinds
     * increment the {@code streamOps} counter; once it equals {@code numOps}
     * the subscription is cancelled.
     *
     * @param t the stream operation to process
     * @throws IllegalStateException if the operation is neither a put nor a
     *         delete
     */
    @Override
    public void onNext(StreamOperation t) {
        switch (t.getType()) {
            case PUT: {
                streamOps++;
                System.out.println("\nFound a put. Row is:");

                // State of the row after the put was applied.
                final Row afterImage = t.asPut().getRow();
                System.out.println("\nAfter-images streaming \n"
                        + afterImage.toJsonString(true));

                System.out.println("\nBefore-images enabled = "
                        + t.isBeforeImageEnabled());
                System.out.println("\nBefore-images subscribed = "
                        + t.includeBeforeImage());

                // The before-image is printed only when it is enabled,
                // subscribed, and not yet expired; otherwise report that it
                // was not streamed.
                if (!t.isBeforeImageEnabled()
                        || !t.includeBeforeImage()
                        || t.isBeforeImageExpired()) {
                    System.out.println("Before-images is not streamed");
                } else {
                    // NOTE(review): the result is dereferenced without a null
                    // check; confirm getBeforeImage() cannot return null here
                    // (e.g. for a first-time insert).
                    final Row beforeImage = t.getBeforeImage();
                    System.out.println("\nBefore-images streaming \n"
                            + beforeImage.toJsonString(true));
                }
                break;
            }

            case DELETE: {
                streamOps++;
                System.out.println("\nFound a delete. Row is:");
                System.out.println(t);
                break;
            }

            default: {
                final String message =
                        "Receive unsupported stream operation from shard "
                        + t.getRepGroupId()
                        + ", seq: " + t.getSequenceId();
                throw new IllegalStateException(message);
            }
        }

        // Stop streaming once the expected number of operations has arrived.
        if (streamOps == numOps) {
            getSubscription().cancel();
            System.out.println("Subscription cancelled after receiving "
                    + numOps + " operations.");
        }
    }