5 Interoperability of Transactional Event Queue with Apache Kafka
Oracle Transactional Event Queue (TxEventQ) makes it easy to implement event-based applications. It is also highly integrated with Apache Kafka, an open-source stream-processing platform developed at LinkedIn and donated to the Apache Software Foundation, written in Scala and Java. Applications that use the Kafka APIs can operate transparently on Oracle TxEventQ, and Oracle TxEventQ also supports bi-directional information flow between TxEventQ and Kafka, so that changes made in one system become available in the other in near real time.
Apache Kafka Connect is a framework, included in Apache Kafka, that integrates Kafka with other systems. Oracle TxEventQ provides the standard JMS package and the related JDBC and transaction packages to establish the connection and complete the transactional data flow. Oracle TxEventQ configures the standard Kafka JMS connectors to establish interoperability and complete the data flow between the two messaging systems.
This chapter contains the following topics:
- Setup and Prerequisites
- Connecting from Apache Kafka to Oracle TxEventQ (Confluent Platform and CLI Example)
- Connecting from Oracle TxEventQ to Apache Kafka (Confluent Platform and CLI Example)
- Monitoring Message Transfer
Setup and Prerequisites
Kafka Connect uses the Java Naming and Directory Interface (JNDI) and the standard JMS interface to create a JMS ConnectionFactory instance for Oracle TxEventQ, and then enqueues messages to, or dequeues messages from, TxEventQ accordingly.
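The JNDI environment that Kafka Connect hands to AQjmsInitialContextFactory is a plain set of naming properties. The sketch below models it as a Python dictionary purely for illustration (validate_jndi_env is a hypothetical helper, not part of any Oracle or Kafka API); the property names are the ones used in the connector configurations later in this chapter, and the angle-bracket values are placeholders:

```python
# JNDI environment properties used to look up the JMS ConnectionFactory
# for TxEventQ. Angle-bracket values are placeholders you must fill in.
jndi_env = {
    "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
    "java.naming.provider.url": "<connection string>",   # database connection string
    "java.naming.security.principal": "<username>",      # TxEventQ user
    "java.naming.security.credentials": "<password>",
}

# The sink connector looks up an XA-capable queue connection factory by this name:
connection_factory_name = "javax.jms.XAQueueConnectionFactory"

def validate_jndi_env(env: dict) -> list:
    """Return the sorted list of required JNDI keys missing from env."""
    required = {
        "java.naming.factory.initial",
        "java.naming.provider.url",
        "java.naming.security.principal",
        "java.naming.security.credentials",
    }
    return sorted(required - env.keys())

print(validate_jndi_env(jndi_env))  # [] when all required keys are present
```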
The prerequisites are as follows:
- Kafka Broker: Confluent Platform 3.3.0 or above, or Kafka 0.11.0 or above
- Connect: Confluent Platform 4.1.0 or above, or Kafka 1.1.0 or above
- Java 1.8
- Oracle TxEventQ JMS 1.1+ client jars
Connecting from Apache Kafka to Oracle TxEventQ (Confluent Platform and CLI Example)
Steps for message transfer from Apache Kafka to TxEventQ are as follows.
- Start the Oracle Database.
- Set up TxEventQ:
  - Create the TxEventQ user and grant the user the corresponding privileges.

        CREATE USER <username> IDENTIFIED BY <password>;
        GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>;
        GRANT EXECUTE ON DBMS_AQ TO <username>;
        GRANT EXECUTE ON DBMS_AQADM TO <username>;
        -- alter tablespace privileges if needed

  - Create the TxEventQ and start it.

        BEGIN
          DBMS_AQADM.CREATE_SHARDED_QUEUE(
            queue_name         => '<username>.<queuename>',
            multiple_consumers => FALSE,  -- FALSE: Queue, TRUE: Topic
            queue_payload_type => DBMS_AQADM.JMS_TYPE);
          DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>');
        END;
        /

    Note: multiple_consumers => FALSE means a JMS Queue; TRUE means a JMS Topic.
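The Queue-versus-Topic effect of multiple_consumers can be made explicit with a small helper that renders the PL/SQL block above. This is an illustrative Python sketch (create_queue_plsql is a hypothetical helper, not part of any Oracle API); the generated text mirrors the CREATE_SHARDED_QUEUE call shown in this step:

```python
def create_queue_plsql(username: str, queuename: str, is_topic: bool = False) -> str:
    """Render the PL/SQL block that creates and starts a TxEventQ.

    multiple_consumers => FALSE creates a JMS Queue (point-to-point);
    TRUE creates a JMS Topic (publish/subscribe).
    """
    multiple = "TRUE" if is_topic else "FALSE"
    qualified = f"{username}.{queuename}"
    return (
        "BEGIN\n"
        "  DBMS_AQADM.CREATE_SHARDED_QUEUE(\n"
        f"    queue_name         => '{qualified}',\n"
        f"    multiple_consumers => {multiple},\n"
        "    queue_payload_type => DBMS_AQADM.JMS_TYPE);\n"
        f"  DBMS_AQADM.START_QUEUE(queue_name => '{qualified}');\n"
        "END;"
    )

# Queue for point-to-point delivery from the Kafka sink connector:
print(create_queue_plsql("APP", "KAFKA_IN"))
```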
- Install the Kafka Connect JMS Sink Connector.

        # run from your Confluent Platform installation directory
        confluent-hub install confluentinc/kafka-connect-jms-sink:latest

- Import the TxEventQ jars into the Kafka JMS Sink Connector.

  Copy the following jars into the JMS Sink Connector's plugin folder (share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib). This needs to be done on every Connect worker node, and the workers must be restarted to pick up the TxEventQ jars.

  - aqapi.jar: the TxEventQ JMS library jar
  - ojdbc8.jar: the Oracle JDBC connection library jar
  - jta-1.1.jar: JTA, the standard Java interfaces between a transaction manager and the parties involved in a distributed transaction system
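Before restarting the Connect workers, it can help to verify that all three jars actually landed in the plugin folder on each node. A small sanity-check sketch (missing_jars is a hypothetical helper; the plugin path is the sink connector path cited above, so adjust it for your installation):

```python
from pathlib import Path

# The three jars this chapter says must be copied into the plugin folder.
REQUIRED_JARS = {"aqapi.jar", "ojdbc8.jar", "jta-1.1.jar"}

def missing_jars(plugin_dir: str) -> set:
    """Return the required TxEventQ jars not yet present in plugin_dir."""
    present = {p.name for p in Path(plugin_dir).glob("*.jar")}
    return REQUIRED_JARS - present

# Example: check the JMS Sink Connector's lib folder on this worker node.
missing = missing_jars(
    "share/confluent-hub-components/confluentinc-kafka-connect-jms-sink/lib"
)
if missing:
    print("Copy these jars before restarting the workers:", sorted(missing))
```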
- Start Confluent Platform.

        confluent local start
- Configure the JMS Sink Connector in jms-sink.json:

        {
          "name": "JmsSinkConnector",
          "config": {
            "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
            "tasks.max": "1",
            "topics": "jms-messages",
            "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
            "java.naming.provider.url": <connection string>,
            "db_url": <connection string>,
            "java.naming.security.principal": <username>,
            "java.naming.security.credentials": <password>,
            "jndi.connection.factory": "javax.jms.XAQueueConnectionFactory",
            "jms.destination.type": "queue",
            "jms.destination.name": <queuename>,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1"
          }
        }

- Load the JMS Sink Connector.

        confluent local load jms -- -d jms-sink.json
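Rather than hand-editing the JSON, the sink configuration can be generated programmatically and written to the file that the load command expects. A hedged sketch (jms_sink_config is an illustrative helper, not part of the connector tooling; the connection string, credentials, and queue name remain placeholders to fill in):

```python
import json

def jms_sink_config(name, topics, queue, db_url, user, password):
    """Build the JMS Sink Connector configuration shown in this step."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jms.JmsSinkConnector",
            "tasks.max": "1",
            "topics": topics,
            "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
            "java.naming.provider.url": db_url,
            "db_url": db_url,
            "java.naming.security.principal": user,
            "java.naming.security.credentials": password,
            "jndi.connection.factory": "javax.jms.XAQueueConnectionFactory",
            "jms.destination.type": "queue",
            "jms.destination.name": queue,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.storage.StringConverter",
            "confluent.topic.bootstrap.servers": "localhost:9092",
            "confluent.topic.replication.factor": "1",
        },
    }

# Write the file that `confluent local load jms -- -d jms-sink.json` reads.
cfg = jms_sink_config("JmsSinkConnector", "jms-messages", "<queuename>",
                      "<connection string>", "<username>", "<password>")
with open("jms-sink.json", "w") as f:
    json.dump(cfg, f, indent=2)
```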
- Post-check the connector status.
  - Using Confluent Platform Admin: go to http://localhost:9021 (the Confluent Platform admin console) and check the connector status there.
  - Using the Confluent CLI:

        confluent local status jms
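The same status check can be scripted against the Kafka Connect REST API, which exposes GET /connectors/&lt;name&gt;/status on port 8083 by default. A minimal sketch, assuming the Connect worker runs locally (connector_state and check_connector are illustrative helpers):

```python
import json
from urllib.request import urlopen

def connector_state(status_json: str) -> str:
    """Extract the connector state ("RUNNING", "FAILED", ...) from the
    JSON body returned by GET /connectors/<name>/status."""
    return json.loads(status_json)["connector"]["state"]

def check_connector(name: str, host: str = "http://localhost:8083") -> str:
    """Query a running Connect worker for one connector's state."""
    with urlopen(f"{host}/connectors/{name}/status") as resp:
        return connector_state(resp.read().decode())

# Example (requires a running Connect worker):
# print(check_connector("JmsSinkConnector"))
```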
- Test the message transfer.

  Produce random messages into the Kafka topic:

        seq 10 | confluent local produce jms-messages

  Check the messages enqueued into TxEventQ:

        SELECT * FROM GV$PERSISTENT_QUEUES;
        SELECT * FROM GV$AQ_SHARDED_SUBSCRIBER_STAT;
Connecting from Oracle TxEventQ to Apache Kafka (Confluent Platform and CLI Example)
Steps for message transfer from TxEventQ to Apache Kafka are as follows.
- Start the Oracle Database.
- Set up TxEventQ:
  - Create the TxEventQ user and grant the user the corresponding privileges.

        CREATE USER <username> IDENTIFIED BY <password>;
        GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO <username>;
        GRANT EXECUTE ON DBMS_AQ TO <username>;
        GRANT EXECUTE ON DBMS_AQADM TO <username>;
        -- alter tablespace privileges if needed

  - Create the TxEventQ and start it.

        BEGIN
          DBMS_AQADM.CREATE_SHARDED_QUEUE(
            queue_name         => '<username>.<queuename>',
            multiple_consumers => FALSE,  -- FALSE: Queue, TRUE: Topic
            queue_payload_type => DBMS_AQADM.JMS_TYPE);
          DBMS_AQADM.START_QUEUE(queue_name => '<username>.<queuename>');
        END;
        /

    Note: multiple_consumers => FALSE means a JMS Queue; TRUE means a JMS Topic.
- Install the Kafka Connect JMS Source Connector.

        confluent-hub install confluentinc/kafka-connect-jms:latest

- Import the TxEventQ jars into the Kafka JMS Source Connector.

  Copy the following jars into the JMS Source Connector's plugin folder (share/confluent-hub-components/confluentinc-kafka-connect-jms/lib).

  - aqapi.jar: the TxEventQ JMS library jar
  - ojdbc8.jar: the Oracle JDBC connection library jar
  - jta-1.1.jar: JTA, the standard Java interfaces between a transaction manager and the parties involved in a distributed transaction system
- Start Confluent Platform.

        confluent local start
- Configure the JMS Source Connector in jms-source.json:

        {
          "name": "JmsSourceConnector",
          "config": {
            "connector.class": "io.confluent.connect.jms.JmsSourceConnector",
            "kafka.topic": "jms-messages",
            "jms.destination.name": <queuename>,
            "jms.destination.type": "queue",
            "java.naming.factory.initial": "oracle.jms.AQjmsInitialContextFactory",
            "java.naming.provider.url": <connection string>,
            "db_url": <connection string>,
            "java.naming.security.principal": <username>,
            "java.naming.security.credentials": <password>,
            "confluent.license": "",
            "confluent.topic.bootstrap.servers": "localhost:9092"
          }
        }

- Load the JMS Source Connector.

        confluent local load jms -- -d jms-source.json
- Post-check the connector status.
  - Using Confluent Platform Admin: go to http://localhost:9021 (the Confluent Platform admin console) and check the connector status there.
  - Using the Confluent CLI:

        confluent local status jms
- Test the message transfer.

  Use the sink connector above to enqueue messages into the TxEventQ, then pause the sink connector and start the source connector. The messages are dequeued from the TxEventQ and produced into the Kafka topic.
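The pause step can be scripted against the Kafka Connect REST API, which provides PUT /connectors/&lt;name&gt;/pause and /resume endpoints on port 8083 by default. A minimal sketch, assuming the Connect worker runs locally (the helper functions are illustrative):

```python
from urllib.request import Request, urlopen

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST default port

def connector_action_path(name: str, action: str) -> str:
    """Build the REST path for a pause or resume request."""
    assert action in ("pause", "resume")
    return f"/connectors/{name}/{action}"

def _put(path: str) -> int:
    """Issue an empty-body PUT against the Kafka Connect REST API."""
    req = Request(f"{CONNECT_URL}{path}", data=b"", method="PUT")
    with urlopen(req) as resp:
        return resp.status

def pause_connector(name: str) -> int:
    return _put(connector_action_path(name, "pause"))

def resume_connector(name: str) -> int:
    return _put(connector_action_path(name, "resume"))

# Example (requires a running Connect worker):
# pause_connector("JmsSinkConnector")   # before starting the source connector
```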
Monitoring Message Transfer
Sink and Source connector message transfer can be monitored from both sides:
- Apache Kafka: go to http://localhost:9021 (Confluent Platform Admin) and check the produce/consume console for statistics.
- Oracle TxEventQ: see Monitoring Oracle Transactional Event Queues and Advanced Queuing to start the TxEventQ monitoring system and check the enqueue/dequeue rates, TxEventQ depth, and other database and system-level statistics.