11.4 Consuming New External Kafka Events

This topic provides information on Consuming New External Kafka Events.

  • For use cases where any new external Kafka topic needs to be listened to, a new consumer class can be created by following the steps outlined in the Section: 2. Consuming Kafka Events of Kafka chapter of this document. Additionally, this class must implement the com.ofss.digx.infra.events.kafka.consumer.IKafkaConsumable interface.
  • For this Topic, new entries need to be included in the table DIGX_CFG_CONFIG_ALL_B as stated in the Oracle Banking Digital Experience Installation Guide, specifically in the section about OBDX Pre-defined External Kafka Topic Configurations. Additionally, any other Producer and Consumer properties specified by Kafka can also be added for the Topic in this table.

For Example,

INSERT INTO DIGX_CFG_CONFIG_ALL_B (PROP_ID, PROFILE, PROP_VALUE, ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP, CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED) VALUES ('KAFKA_CONFIG.externalSystemAlertMessage@bootstrap.servers', 'DEV', 'ofss-mum-645.snbomprshared1.gbucdsint02bom.oraclevcn.com:9092', 'N', 'Y', 'N', 'Kafka', 'SYSTEM', sysdate, 'SYS', sysdate, 1, 'infra', '-1', '.*', 'N');
INSERT INTO DIGX_CFG_CONFIG_ALL_B (PROP_ID, PROFILE, PROP_VALUE, ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP, CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED) VALUES ('KAFKA_CONFIG.externalSystemAlertMessage@sasl.jaas.config', 'DEV', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="obedx" password="obedx-secret";', 'N', 'Y', 'N', 'Kafka', 'SYSTEM', sysdate, 'SYS', sysdate, 1, 'infra', '-1', '.*', 'N');
INSERT INTO DIGX_CFG_CONFIG_ALL_B (PROP_ID, PROFILE, PROP_VALUE, ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP, CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED) VALUES ('KAFKA_CONFIG.externalSystemAlertMessage@sasl.mechanism', 'DEV', 'SCRAM-SHA-256', 'N', 'Y', 'N', 'Kafka', 'SYSTEM', sysdate, 'SYS', sysdate, 1, 'infra', '-1', '.*', 'N');
INSERT INTO DIGX_CFG_CONFIG_ALL_B (PROP_ID, PROFILE, PROP_VALUE, ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP, CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED) VALUES ('KAFKA_CONFIG.externalSystemAlertMessage@security.protocol', 'DEV', 'SASL_SSL', 'N', 'Y', 'N', 'Kafka', 'SYSTEM', sysdate, 'SYS', sysdate, 1, 'infra', '-1', '.*', 'N');
INSERT INTO DIGX_CFG_CONFIG_ALL_B (PROP_ID, PROFILE, PROP_VALUE, ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP, CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED) VALUES ('KAFKA_CONFIG.externalSystemAlertMessage@ssl.truststore.location', 'DEV', '/scratch/app/domain/obdx_domain/KafkaServerKeystore.jks', 'N', 'Y', 'N', 'Kafka', 'SYSTEM', sysdate, 'SYS', sysdate, 1, 'infra', '-1', '.*', 'N');
INSERT INTO DIGX_CFG_CONFIG_ALL_B (PROP_ID, PROFILE, PROP_VALUE, ENTITY_SPECIFIC, EDITABLE, MANDATORY_OVERRIDE, PROPERTY_GROUP, CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_VERSION_NUMBER, MODULE, SEQUENCE, VALIDATION, IS_ENUMERATED) VALUES ('KAFKA_CONFIG.externalSystemAlertMessage@ssl.truststore.password', 'DEV', 'orcl@123', 'N', 'Y', 'N', 'Kafka', 'SYSTEM', sysdate, 'SYS', sysdate, 1, 'infra', '-1', '.*', 'N');

Sample consumer implementation for external topic:

package com.ofss.digx.app.kafka.origination.consumer; 
import com.ofss.digx.app.origination.processors.ApplicationOnSubmitEventMessageProcessor;
import com.ofss.digx.infra.events.kafka.consumer.IKafkaConsumable;
import com.ofss.digx.infra.events.kafka.consumer.StringConsumer;
import com.ofss.digx.infra.exceptions.Exception;import org.slf4j.Logger;
import org.slf4j.LoggerFactory; 
public class OriginationEventTopicConsumer extends StringConsumer<String, String, String> implements IKafkaConsumable 
{    
 private String targetUnit;    
public OriginationEventTopicConsumer() 
throws Exception 
{        
super(String.class, String.class, String.class);    
}    
 private static final String THIS_COMPONENT_NAME = OriginationEventTopicConsumer.class.getName();   
 private static final Logger logger = LoggerFactory.getLogger(THIS_COMPONENT_NAME);    
private static final String OR_OBDX_TOPIC = "externalSystemAlertMessage";   
 private static final String OR_GROUP_ID = "obdx-obo-consumer";    
 @Override public String consumerGroup() 
{       
 return OR_GROUP_ID;    
}     @Override public String topicName() 
{       
 return OR_OBDX_TOPIC;    
}     @Override public boolean enableSeparateConsumerGroupsPerServer() 
{       
 return false;    
}     @Override public void run() 
{        logger.info("Entering into run method of {}", 
THIS_COMPONENT_NAME);       
 consume(new ApplicationOnSubmitEventMessageProcessor());       
 logger.info("Exiting from run method of {}", THIS_COMPONENT_NAME);   
 }
}