11.4 Consuming New External Kafka Events
This topic describes how to consume new external Kafka events.
- When a new external Kafka topic needs to be listened to, create a new consumer class by following the steps outlined in Section 2, Consuming Kafka Events, of the Kafka chapter of this document. Additionally, this class must implement the com.ofss.digx.infra.events.kafka.consumer.IKafkaConsumable interface.
- For this topic, new entries need to be included in the table DIGX_CFG_CONFIG_ALL_B, as stated in the Oracle Banking Digital Experience Installation Guide, specifically in the section about OBDX Pre-defined External Kafka Topic Configurations. Additionally, any other Producer and Consumer properties specified by Kafka can also be added for the topic in this table.
For example:
-- Example configuration rows for DIGX_CFG_CONFIG_ALL_B, profile 'DEV'.
-- Each statement inserts one property whose PROP_ID has the form
-- 'KAFKA_CONFIG.externalSystemAlertMessage@<property>'; only PROP_ID and
-- PROP_VALUE differ between the statements.
-- NOTE(review): host name, user name, passwords and file path below are the
-- sample values from this example - presumably to be replaced per environment.

-- bootstrap.servers
INSERT INTO DIGX_CFG_CONFIG_ALL_B (
    PROP_ID, PROFILE,
    PROP_VALUE,
    ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP,
    CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE,
    OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED
) VALUES (
    'KAFKA_CONFIG.externalSystemAlertMessage@bootstrap.servers', 'DEV',
    'ofss-mum-645.snbomprshared1.gbucdsint02bom.oraclevcn.com:9092',
    'N', 'Y', 'N', 'Kafka',
    'SYSTEM', sysdate, 'SYS', sysdate,
    1, 'infra', '-1', '.*', 'N'
);

-- sasl.jaas.config
INSERT INTO DIGX_CFG_CONFIG_ALL_B (
    PROP_ID, PROFILE,
    PROP_VALUE,
    ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP,
    CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE,
    OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED
) VALUES (
    'KAFKA_CONFIG.externalSystemAlertMessage@sasl.jaas.config', 'DEV',
    'org.apache.kafka.common.security.scram.ScramLoginModule required username="obedx" password="obedx-secret";',
    'N', 'Y', 'N', 'Kafka',
    'SYSTEM', sysdate, 'SYS', sysdate,
    1, 'infra', '-1', '.*', 'N'
);

-- sasl.mechanism
INSERT INTO DIGX_CFG_CONFIG_ALL_B (
    PROP_ID, PROFILE,
    PROP_VALUE,
    ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP,
    CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE,
    OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED
) VALUES (
    'KAFKA_CONFIG.externalSystemAlertMessage@sasl.mechanism', 'DEV',
    'SCRAM-SHA-256',
    'N', 'Y', 'N', 'Kafka',
    'SYSTEM', sysdate, 'SYS', sysdate,
    1, 'infra', '-1', '.*', 'N'
);

-- security.protocol
INSERT INTO DIGX_CFG_CONFIG_ALL_B (
    PROP_ID, PROFILE,
    PROP_VALUE,
    ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP,
    CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE,
    OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED
) VALUES (
    'KAFKA_CONFIG.externalSystemAlertMessage@security.protocol', 'DEV',
    'SASL_SSL',
    'N', 'Y', 'N', 'Kafka',
    'SYSTEM', sysdate, 'SYS', sysdate,
    1, 'infra', '-1', '.*', 'N'
);

-- ssl.truststore.location
INSERT INTO DIGX_CFG_CONFIG_ALL_B (
    PROP_ID, PROFILE,
    PROP_VALUE,
    ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP,
    CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE,
    OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED
) VALUES (
    'KAFKA_CONFIG.externalSystemAlertMessage@ssl.truststore.location', 'DEV',
    '/scratch/app/domain/obdx_domain/KafkaServerKeystore.jks',
    'N', 'Y', 'N', 'Kafka',
    'SYSTEM', sysdate, 'SYS', sysdate,
    1, 'infra', '-1', '.*', 'N'
);

