3.9.4 Create Views to Access CSV Data in a Kafka Topic
In order to query data from a Kafka topic, you first need to create the Oracle SQL Access to Kafka (OSaK) views that map to the Kafka topic.
Note:
If the Kafka record format isJSON_VARCHAR2
, a reference table should be passed as NULL. See
Create Views to Access JSON Data in a Kafka Topic.
The reference table describes the shape of the CSV data and is used to create OSAK external tables and views over the data. The order of the columns in the reference table must match the order of the fields in the Kafka record key and value. The Kafka record key must always be NULL or always be non-NULL. If the key is not NULL, the fields within the key must be the first columns in the reference table. The fields within the Kafka record value are the remaining columns in the reference table.
The following example creates reference table SENSOR_RECORD_SHAPE
whose
schema maps to records in the Kafka topic ‘sensor’:
CREATE TABLE sensor_record_shape(
msg_number INTEGER PRIMARY KEY,
msg_timestamp TIMESTAMP,
sensor_type_id INTEGER,
sensor_unit_id INTEGER,
temperature_setting NUMBER(6,3),
temperature_reading NUMBER(6,3)
);
Note:
The cluster name, group name, and topic name must each be 30 characters or less.DECLARE
views_created INTEGER;
application_id VARCHAR2(128);
BEGIN
ORA_KAFKA.CREATE_VIEWS
('MA1', -- The name of the cluster (specified in ORA_KAFKA.REGISTER_CLUSTER)
'QUERYAPP', -- The name given by the user for a set of views, corresponds to the concept of a Kafka group
'sensor', -- The name of the Kafka topic
'CSV', -- The format of the topic record
'SENSOR_RECORD_SHAPE', -- The name of the database reference table
views_created, -- Output: number of views created
application_id); -- Output: the application id of the set of views
-- created that uniquely identifies the view objects
dbms_output.put_line(‘views created = ‘ || views_created);
dbms_output.put_line(‘application id = ‘ || application_id);
END;
/
SQL> describe KV_MA1_QUERYAPP_SENSOR_0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA$PARTITION NUMBER(38)
KAFKA$OFFSET NUMBER(38)
KAFKA$EPOCH_TIMESTAMP NUMBER(38)
MSG_NUMBER NOT NULL NUMBER
MSG_TIMESTAMP TIMESTAMP(6)
SENSOR_TYPE_ID NUMBER
SENSOR_UNIT_ID NUMBER
TEMPERATURE_SETTING NUMBER(6,3)
TEMPERATURE_READING NUMBER(6,3)
Where KAFKA$PARTITION
is the Kafka partition id,
KAFKA$OFFSET
is the offset of the Kafka record,
KAFKA$EPOCH_TIMESTAMP
is the timestamp of the Kafka record. The
remaining columns represent the fields in the CSV data.