Components of Oracle GoldenGate Data Streams
- Async API
- Data Streams Protocol
- Data Streams Start/Restart Position
- Schema Records
- CloudEvents Format
Async API
Oracle GoldenGate Data Streams is programming-language agnostic, so it can interact with a client written in any programming language. Even though client programs are typically small and simple, users still need to implement the client code manually to interact with the data streaming service.
Adopting the AsyncAPI specification into Oracle GoldenGate Data Streams has the following advantages:
- Ability to describe the data streams service API in an industry-standard API specification and automatically generate API documentation.
- Automatically generate client-side code with @asyncapi/generator.
With AsyncAPI support, Oracle GoldenGate Data Streams simplifies data streaming by generating the client code automatically. AsyncAPI follows the publisher and subscriber model and supports a wide variety of protocols, including websocket, kafka, mqtt, hms, and many IoT protocols. An event-driven API is described in YAML, using a syntax similar to that of the OpenAPI specification.
For example, the following snippet of an AsyncAPI YAML document describes the Data Streaming API:
asyncapi: '3.0.0'
info:
  title: Data Streaming API
  version: '1.0.0'
  description: |
    allows clients to subscribe to a data stream
  license:
    name: Apache 2.0
    url: 'https://www.apache.org/licenses/LICENSE-2.0'
servers:
  EAST:
    protocol: ws
    url: east.oraclevcn.com:9002
defaultContentType: application/json
channels:
  /services/v2/stream/mystream1:
    ...
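As a rough illustration of how a client might consume such a document, the sketch below derives WebSocket endpoint URLs by combining each server entry with each channel path. The dictionary literal is hand-written to mirror the snippet above rather than parsed from real YAML, and the helper name `stream_endpoints` is hypothetical; it is not part of Oracle GoldenGate or the AsyncAPI tooling.

```python
# Hand-written dictionary mirroring the AsyncAPI snippet (assumption: a real
# client would load the YAML document returned by the service instead).
spec = {
    "servers": {"EAST": {"protocol": "ws", "url": "east.oraclevcn.com:9002"}},
    "channels": {"/services/v2/stream/mystream1": {}},
}


def stream_endpoints(spec: dict) -> list[str]:
    """Combine every server with every channel path into a full endpoint URL."""
    endpoints = []
    for server in spec.get("servers", {}).values():
        base = f"{server['protocol']}://{server['url']}"
        for channel in spec.get("channels", {}):
            endpoints.append(base + channel)
    return endpoints


print(stream_endpoints(spec))
```

With the single server and channel shown, this yields one endpoint; code generated by @asyncapi/generator would normally take care of this step.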
When a data streams resource is created, the HTTP response returns a URL link to a customized AsyncAPI specification document that describes how to access this data stream endpoint. This YAML document can then be used to generate the client-side code using @asyncapi/generator.
Note that to support the websocket protocol in @asyncapi/generator, you also need to implement and maintain the websocket client template for @asyncapi/generator in GitHub.
Refer to the GitHub repository for more information about the websocket-client-template:
Data Streams Protocol
With Oracle GoldenGate Data Streams, direct access to the data in a user-specified format is enabled through a dedicated websocket channel that follows a simple streaming protocol.
The Data Streams protocol uses push mode to send data to the client. The client first creates a streaming resource on the server through an HTTP REST request. After the streaming resource is created, the client establishes a WebSocket connection through the streaming resource endpoint. After the WebSocket channel is established, Data Streams starts to push the data immediately and continuously, without waiting for a response or acknowledgement from the client.
The following sample Python client illustrates the interaction between the client and the data streaming service:
import asyncio
import json

import requests
import websockets


async def client():
    # Create the streaming resource with an HTTP REST request.
    payload = {"source": {"trail": "a1"}}
    response = requests.post(
        "http://name:pswd@localhost:9002/services/v2/stream/s1", json=payload)

    # Establish the WebSocket connection and receive data continuously.
    uri = "ws://name:pswd@localhost:9002/services/v2/stream/s1?begin=earliest"
    async with websockets.connect(uri) as websocket:
        while True:
            resp = await websocket.recv()
            # Each message is a JSON array of records.
            records = json.loads(resp)
            for rec in records:
                print(rec)


asyncio.run(client())
In the given client program, a simple Data Stream payload specifying the source data trail name is provided when creating the data stream resource endpoint s1. In a real-world application, much more complex Data Stream payloads can be used during the handshake phase of the streaming protocol to configure the data streaming behavior.
For example, the following Data Stream request payload specifies the filtering rules, encoding format, and bufferSize along with the required data source trail name.
{
  "$schema" : "ogg:dataStream",
  "source" : {"trail":"a1"},
  "rules" : [{
    "action" : "exclude",
    "filter" : {
      "objectNames" : [ "PDB2.AR.*", "PDB1.U1.*" ]
    }
  }],
  "encoding" : "json",
  "bufferSize" : 2097152
}
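A payload like this can also be assembled programmatically before it is posted. The following is a minimal sketch that uses only the fields shown in the example above; the function name `build_stream_payload` and its parameters are hypothetical and not part of any Oracle GoldenGate client library.

```python
import json


def build_stream_payload(trail, exclude_objects=None, encoding="json", buffer_size=None):
    """Assemble a Data Stream request payload as a plain dictionary."""
    payload = {"$schema": "ogg:dataStream", "source": {"trail": trail}}
    if exclude_objects:
        # One exclude rule covering all the given object name patterns.
        payload["rules"] = [
            {"action": "exclude", "filter": {"objectNames": list(exclude_objects)}}
        ]
    payload["encoding"] = encoding
    if buffer_size is not None:
        payload["bufferSize"] = buffer_size
    return payload


payload = build_stream_payload(
    "a1", ["PDB2.AR.*", "PDB1.U1.*"], buffer_size=2 * 1024 * 1024
)
print(json.dumps(payload, indent=2))
```

The resulting dictionary could be passed as the `json=` argument of `requests.post` in the sample client.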
Data Streams Start/Restart Position
While establishing the websocket connection, the client specifies the begin position (as a query parameter in the websocket connection URL) from which to start streaming the data. The begin position can be one of the following values:
- Special keyword “now”
- Special keyword “earliest”
- ISO 8601 format timestamp string
- Last processed LCR position
Each non-metadata LCR record contains an opaque position (which includes the CSN, XID, and record number inside the transaction). The client is responsible for maintaining the position of the last processed LCR record. The data streams service is responsible for locating the correct start/restart point based on the given begin position.
If this is the first time a client connects to the data streams service, the client should provide a timestamp from which to start streaming data. The keyword now is converted to the current timestamp and the keyword earliest is converted to the timestamp 0.
Alternatively, an ISO 8601 timestamp string can be used for begin position. In all cases, the data streams service performs a timestamp-based lookup on the source trail to determine the start position.
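The choice of begin value on a first connection can be sketched as follows. This is a minimal illustration that assumes the stream endpoint from the sample client; the helper name `stream_uri` is hypothetical, and the exact timestamp format that the service accepts should be confirmed against the product documentation.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def stream_uri(base_uri, begin):
    """Append the begin position to the WebSocket URI as a query parameter."""
    if isinstance(begin, datetime):
        # Normalize to UTC and render as an ISO 8601 string.
        begin = begin.astimezone(timezone.utc).isoformat()
    return f"{base_uri}?{urlencode({'begin': begin})}"


base = "ws://localhost:9002/services/v2/stream/s1"
print(stream_uri(base, "earliest"))
print(stream_uri(base, "now"))
print(stream_uri(base, datetime(2024, 1, 15, 8, 30, tzinfo=timezone.utc)))
```

`urlencode` percent-encodes characters such as `:` and `+` in the timestamp, so the value survives intact as a query parameter.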
In the recovery/restart case, the client should provide the saved last processed position to the data streams service during the handshake. The data streams service then performs a position-based lookup on the source trail to determine the start position. The behavior of data streaming recovery also depends on the QoS level specified in the data stream.
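Because the client owns the last processed position, it needs some durable place to keep it. The sketch below shows one possible approach, a small JSON checkpoint file written atomically. The file layout and the helper names `load_begin` and `save_position` are assumptions made for illustration; the position is treated as an opaque string, and how it is read out of each record is not shown here.

```python
import json
import os
import tempfile


def load_begin(checkpoint_path, default="earliest"):
    """Return the saved position if one exists, else the first-connect default."""
    try:
        with open(checkpoint_path, encoding="utf-8") as fh:
            return json.load(fh)["position"]
    except FileNotFoundError:
        return default


def save_position(checkpoint_path, position):
    """Write to a temporary file, then rename it over the checkpoint."""
    directory = os.path.dirname(os.path.abspath(checkpoint_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"position": position}, fh)
    # os.replace swaps the file in one step, so readers never see a partial write.
    os.replace(tmp_path, checkpoint_path)


with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "stream_s1.checkpoint")
    print(load_begin(path))  # no checkpoint yet, falls back to the default
    save_position(path, "opaque-position-123")  # placeholder, not a real position
    print(load_begin(path))
```

A client would call `save_position` after it has finished processing a record and pass the result of `load_begin` as the begin value when it reconnects.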
Schema Records
- DDL operation record
- DML operation record
- Object Metadata record
- Data Streams Metadata record
Oracle GoldenGate Data Streams sends the corresponding schema record before sending any type of data or metadata records.
CloudEvents
CloudEvents is a specification for describing event data in common formats to provide interoperability across services, platforms and systems. As Oracle GoldenGate Data Streams currently only supports JSON data encoding, the support for CloudEvents format is limited to the JSON event format. The complete specification for JSON Event Format for CloudEvents can be found at:
https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/json-format.md
https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.json
The data field contains the original data records, which are Oracle GoldenGate DML/DDL/metadata/schema records.
{
  "specversion" : "1.0",
  "type" : "com.example.someevent",
  "source" : "/mycontext",
  "id" : "A234-1234-1234",
  "datacontenttype" : "application/json",
  "data" : {…}
}
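On the client side, unwrapping such an envelope might look like the sketch below. It checks the four attributes that the CloudEvents 1.0 specification marks as required (specversion, type, source, id) and returns the data field. The function name `unwrap_cloudevent` and the sample data content are hypothetical; the actual record layout inside data is not shown here.

```python
import json

# Attributes that CloudEvents 1.0 requires on every event.
REQUIRED_ATTRIBUTES = ("specversion", "type", "source", "id")


def unwrap_cloudevent(message):
    """Parse a JSON-format CloudEvent and return its data payload."""
    event = json.loads(message)
    missing = [name for name in REQUIRED_ATTRIBUTES if name not in event]
    if missing:
        raise ValueError(f"not a CloudEvent, missing: {missing}")
    # An absent datacontenttype is treated as JSON in the JSON event format.
    if event.get("datacontenttype", "application/json") != "application/json":
        raise ValueError("unexpected datacontenttype")
    return event.get("data")


sample = json.dumps({
    "specversion": "1.0",
    "type": "com.example.someevent",
    "source": "/mycontext",
    "id": "A234-1234-1234",
    "datacontenttype": "application/json",
    "data": {"example": True},  # placeholder content, not a real record
})
print(unwrap_cloudevent(sample))
```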