3.9.9 Load Kafka Data into Tables Stored in Oracle Database
The Oracle SQL Access to Kafka (OSaK) ORA_KAFKA.LOAD_TABLE procedure loads data from a Kafka topic into a database table. ORA_KAFKA.LOAD_TABLE creates a view that is used internally and maps to all partitions of the Kafka topic.
The view is not deleted when ORA_KAFKA.LOAD_TABLE finishes executing. As a result, a subsequent call to ORA_KAFKA.LOAD_TABLE with the same cluster, group, and topic arguments reuses the same view and starts loading where the previous call left off.
To continuously load Kafka data into the database, the ORA_KAFKA.LOAD_TABLE procedure can be called in a loop.
The following example illustrates a single call to the ORA_KAFKA.LOAD_TABLE procedure, which loads data from the sensor topic into the Oracle database table sensortab.
Note:
The cluster name, group name, and topic name must each be 30 characters or fewer.
DECLARE
  num_records_loaded INTEGER;
BEGIN
  ORA_KAFKA.LOAD_TABLE
    ('MA1',               -- The name of the cluster
     'LOADAPP',           -- The name of the Kafka group
     'sensor',            -- The name of the topic
     'CSV',               -- The format of the Kafka record
     'sensortab',         -- The name of the target table in Oracle.
                          -- This table must reflect the shape of the rows
                          -- retrieved from Kafka
     num_records_loaded); -- The number of Kafka records loaded
  dbms_output.put_line('Kafka records loaded = ' || num_records_loaded);
  COMMIT;
END;
/
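The section notes that ORA_KAFKA.LOAD_TABLE can be called in a loop for continuous loading. The following is a minimal sketch of that idea, not a prescribed pattern. It reuses the arguments of the single-call example so that each pass resumes where the previous one stopped. The pass limit (max_passes) and the pause between passes (pause_seconds) are hypothetical values chosen for illustration, and the sketch assumes DBMS_SESSION.SLEEP is available (Oracle Database 18c or later).

```sql
DECLARE
  num_records_loaded INTEGER;
  total_loaded       INTEGER := 0;
  max_passes    CONSTANT PLS_INTEGER := 10; -- Hypothetical bound so the sketch terminates
  pause_seconds CONSTANT NUMBER      := 5;  -- Hypothetical pause between passes
BEGIN
  FOR pass IN 1 .. max_passes LOOP
    -- Same cluster, group, and topic on every pass, so each call
    -- continues from where the previous call left off
    ORA_KAFKA.LOAD_TABLE
      ('MA1', 'LOADAPP', 'sensor', 'CSV', 'sensortab', num_records_loaded);
    COMMIT;
    total_loaded := total_loaded + NVL(num_records_loaded, 0);
    dbms_output.put_line('Pass ' || pass || ': Kafka records loaded = ' || num_records_loaded);
    -- Assumption: DBMS_SESSION.SLEEP exists in this database release
    DBMS_SESSION.SLEEP(pause_seconds);
  END LOOP;
  dbms_output.put_line('Total Kafka records loaded = ' || total_loaded);
END;
/
```

Committing after each pass mirrors the single-call example. A production loader would likely replace the fixed pass count with its own stop condition.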
The following example illustrates a single call to the ORA_KAFKA.LOAD_TABLE procedure to load JSON data from the sensorj topic into the Oracle database table sensortab_json, where the sensortab_json table is defined as follows:
CREATE TABLE sensortab_json (KEY VARCHAR2(32767), VALUE VARCHAR2(32767));
After the load, the VALUE column can be queried like any column in an Oracle table that stores JSON data. Note that a JSON data record cannot exceed 32767 bytes, the declared size of the VALUE column.
DECLARE
  num_records_loaded INTEGER;
BEGIN
  ORA_KAFKA.LOAD_TABLE
    ('MA1',               -- The name of the cluster
     'LOADAPP',           -- The name of the Kafka group
     'sensorj',           -- The name of the topic
     'JSON_VARCHAR2',     -- The format of the Kafka record
     'sensortab_json',    -- The name of the target table in Oracle. This
                          -- table must reflect the shape of the rows
                          -- retrieved from Kafka
     num_records_loaded); -- The number of Kafka records loaded
  dbms_output.put_line('Kafka records loaded = ' || num_records_loaded);
  COMMIT;
END;
/
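After a load like this, the VALUE column of sensortab_json can be queried with Oracle's SQL/JSON functions. The query below is only a sketch: the JSON field names sensor_id and temperature are hypothetical, since this section does not describe the layout of the records in the sensorj topic.

```sql
-- Field names '$.sensor_id' and '$.temperature' are assumptions for illustration
SELECT JSON_VALUE(t.value, '$.sensor_id')                     AS sensor_id,
       JSON_VALUE(t.value, '$.temperature' RETURNING NUMBER)  AS temperature
FROM   sensortab_json t
WHERE  t.value IS JSON;  -- Skip any rows whose VALUE is not well-formed JSON
```

The IS JSON filter is a defensive choice for a VARCHAR2 column that has no JSON check constraint. It can be dropped if every record in the topic is known to be valid JSON.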