3.9.4 Create Views to Access CSV Data in a Kafka Topic

To query data from a Kafka topic, you must first create the Oracle SQL Access to Kafka (OSaK) views that map to the Kafka topic.

When the Kafka record format is CSV, before creating the views, you must create a reference table whose schema is mapped to records in the Kafka topic.

Note:

If the Kafka record format is JSON_VARCHAR2, the reference table should be passed as NULL. See Create Views to Access JSON Data in a Kafka Topic.

The reference table describes the shape of the CSV data and is used to create OSaK external tables and views over the data. The order of the columns in the reference table must match the order of the fields in the Kafka record key and value. The Kafka record key must be either NULL in every record or non-NULL in every record. If the key is not NULL, the fields within the key must be the first columns in the reference table. The fields within the Kafka record value are the remaining columns in the reference table.
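
To illustrate the column-ordering rule for a topic with a non-NULL key, here is a minimal sketch. The table name SENSOR_KEYED_RECORD_SHAPE and the record layout are hypothetical and are not part of the 'sensor' example used in this section; the sketch assumes each record has a key containing a single sensor unit id and a value containing a timestamp followed by a reading.

```sql
-- Hypothetical reference table for a topic whose records carry a non-NULL key.
-- Assumed record layout (illustration only):
--   key:   sensor_unit_id
--   value: msg_timestamp,temperature_reading
CREATE TABLE sensor_keyed_record_shape(
  sensor_unit_id        INTEGER,       -- field from the Kafka record key comes first
  msg_timestamp         TIMESTAMP,     -- first field of the Kafka record value
  temperature_reading   NUMBER(6,3)    -- second field of the Kafka record value
);
```

The key column is listed before the value columns, and the value columns appear in the same order as the fields in the CSV record value.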

The following example creates the reference table SENSOR_RECORD_SHAPE, whose schema maps to records in the Kafka topic 'sensor':

CREATE TABLE sensor_record_shape(
 msg_number            INTEGER PRIMARY KEY,
 msg_timestamp         TIMESTAMP,
 sensor_type_id        INTEGER,
 sensor_unit_id        INTEGER,
 temperature_setting   NUMBER(6,3),
 temperature_reading   NUMBER(6,3)
 );

Now that the reference table has been created, you can create the OSaK views. The following example creates one view per partition in the topic 'sensor', where the record format is CSV. Because the 'sensor' topic has one partition, one view is created.

Note:

The cluster name, group name, and topic name must each be 30 characters or fewer.

DECLARE
  views_created  INTEGER;
  application_id VARCHAR2(128);
BEGIN
  ORA_KAFKA.CREATE_VIEWS
    ('MA1',                        -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
     'QUERYAPP',                   -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
     'sensor',                     -- The name of the Kafka topic
     'CSV',                        -- The format of the topic record
     'SENSOR_RECORD_SHAPE',        -- The name of the database reference table
     views_created,                -- Output: number of views created
     application_id);              -- Output: the application id of the set of views
                                   -- created that uniquely identifies the view objects
  dbms_output.put_line('views created = ' || views_created);
  dbms_output.put_line('application id = ' || application_id);
END;
/

The preceding example creates the following view:

SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 KAFKA$PARTITION                                    NUMBER(38)
 KAFKA$OFFSET                                       NUMBER(38)
 KAFKA$EPOCH_TIMESTAMP                              NUMBER(38)
 MSG_NUMBER                                NOT NULL NUMBER
 MSG_TIMESTAMP                                      TIMESTAMP(6)
 SENSOR_TYPE_ID                                     NUMBER
 SENSOR_UNIT_ID                                     NUMBER
 TEMPERATURE_SETTING                                NUMBER(6,3)
 TEMPERATURE_READING                                NUMBER(6,3)

KAFKA$PARTITION is the Kafka partition id, KAFKA$OFFSET is the offset of the Kafka record, and KAFKA$EPOCH_TIMESTAMP is the timestamp of the Kafka record. The remaining columns represent the fields in the CSV data.
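
As a rough sketch of how the metadata columns and the CSV columns can be used together, the following query reads from the view described above. The filter condition is only an illustration. The timestamp conversion assumes that KAFKA$EPOCH_TIMESTAMP holds milliseconds since 1970-01-01, which is how Kafka itself represents record timestamps; verify this for your environment. Which records the query returns depends on how the offsets for the view have been positioned, which is outside the scope of this example.

```sql
-- Sketch only: query the view created by the CREATE_VIEWS example.
SELECT kafka$partition,
       kafka$offset,
       -- Assumption: KAFKA$EPOCH_TIMESTAMP is milliseconds since 1970-01-01
       TIMESTAMP '1970-01-01 00:00:00'
         + NUMTODSINTERVAL(kafka$epoch_timestamp / 1000, 'SECOND') AS kafka_record_time,
       msg_number,
       sensor_unit_id,
       temperature_setting,
       temperature_reading
FROM   KV_MA1_QUERYAPP_SENSOR_0
WHERE  temperature_reading > temperature_setting   -- illustrative filter only
ORDER  BY kafka$offset;
```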