11.2.1 New Topic Creation With Producer and Consumer

1. Producing Kafka Events

In the application that produces events or messages, the com.ofss.digx.infra.events.MessageProducerUtility class should be used for producing data to Kafka topics.

MessageProducerUtility:

Table 11-1 MessageProducerUtility

Modifier and type Method and description
boolean sendMessage(String message, String topic)

Produces a message with the provided data to the given topic.

message - The message to be sent, represented as a String.

topic - The name of the topic to which the message will be sent.

boolean sendMessage(Object key, String message, Class<K> keyClass, String topic)

Produces a message with the specified key and data to the given topic.

key - The key object associated with the message, used for partitioning or routing the message within the topic (can be String, Integer, or null).

message - The message to be sent, represented as a String.

keyClass - The class type of the key object. This helps in serializing or processing the key appropriately.

topic - The name of the topic to which the message will be sent.

boolean sendObjectMessage(Object message, Class<T> eventClass, String topic)

Produces a message with the provided data to the given topic.

message - The value to be used in the message. Can be an Avro object or a normal POJO.

eventClass - The class type of the message being sent to the topic. This helps in serializing or processing the message appropriately. Must be an Avro class if using Avro; if the format is JSON, it is the class of the POJO.

topic - The name of the topic to which the message will be sent.

boolean sendObjectMessage(Object key, Object message, Class<K> keyClass, Class<T> eventClass, String topic)

Produces a message with the specified key and data to the given topic.

key - The key object associated with the message, used for partitioning or routing the message within the topic (can be String, Integer, or null).

message - The value to be used in the message. Can be an Avro object or a normal POJO.

keyClass - The class type of the key object. This helps in serializing or processing the key appropriately.

eventClass - The class type of the message being sent to the topic. This helps in serializing or processing the message appropriately. Must be an Avro class if using Avro; if the format is JSON, it is the class of the POJO.

topic - The name of the topic to which the message will be sent.

Sample producer code:

JMSUtility and TopicUtility code is to be replaced by the snippets below.

Example when using Avro data format

MessageProducerUtility.getInstance().sendObjectMessage(policyMapDTO, PolicyMap.class, POLICIES_TOPIC); 

Example when using Byte array data format

MessageProducerUtility.getInstance().sendObjectMessage(policyMapDTO, byte[].class, POLICIES_TOPIC);

Example when using String data format

MessageProducerUtility.getInstance().sendMessage(policyMapDTO, POLICIES_TOPIC);

Add the below dependency in the build.gradle of the Gradle project from which you are producing or publishing the Kafka message.

implementation "com.ofss.digx.infra:com.ofss.digx.infra.events:$libs_digxVersion"

2. Consuming Kafka Events

To implement consumers, the below steps need to be performed.

  1. Creating Consumer project
    • Create a new JAR for the Kafka consumer for your module.

      Add this in the module's settings.gradle.

      Add the below dependencies in build.gradle.

      implementation "org.slf4j:slf4j-api:$libs_slf4jVersion"
      implementation "org.apache.avro:avro:$libs_avroVersion"
      implementation "com.ofss.digx.infra:com.ofss.digx.infra.events:$libs_digxVersion"
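      The exact settings.gradle entry is not reproduced in this guide. As an illustration only (the project name below is hypothetical; use your module's actual consumer project name), a typical include entry looks like:

      ```groovy
      // settings.gradle of the module — hypothetical consumer project name
      include ':com.ofss.digx.cz.kafka.sample.consumer'
      ```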
  2. Extending Consumer Classes

    Implement your own consumers by extending one of the provided consumer classes:

    • com.ofss.digx.infra.events.kafka.consumer.StringConsumer
    • com.ofss.digx.infra.events.kafka.consumer.AvroConsumer
    • com.ofss.digx.infra.events.kafka.consumer.ByteArrayConsumer

      The choice of class depends on the data type present in the Kafka message.

      AvroConsumer: Extend this class if the data to be consumed is of Avro type.

      StringConsumer: Extend this class if the data to be consumed is of String type.

      ByteArrayConsumer: Extend this class if the data to be consumed is of byte array type.

      All consumer classes (StringConsumer, ByteArrayConsumer, and AvroConsumer) are generic classes represented as AbstractConsumer<K, T, V>, where:

      • K: The type of the key. It can be String, Integer, or null.
      • T: The type of the message sent by the topic.
      • V: The type of object to which the message is converted for processing.
  3. Override Methods

    topicName(): Specify the name of the topic the consumer should listen to. Returns String.

    consumerGroup(): Specify the consumer group name. Returns String. The consumer group name in each consumer should be different in case there are multiple consumers for the same producer.

    enableSeparateConsumerGroupsPerServer(): When true, each instance of the consumer on each server creates its own consumer group. When false, all instances of this consumer across all servers share the same consumer group. Default is false if not overridden.

    ifFilteredConsumer(): Return true if:

    • The consumer is part of a shared library used in multiple WARs.
    • The Kafka event should only be processed if a particular filter criterion sent by the producer is supported by the consumer's application.

    In simple terms, this ensures that a consumer processes only relevant events based on the filter.

    If ifFilteredConsumer() is set to return true, you need to pass the filter in headers on the producer side while sending the Kafka event.

    Sample producer code:

    Map<String, String> headers = new HashMap<>();
    headers.put("API_TYPE", detailDTO.getApiType());
    messageProducerUtility.sendObjectMessageWithFilter(null, detailDTO, String.class, byte[].class,
            MULITPLE_TRANSACTION_SERVICE_INVOCATION_QUEUE, detailDTO.getApiType(), headers);

    Inside the implementation of IMessageProcessor called from your consumer, override the method process(K key, V data, Map<String, String> headers).

    From the headers, you can fetch the filter criteria and evaluate the further processing logic.
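    As an illustration of that header check, here is a minimal, self-contained sketch. The FilterCheck class name and the set of supported API types are hypothetical and not part of the product API; the real check would live inside your process(K key, V data, Map<String, String> headers) override.

    ```java
    import java.util.Map;
    import java.util.Set;

    public class FilterCheck {

        // Hypothetical set of API types that this consumer's application supports
        private static final Set<String> SUPPORTED_API_TYPES = Set.of("PAYMENTS", "TRANSFERS");

        // Returns true only when the event carries an API_TYPE header matching
        // one of the supported types; such events are processed, all others
        // are skipped by this consumer.
        public static boolean shouldProcess(Map<String, String> headers) {
            if (headers == null) {
                return false;
            }
            String apiType = headers.get("API_TYPE");
            return apiType != null && SUPPORTED_API_TYPES.contains(apiType);
        }
    }
    ```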

    run(): Responsible for initiating the message consumption process. Within the run method, call the consume method with an instance of IMessageProcessor to handle the processing of each consumed message.

  4. Consumer Group Size Configuration
    • Purpose : Defines the number of consumer instances within a consumer group and is useful for scaling when multiple partitions are configured for a topic.
    • Storage : Existing consumer group size configurations are maintained in the PROP_VALUE column of the table DIGX_FW_CONFIG_ALL_B.
    • Naming pattern : <CONSUMER_GROUP_NAME>_CONSUMER_GROUP_SIZE

      Example : 'PoliciesTopicGroup_CONSUMER_GROUP_SIZE'

    • Adding a New Consumer Group Entry : If a bank or consulting firm increases their topic partition count and wants to scale their consumers accordingly, they should add a configuration entry following the existing pattern. If not added, default will be 1.

      Example SQL Insert Statement:

      Insert into DIGX_FW_CONFIG_ALL_B
        (PROP_ID, CATEGORY_ID, PROP_VALUE, FACTORY_SHIPPED_FLAG, PROP_COMMENTS, SUMMARY_TEXT,
         CREATED_BY, CREATION_DATE, LAST_UPDATED_BY, LAST_UPDATED_DATE, OBJECT_STATUS,
         OBJECT_VERSION_NUMBER, EDITABLE, CATEGORY_DESCRIPTION)
      values ('PoliciesTopicGroup_CONSUMER_GROUP_SIZE', 'KAFKA_CONFIG', '1', 'N', null,
         'consumer group size for PoliciesTopic', 'ofssuser', sysdate, 'ofssuser', sysdate,
         'A', 1, 'Y', 'PoliciesTopicGroup_CONSUMER_GROUP_SIZE');
    • Consumer group size as per “enableSeparateConsumerGroupsPerServer” flag :

      Scenario 1: enableSeparateConsumerGroupsPerServer = true

      • Each server instance will create its own consumer group.
      • Max consumers per group = Number of topic partitions.
      • Example :
        Total partitions = 10
        Managed servers = 2
        Max consumers in a group = 10
        Recommended consumer group size = Up to 10 per server

      Scenario 2: enableSeparateConsumerGroupsPerServer = false

      • All instances of a particular consumer belong to the same consumer group.
      • The number of consumers per server should be calculated as

        Total Partitions ÷ Number of Managed Servers

      • Example :
        Total partitions = 10
        Managed servers = 2
        Max consumers in a group = 10
        Recommended consumer group size = Up to 5 per server

      Note:

      The consumer group size should not exceed the partition count of the topic.
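      The sizing arithmetic above can be sketched as follows. This is a minimal illustration only; the class and method names are hypothetical and not part of the product API.

      ```java
      public class GroupSizing {

          // Scenario 1: each server has its own consumer group, so each server
          // may run up to one consumer per topic partition.
          public static int separateGroupSizePerServer(int totalPartitions) {
              return totalPartitions;
          }

          // Scenario 2: all servers share one consumer group, so the partitions
          // are split across servers; consumers beyond this count would sit idle.
          public static int sharedGroupSizePerServer(int totalPartitions, int managedServers) {
              return Math.max(1, totalPartitions / managedServers);
          }
      }
      ```

      With 10 partitions and 2 managed servers this yields up to 10 consumers per server in scenario 1 and up to 5 per server in scenario 2, matching the examples above.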
  5. Creating SPI Entry for Consumer

    A file named com.ofss.digx.infra.events.kafka.consumer.IConsumer should be created in resources/META-INF/services of com.ofss.digx.cz.kafka.{module}.consumer, and the fully qualified name of the consumer class has to be provided in this file.
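    For example, for the sample PoliciesTopicConsumer shown later in this section, the file resources/META-INF/services/com.ofss.digx.infra.events.kafka.consumer.IConsumer would contain a single line with the consumer's fully qualified class name:

    ```
    com.ofss.digx.kafka.sms.consumer.authorization.policy.PoliciesTopicConsumer
    ```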

3. Implementing Event Processing Logic
  • com.ofss.digx.infra.events.processor.IMessageProcessor

    This interface is designed to support both JMS and Kafka. Implementing this interface provides a common business logic layer to ensure maintainability, code reusability and consistent processing approach across messaging systems.

  • Write a class implementing com.ofss.digx.infra.events.processor.IMessageProcessor in your Gradle project. Inside this class, override the process method and write the message or event processing logic. This class has to be invoked from the Kafka consumer and JMS listener classes. Make sure the project's JAR file is part of the classpath of the application where the consumer is defined.

    IMessageProcessor<K,V>

    Table 11-2 IMessageProcessor<K,V>

    Modifier and type Method and description
    void process(K key, V data)

    Processes messages from a listener (JMS) or consumer (Kafka).

    key - The key object associated with the message.

    data - The data to be processed.

    default void process(K key, V data, Map<String, String> headers)

    Processes messages from a listener (JMS) or consumer (Kafka). Default method that additionally provides event headers.

    key - The key object associated with the message.

    data - The data to be processed.

    headers - Event headers associated with every message.

    Example class extending ByteArrayConsumer and using an IMessageProcessor implementation, which will be used to consume data from a Kafka topic.

    Sample Kafka byte array consumer code

    package com.ofss.digx.kafka.sms.consumer.authorization.policy;

    import com.ofss.digx.app.sms.dto.authorization.policy.PolicyMapDTO;
    import com.ofss.digx.app.sms.processors.authorization.policy.PoliciesMessageProcessor;
    import com.ofss.digx.infra.events.kafka.consumer.ByteArrayConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PoliciesTopicConsumer extends ByteArrayConsumer<String, byte[], PolicyMapDTO> {

        private static final String THIS_COMPONENT_NAME = PoliciesTopicConsumer.class.getName();
        private static final Logger logger = LoggerFactory.getLogger(THIS_COMPONENT_NAME);
        private static final String POLICIES_TOPIC = "PoliciesTopic";
        private static final String POLICIES_TOPIC_GROUP = "PoliciesTopicGroup";

        public PoliciesTopicConsumer() throws Exception {
            super(String.class, byte[].class, PolicyMapDTO.class);
        }

        @Override
        public String topicName() {
            return POLICIES_TOPIC;
        }

        @Override
        public String consumerGroup() {
            return POLICIES_TOPIC_GROUP;
        }

        @Override
        public boolean enableSeparateConsumerGroupsPerServer() {
            return true;
        }

        @Override
        public void run() {
            logger.info("Entering into run method of {}", THIS_COMPONENT_NAME);
            consume(new PoliciesMessageProcessor());
            logger.info("Exiting from run method of {}", THIS_COMPONENT_NAME);
        }
    }

    Example class implementing IMessageProcessor<K,V>

    package com.ofss.digx.app.sms.processors.authorization.policy;

    import com.ofss.digx.app.sms.dto.authorization.policy.PolicyMapDTO;
    import com.ofss.digx.app.sms.service.authorization.provider.RoleTransactionAccessService;
    import com.ofss.digx.infra.events.processor.IMessageProcessor;
    import com.ofss.digx.infra.exceptions.Exception;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PoliciesMessageProcessor implements IMessageProcessor<String, PolicyMapDTO> {

        /**
         * Stores the entity name represented by this {@code Class} object as a {@code String}
         */
        private static final String THIS_COMPONENT_NAME = PoliciesMessageProcessor.class.getName();
        private static final transient Logger logger = LoggerFactory.getLogger(THIS_COMPONENT_NAME);

        @Override
        public void process(String key, PolicyMapDTO data) {
            try {
                if (!data.getValues().isEmpty()) {
                    RoleTransactionAccessService cacheLoader = RoleTransactionAccessService.getInstance(null);
                    cacheLoader.updateResourceCache(data.getValues());
                }
            } catch (Exception e) {
                logger.error("Exception encountered while invoking process method of {}",
                        THIS_COMPONENT_NAME, e);
            }
        }
    }