3.9.9 Load Kafka Data into Tables Stored in Oracle Database

The Oracle SQL access to Kafka (OSaK) ORA_KAFKA.LOAD_TABLE procedure loads data from a Kafka topic into a database table. To do this, ORA_KAFKA.LOAD_TABLE creates a view that it uses internally and that maps to all partitions of the Kafka topic.

The view is not deleted when ORA_KAFKA.LOAD_TABLE finishes. As a result, a subsequent call to ORA_KAFKA.LOAD_TABLE with the same cluster, group, and topic arguments reuses the view and resumes loading where the previous call left off.

To continuously load Kafka data into the database, the ORA_KAFKA.LOAD_TABLE procedure can be called in a loop.
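
The loop described above might be sketched as follows, reusing the cluster, group, topic, and table names from the examples in this section. The iteration count and the five-second pause are illustrative assumptions, not recommended values, and DBMS_SESSION.SLEEP assumes Oracle Database 18c or later (earlier releases provide DBMS_LOCK.SLEEP instead). A production job would more likely loop until some stop condition is met.

```sql
DECLARE
  num_records_loaded INTEGER;
  total_records      INTEGER := 0;
BEGIN
  -- Bounded loop for illustration only; the iteration count is an assumption.
  FOR i IN 1 .. 10 LOOP
    ORA_KAFKA.LOAD_TABLE
      ('MA1',                 -- The name of the cluster
      'LOADAPP',              -- The name of the Kafka group
      'sensor',               -- The name of the topic
      'CSV',                  -- The format of the Kafka record
      'sensortab',            -- The name of the target table in Oracle
      num_records_loaded);    -- The number of Kafka records loaded
    COMMIT;                   -- Commit each batch before the next call

    total_records := total_records + NVL(num_records_loaded, 0);

    -- Pause between calls; the interval is an assumption.
    DBMS_SESSION.SLEEP(5);
  END LOOP;

  DBMS_OUTPUT.PUT_LINE('Kafka records loaded = ' || total_records);
END;
/
```

Because each call passes the same cluster, group, and topic arguments, every iteration picks up where the previous one left off.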

Example 1:

The following example illustrates a single call to the ORA_KAFKA.LOAD_TABLE procedure which loads data from the sensor topic into the Oracle database table sensortab.

Note:

The cluster name, group name, and topic name must each be 30 characters or fewer.
DECLARE
  num_records_loaded INTEGER;
BEGIN
  ORA_KAFKA.LOAD_TABLE
    ('MA1',                   -- The name of the cluster
    'LOADAPP',                -- The name of the Kafka group
    'sensor',                 -- The name of the topic
    'CSV',                    -- The format of the Kafka record
    'sensortab',              -- The name of the target table in Oracle.
                              -- This table must reflect the shape of the rows
                              -- retrieved from Kafka
    num_records_loaded);      -- The number of Kafka records loaded
  dbms_output.put_line('Kafka records loaded = ' || num_records_loaded);
  COMMIT;
END;
/
Example 2:

The following example illustrates a single call to the ORA_KAFKA.LOAD_TABLE procedure that loads JSON data from the sensorj topic into the Oracle database table sensortab_json. The sensortab_json table is defined as follows:
CREATE TABLE sensortab_json (KEY VARCHAR2(32767), VALUE VARCHAR2(32767));

After the load, the VALUE column can be queried like any column in an Oracle table that stores JSON data. Note that a JSON data record cannot exceed 32767 bytes, the size of the VALUE column.

DECLARE
  num_records_loaded INTEGER;
BEGIN
  ORA_KAFKA.LOAD_TABLE
    ('MA1',                   -- The name of the cluster
    'LOADAPP',                -- The name of the Kafka group
    'sensorj',                -- The name of the topic
    'JSON_VARCHAR2',          -- The format of the Kafka record
    'sensortab_json',         -- The name of the target table in Oracle.
                              -- This table must reflect the shape of the rows
                              -- retrieved from Kafka
    num_records_loaded);      -- The number of Kafka records loaded
  dbms_output.put_line('Kafka records loaded = ' || num_records_loaded);
  COMMIT;
END;
/
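
As a rough illustration of querying the VALUE column after the load, the following sketch uses the standard JSON_VALUE and JSON_EXISTS SQL functions. The field names sensor_id and temperature are hypothetical; the actual fields depend on the JSON documents published to the sensorj topic.

```sql
-- Hypothetical field names: replace sensor_id and temperature with
-- fields that actually occur in the Kafka records.
SELECT JSON_VALUE(value, '$.sensor_id')   AS sensor_id,
       JSON_VALUE(value, '$.temperature') AS temperature
FROM   sensortab_json
WHERE  JSON_EXISTS(value, '$.temperature');
```

JSON_VALUE returns a scalar field as a VARCHAR2 by default, and the JSON_EXISTS condition skips records that lack the field.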