-- ssl.truststore.password
INSERT INTO DIGX_CFG_CONFIG_ALL_B (
    PROP_ID, PROFILE,
    PROP_VALUE,
    ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP,
    CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE,
    OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED
) VALUES (
    'KAFKA_CONFIG.externalSystemAlertMessage@ssl.truststore.password', 'DEV',
    'orcl@123',
    'N', 'Y', 'N', 'Kafka',
    'SYSTEM', sysdate, 'SYS', sysdate,
    1, 'infra', '-1', '.*', 'N'
);
Sample consumer implementation for external topic:
package com.ofss.digx.app.kafka.origination.consumer;
import com.ofss.digx.app.origination.processors.ApplicationOnSubmitEventMessageProcessor;
import com.ofss.digx.infra.events.kafka.consumer.IKafkaConsumable;
import com.ofss.digx.infra.events.kafka.consumer.StringConsumer;
import com.ofss.digx.infra.exceptions.Exception;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Sample Kafka consumer for the {@code externalSystemAlertMessage} topic.
 *
 * <p>Extends {@code StringConsumer<String, String, String>} and implements
 * {@code IKafkaConsumable}. It reports its topic name and consumer group
 * through the overridden accessors, and its {@link #run()} method hands a
 * new {@code ApplicationOnSubmitEventMessageProcessor} to {@code consume(...)}.
 */
public class OriginationEventTopicConsumer
        extends StringConsumer<String, String, String>
        implements IKafkaConsumable
{
    /** Fully-qualified name of this class; used as the logger name and in log messages. */
    private static final String THIS_COMPONENT_NAME = OriginationEventTopicConsumer.class.getName();

    /** Logger for this class, obtained by name from {@link #THIS_COMPONENT_NAME}. */
    private static final Logger logger = LoggerFactory.getLogger(THIS_COMPONENT_NAME);

    /** Value returned by {@link #topicName()}. */
    private static final String OR_OBDX_TOPIC = "externalSystemAlertMessage";

    /** Value returned by {@link #consumerGroup()}. */
    private static final String OR_GROUP_ID = "obdx-obo-consumer";

    // Not read or written anywhere in this class.
    private String targetUnit;

    /**
     * Creates the consumer, passing {@code String.class} three times to the
     * {@code StringConsumer} constructor.
     *
     * @throws Exception the imported {@code com.ofss.digx.infra.exceptions.Exception}
     *         (not {@code java.lang.Exception}), as declared by this constructor
     */
    public OriginationEventTopicConsumer() throws Exception
    {
        super(String.class, String.class, String.class);
    }

    /**
     * @return the consumer group id, {@code "obdx-obo-consumer"}
     */
    @Override
    public String consumerGroup()
    {
        return OR_GROUP_ID;
    }

    /**
     * @return the topic name, {@code "externalSystemAlertMessage"}
     */
    @Override
    public String topicName()
    {
        return OR_OBDX_TOPIC;
    }

    /**
     * @return always {@code false}
     *         (NOTE(review): presumably means all servers share one consumer
     *         group for this topic - confirm against the framework contract)
     */
    @Override
    public boolean enableSeparateConsumerGroupsPerServer()
    {
        return false;
    }

    /**
     * Logs entry, calls {@code consume(...)} with a new
     * {@code ApplicationOnSubmitEventMessageProcessor}, then logs exit.
     * The exit message is only logged when {@code consume(...)} returns normally.
     */
    @Override
    public void run()
    {
        logger.info("Entering into run method of {}", THIS_COMPONENT_NAME);

        ApplicationOnSubmitEventMessageProcessor messageProcessor =
                new ApplicationOnSubmitEventMessageProcessor();
        consume(messageProcessor);

        logger.info("Exiting from run method of {}", THIS_COMPONENT_NAME);
    }
}
Parent topic: Messaging System Integration for OBDX