11.4 Consuming New External Kafka Events

  • For use cases where any new external Kafka topic needs to be listened to, a new consumer class can be created by following the steps outlined in the Section: 2. Consuming Kafka Events of Kafka chapter of this document. Additionally, this class must implement the com.ofss.digx.infra.events.kafka.consumer.IKafkaConsumable interface.
  • For this Topic, new entries must be added in the table DIGX_FW_CONFIG_ALL_B as mentioned in the document Oracle Banking Digital Experience Installation Guide in the section OBDX Pre-defined External Kafka Topic Configurations. Apart from these, any other Producer and Consumer properties defined by Kafka can also be added for the Topic in this table.

For Example,

Insert into DIGX_FW_CONFIG_ALL_B
          (PROP_ID,CATEGORY_ID,PROP_VALUE,FACTORY_SHIPPED_FLAG,PROP_COMMENTS,SUMMARY_TEXT,CREATED_BY,CREATION_DATE,
LAST_UPDATED_BY,LAST_UPDATED_DATE,OBJECT_STATUS,OBJECT_VERSION_NUMBER,EDITABLE,CATEGORY_DESCRIPTION)
          values ('externalSystemAlertMessage@bootstrap.servers','KAFKA_CONFIG',
'ofss-mum-645.snbomprshared1.gbucdsint02bom.oraclevcn.com:9092','N',null,'Kafka props','ofssuser',sysdate,
'ofssuser',sysdate,'A',1,'Y','Kafka props');
Insert into DIGX_FW_CONFIG_ALL_B (PROP_ID,CATEGORY_ID,PROP_VALUE,FACTORY_SHIPPED_FLAG,PROP_COMMENTS,
SUMMARY_TEXT,CREATED_BY,CREATION_DATE,LAST_UPDATED_BY,LAST_UPDATED_DATE,OBJECT_STATUS,OBJECT_VERSION_NUMBER,EDITABLE,CATEGORY_DESCRIPTION)
          values ('externalSystemAlertMessage@sasl.jaas.config','KAFKA_CONFIG','org.apache.kafka.common.security.scram.ScramLoginModule
          required username="obedx" password="obedx-secret";','N',null,'Kafka props','ofssuser',sysdate,'ofssuser',sysdate,'A',1,'Y','Kafka props');
Insert into DIGX_FW_CONFIG_ALL_B (PROP_ID,CATEGORY_ID,PROP_VALUE,FACTORY_SHIPPED_FLAG,PROP_COMMENTS,SUMMARY_TEXT,CREATED_BY,CREATION_DATE,
LAST_UPDATED_BY,LAST_UPDATED_DATE,OBJECT_STATUS,OBJECT_VERSION_NUMBER,EDITABLE,CATEGORY_DESCRIPTION)
          values('externalSystemAlertMessage@sasl.mechanism','KAFKA_CONFIG','SCRAM-SHA-256','N',null,
'Kafka props','ofssuser',sysdate,'ofssuser',sysdate,'A',1,'Y','Kafka props');
Insert into DIGX_FW_CONFIG_ALL_B (PROP_ID,CATEGORY_ID,PROP_VALUE,FACTORY_SHIPPED_FLAG,PROP_COMMENTS,SUMMARY_TEXT,
CREATED_BY,CREATION_DATE,LAST_UPDATED_BY,LAST_UPDATED_DATE,OBJECT_STATUS,OBJECT_VERSION_NUMBER,EDITABLE,CATEGORY_DESCRIPTION)
          values ('externalSystemAlertMessage@security.protocol','KAFKA_CONFIG','SASL_SSL','N',null,
'Kafka props','ofssuser',sysdate,'ofssuser',sysdate,'A',1,'Y','Kafka props');
Insert into DIGX_FW_CONFIG_ALL_B (PROP_ID,CATEGORY_ID,PROP_VALUE,FACTORY_SHIPPED_FLAG,PROP_COMMENTS,
SUMMARY_TEXT,CREATED_BY,CREATION_DATE,LAST_UPDATED_BY,LAST_UPDATED_DATE,OBJECT_STATUS,OBJECT_VERSION_NUMBER,EDITABLE,CATEGORY_DESCRIPTION)
          values('externalSystemAlertMessage@ssl.truststore.location','KAFKA_CONFIG','/scratch/app/domain/obdx_domain/KafkaServerKeystore.jks','N',null,'Kafka
          props','ofssuser',sysdate,'ofssuser',sysdate,'A',1,'Y','Kafka props');Insert into DIGX_FW_CONFIG_ALL_B
          (PROP_ID,CATEGORY_ID,PROP_VALUE,FACTORY_SHIPPED_FLAG,PROP_COMMENTS,SUMMARY_TEXT,CREATED_BY,CREATION_DATE,LAST_UPDATED_BY,LAST_UPDATED_DATE,
OBJECT_STATUS,OBJECT_VERSION_NUMBER,EDITABLE,CATEGORY_DESCRIPTION)
          values ('externalSystemAlertMessage@ssl.truststore.password','KAFKA_CONFIG','orcl@123','N',null,'Kafka props','ofssuser',sysdate,'ofssuser',sysdate,'A',1,'Y','Kafka props');

Sample consumer implementation for external topic:

package com.ofss.digx.app.kafka.origination.consumer; 
import com.ofss.digx.app.origination.processors.ApplicationOnSubmitEventMessageProcessor;
import com.ofss.digx.infra.events.kafka.consumer.IKafkaConsumable;
import com.ofss.digx.infra.events.kafka.consumer.StringConsumer;
import com.ofss.digx.infra.exceptions.Exception;import org.slf4j.Logger;
import org.slf4j.LoggerFactory; 
public class OriginationEventTopicConsumer extends StringConsumer<String, String, String> implements IKafkaConsumable 
{    
 private String targetUnit;    
public OriginationEventTopicConsumer() 
throws Exception 
{        
super(String.class, String.class, String.class);    
}    
 private static final String THIS_COMPONENT_NAME = OriginationEventTopicConsumer.class.getName();   
 private static final Logger logger = LoggerFactory.getLogger(THIS_COMPONENT_NAME);    
private static final String OR_OBDX_TOPIC = "externalSystemAlertMessage";   
 private static final String OR_GROUP_ID = "obdx-obo-consumer";    
 @Override public String consumerGroup() 
{       
 return OR_GROUP_ID;    
}     @Override public String topicName() 
{       
 return OR_OBDX_TOPIC;    
}     @Override public boolean enableSeparateConsumerGroupsPerServer() 
{       
 return false;    
}     @Override public void run() 
{        logger.info("Entering into run method of {}", 
THIS_COMPONENT_NAME);       
 consume(new ApplicationOnSubmitEventMessageProcessor());       
 logger.info("Exiting from run method of {}", THIS_COMPONENT_NAME);   
 }
}