3.9.6 Query Kafka Data as Continuous Stream
After you create the Oracle SQL access to Kafka (OSaK) views, you can query them using standard SQL. The view name consists of a generated application_id (the concatenation of the cluster name, Kafka group name, and topic name) followed by the view_id.
Because the sensor topic described above has only one partition, it has only one view, named “KV_MA1_QUERYAPP_SENSOR_0”. To identify view names, query the ORA_KAFKA_PARTITION metadata table in the schema in which the ORA_KAFKA package was installed.
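As an illustration of the naming scheme, the hypothetical Python helper below assembles a view name. It assumes the pattern KV_<cluster>_<group>_<topic>_<view_id>, inferred only from the example name above; the names recorded in ORA_KAFKA_PARTITION remain authoritative.

```python
def osak_view_name(cluster: str, group: str, topic: str, view_id: int) -> str:
    """Build an OSaK-style view name (hypothetical helper, not part of ORA_KAFKA).

    Assumes the pattern KV_<cluster>_<group>_<topic>_<view_id>, inferred from
    the example view KV_MA1_QUERYAPP_SENSOR_0.
    """
    # application_id: cluster name, Kafka group name, and topic name joined together
    application_id = f"{cluster}_{group}_{topic}".upper()
    return f"KV_{application_id}_{view_id}"


print(osak_view_name("ma1", "queryapp", "sensor", 0))  # KV_MA1_QUERYAPP_SENSOR_0
```

A topic with several partitions would, under the same assumption, yield one name per view_id (…_0, …_1, and so on).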
OSaK views can be accessed continuously, reading from an initial offset or timestamp to the end of the stream. This is the typical use case for querying the most recent Kafka records for the specified topic.
The example below uses the ORA_KAFKA.INIT_OFFSET procedure to set the starting offset to 100 records below the Kafka partition high water mark, and then reads from there to the end of the stream. Each subsequent pass through the loop reads from where the previous pass left off to the new end of the stream. In the example, the analytics are reduced to a simple count(*) inside a PL/SQL LOOP.
The expected usage is a loop within an application that calls ORA_KAFKA.NEXT_OFFSET, queries the OSaK views, performs analytics on the retrieved Kafka records, and, if satisfied, calls ORA_KAFKA.UPDATE_OFFSET and commits the transaction. The ORA_KAFKA.NEXT_OFFSET procedure records the next Kafka offset from which to read, based on the results of ORA_KAFKA.UPDATE_OFFSET or ORA_KAFKA.INIT_OFFSET/ORA_KAFKA.INIT_OFFSET_TS. ORA_KAFKA.UPDATE_OFFSET saves the last Kafka offset read when the view was accessed.
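To make the offset protocol concrete, here is a minimal Python sketch that models it in memory. It is not the ORA_KAFKA API: the class OffsetTracker and its methods are hypothetical stand-ins for INIT_OFFSET, NEXT_OFFSET, and UPDATE_OFFSET plus COMMIT, and the model assumes a single partition whose high water mark is the offset one past the last record.

```python
class OffsetTracker:
    """In-memory model of per-view offset bookkeeping (hypothetical, not ORA_KAFKA)."""

    def __init__(self, high_water_mark: int) -> None:
        self.high_water_mark = high_water_mark  # offset one past the last record
        self.committed = 0                      # offset saved by UPDATE_OFFSET + COMMIT
        self.window = (0, 0)                    # [start, end) visible to the current query

    def init_offset(self, records_below_hwm: int) -> None:
        # Models INIT_OFFSET(..., WATER_MARK_HIGH): start N records below the high water mark.
        self.committed = max(0, self.high_water_mark - records_below_hwm)

    def next_offset(self) -> tuple[int, int]:
        # Models NEXT_OFFSET: read from the saved offset to the current end of the stream.
        self.window = (self.committed, self.high_water_mark)
        return self.window

    def update_offset(self) -> None:
        # Models UPDATE_OFFSET followed by COMMIT: remember where this pass stopped.
        self.committed = self.window[1]

    def produce(self, n: int) -> None:
        # New records arriving on the topic advance the high water mark.
        self.high_water_mark += n


tracker = OffsetTracker(high_water_mark=1000)
tracker.init_offset(100)
start, end = tracker.next_offset()
print(end - start)   # 100, what count(*) would return on the first pass
tracker.update_offset()
tracker.produce(25)
print(tracker.next_offset())  # (1000, 1025): the second pass sees only the 25 new records
```

The point of the design is that the saved offset only moves when UPDATE_OFFSET runs, so a pass whose analytics fail can be retried by skipping that call.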
You can also set the starting point for reading to a particular timestamp. ORA_KAFKA.INIT_OFFSET_TS initializes the starting offset related to a timestamp for each Kafka partition belonging to the OSaK view. As with ORA_KAFKA.INIT_OFFSET, ORA_KAFKA.INIT_OFFSET_TS would normally be called at the outset of a new application instance dedicated to processing the view, or when recovering after an application instance shuts down or fails.
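The hypothetical sketch below illustrates how a timestamp can map to a per-partition starting offset. It borrows the Kafka consumer semantics of offsetsForTimes (the earliest offset whose record timestamp is greater than or equal to the target) and assumes timestamps are non-decreasing within each partition; whether ORA_KAFKA.INIT_OFFSET_TS resolves timestamps exactly this way is an assumption, and offset_for_timestamp is an illustrative name only.

```python
from bisect import bisect_left


def offset_for_timestamp(timestamps_ms: list[int], start_ms: int) -> int:
    """Return the earliest offset whose record timestamp is >= start_ms.

    timestamps_ms[i] is the (assumed non-decreasing) timestamp of the record at
    offset i. If no record is that recent, the result is len(timestamps_ms),
    the high water mark, so reading starts at the end of the stream.
    """
    return bisect_left(timestamps_ms, start_ms)


# Hypothetical record timestamps for two partitions of one topic.
partitions = {
    0: [1000, 1005, 1010, 1020, 1030],
    1: [990, 1012, 1013],
}
start = {p: offset_for_timestamp(ts, 1010) for p, ts in partitions.items()}
print(start)  # {0: 2, 1: 1}
```

Each partition gets its own starting offset, which is why a timestamp-based initialization applies per Kafka partition rather than once per topic.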
Note:
Multiple applications reading the same set of OSaK views can result in duplicate Kafka records being processed or Kafka records being skipped, because each application will attempt to manage the offset. When multiple applications read the same topic, create a set of views for each application by using application-specific Kafka group names. Each application can then use its own offset to determine where to read. For example, one application can call ORA_KAFKA.INIT_OFFSET with 100 and use one set of views, another application can call ORA_KAFKA.INIT_OFFSET with 550 and use another set of views, and so on.
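The skipping problem described in the note can be reproduced with a small in-memory model. This Python sketch is hypothetical: OffsetStore keeps one committed offset per Kafka group name, standing in for one set of views, and read_and_commit collapses NEXT_OFFSET, the query, UPDATE_OFFSET, and COMMIT into a single step. The group names are illustrative only.

```python
class OffsetStore:
    """Hypothetical model: one committed offset per Kafka group name (one set of views)."""

    def __init__(self) -> None:
        self.committed: dict[str, int] = {}
        self.high_water_mark = 0

    def produce(self, n: int) -> None:
        # New records arriving on the topic.
        self.high_water_mark += n

    def read_and_commit(self, group: str) -> list[int]:
        # NEXT_OFFSET + query: read from the group's saved offset to the end of the stream.
        start = self.committed.get(group, 0)
        records = list(range(start, self.high_water_mark))
        # UPDATE_OFFSET + COMMIT: save where this read stopped.
        self.committed[group] = self.high_water_mark
        return records


shared = OffsetStore()
shared.produce(5)
print(shared.read_and_commit("queryapp"))  # app A: [0, 1, 2, 3, 4]
shared.produce(3)
print(shared.read_and_commit("queryapp"))  # app B: [5, 6, 7], records 0-4 skipped for B

separate = OffsetStore()
separate.produce(5)
print(separate.read_and_commit("queryapp_a"))  # app A: [0, 1, 2, 3, 4]
separate.produce(3)
print(separate.read_and_commit("queryapp_b"))  # app B: [0, 1, 2, 3, 4, 5, 6, 7]
```

With a shared group, each application advances the other's offset; with application-specific group names, each application sees the full stream from its own starting point.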
DECLARE
  v_count INTEGER;  -- Holds the result of each pass's count(*)
BEGIN
  -- Before entering the loop, initialize the starting offsets for the view relative to the current high water mark of the Kafka partition managed by the view.
  -- Without an INIT_OFFSET call, records are read either from the beginning of the stream or from the offset last recorded by a COMMIT after an UPDATE_OFFSET call.
  ORA_KAFKA.INIT_OFFSET('KV_MA1_QUERYAPP_SENSOR_0',  -- The view for which to initialize offsets
                        100,                         -- The number of records before the high water mark that designates the starting offset
                        ORA_KAFKA.WATER_MARK_HIGH);  -- The record count above is measured from the high water mark
  LOOP
    -- Set the offset of the next Kafka record to be processed.
    -- On the first pass, because INIT_OFFSET was called, the starting offset is 100 records below the high water mark.
    ORA_KAFKA.NEXT_OFFSET('KV_MA1_QUERYAPP_SENSOR_0');  -- The view for which to set offsets
    -- Query the rows from the starting offset to the current end of the stream.
    -- In PL/SQL, a SELECT must fetch its result INTO a variable.
    SELECT COUNT(*) INTO v_count FROM KV_MA1_QUERYAPP_SENSOR_0;
    -- Now that the query is done, record the last offset processed.
    ORA_KAFKA.UPDATE_OFFSET('KV_MA1_QUERYAPP_SENSOR_0');  -- The view for which to update offsets
    COMMIT;
    -- As written, the loop never terminates; add an EXIT WHEN condition appropriate to the application.
  END LOOP;
END;
/