Kafka Clients

Kafka clients are components in the Kafka ecosystem that read data from and write data to Kafka topics.

Note

When configuring Kafka clients, the module com.sun.security.auth.module.JndiLoginModule can't be used for JAAS configuration of servers and clients. We recommend that you use the default module com.sun.security.auth.module.Krb5LoginModule for authentication using Kerberos.

Common Commands

Note

  • The Kafka service must be installed and started in one of the nodes.
  • To run the common commands, SSH to the node where the Kafka service is installed.
  • To create a topic to publish events:
    1. Create a file client.properties with the following details, and provide appropriate access to the Kafka user:
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. Run:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-topics.sh --bootstrap-server <hostname>:6667 --create --topic Test_Topic --partitions 3 --replication-factor 1 --command-config /usr/odh/current/kafka-broker/config/client.properties
  • To publish events to topics:
    1. Create a file producer.properties with the following details, and provide appropriate access to the Kafka user:
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
    2. Run:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-producer.sh --broker-list <hostname>:6667 --topic Test_Topic --producer.config /usr/odh/current/kafka-broker/config/producer.properties
  • To read published data from topics:
    1. Create a file consumer.properties with the following details, and provide appropriate access to the Kafka user:
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      bootstrap.servers=<broker_hostname>:6667
      compression.type=none
      sasl.mechanism=GSSAPI
      group.id=test-consumer-group
      sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/<hostname>@REALM";
    2. Run:
      sudo -u kafka sh /usr/odh/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <hostname>:6667 --topic Test_Topic --consumer.config /usr/odh/current/kafka-broker/config/consumer.properties --from-beginning

Kafka Producers

Kafka producers are the publishers responsible for writing records to topics. Typically, this means writing a program using the KafkaProducer API.

To instantiate a producer, run:

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

See the following sections for more information on the producer settings used in this constructor.

Serialization of Keys and Values

For each producer, two serialization properties must be set: key.serializer (for the key) and value.serializer (for the value). Write custom code for serialization or use one of the options provided by Kafka. For example:

  • ByteArraySerializer: Binary data
  • StringSerializer: String representations
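
For example, a minimal sketch of building the producerConfig used in the constructor above with the StringSerializer for both keys and values; the broker address is a placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical producer configuration; adapt the broker address to your cluster.
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker_hostname>:6667");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());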

Acknowledgments

The full write path for records from a producer is to the leader partition and then to all the follower replicas. The producer can control which point in the path triggers an acknowledgment. Depending on the acks setting, the producer might wait for the write to propagate all the way through the system or only wait for the earliest success point.

Valid acks values are:

  • 0: Don't wait for any acknowledgment from the partition (fastest throughput).
  • 1: Wait only for the leader partition response.
  • all: Wait for follower partition responses to meet the minimum (slowest throughput).
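
Continuing the hypothetical producerConfig sketch above, the acks setting is added before the producer is constructed; the topic name and record contents are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Wait for the in-sync replicas to acknowledge each write (slowest, most durable).
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.send(new ProducerRecord<>("Test_Topic", "key", "value"));
producer.flush();
producer.close();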

Kafka Consumers

Kafka consumers are the subscribers responsible for reading records from one or more topics and one or more partitions of a topic. Consumers can subscribe to topics manually or automatically. Typically, this means writing a program using the KafkaConsumer API.

To create a consumer, run:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

The KafkaConsumer class has two generic type parameters. Because producers can send data (the values) with keys, the consumer can read data by key. In this example, both the keys and values are strings. If you define different types, you must define a deserializer to accommodate the alternate types. Custom deserializers must implement the org.apache.kafka.common.serialization.Deserializer interface.

The most important configuration parameters to specify include:

  • bootstrap.servers: A list of brokers to initially connect to. List two to three brokers. You don't need to list the full cluster.
  • group.id: Every consumer belongs to a group to share the partitions of a topic.
  • key.deserializer/value.deserializer: Specify how to convert the sequence of bytes received through the Kafka protocol into the Java representation.
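
A minimal sketch of the consumerConfig used in the constructor above, built with these parameters; the broker address is a placeholder, and auto commit is disabled in line with the manual commit recommendation later in this section:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker_hostname>:6667");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Disable auto commit so offsets are committed manually after processing.
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");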

Subscribing to a Topic

Subscribe to a topic using the subscribe() method call:

kafkaConsumer.subscribe(Collections.singletonList(topic), rebalanceListener);

Specify a list of topics to consume from and a rebalance listener. Rebalancing is an important part of the consumer's life. Whenever the cluster or the consumer's state changes, a rebalance is issued. This ensures that all the partitions are assigned to a consumer.
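
As an illustration, a minimal sketch of a rebalance listener that saves progress by committing offsets before partitions are revoked; it assumes the kafkaConsumer created above:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit processed offsets before the partitions are reassigned.
        kafkaConsumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optionally seek to previously saved offsets for the newly assigned partitions.
    }
};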

After subscribing to a topic, the consumer polls to find new records:

while (true) {
  ConsumerRecords<String, String> data = kafkaConsumer.poll(Duration.ofMillis(100));
  // do something with 'data'
}

The poll call returns records that the client can process. After processing the records, the client commits offsets synchronously, waiting until processing completes before continuing to poll.

Be sure to save the progress by running the commitSync() or commitAsync() method.

We don't recommend auto commit. Manual commit is appropriate in most use cases.
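
For example, a sketch of a poll loop that processes records and then commits offsets manually with commitSync():

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        // Process each record before its offset is committed.
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
    }
    // Blocks until the broker records the offsets for this consumer group.
    kafkaConsumer.commitSync();
}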

Accessing Big Data Service Kafka from Outside the Cluster

To kinit a user from a local environment:

  1. Ensure that the Kerberos client is installed in your environment.
  2. Copy the Kafka keytab from /etc/security/keytabs/kafka.service.keytab on the cluster to your environment.
  3. Copy the contents of /etc/krb5.conf from a cluster broker host to your environment's /etc/krb5.conf file (the default location of the krb5 conf file).
  4. Create host entries for all cluster nodes in your /etc/hosts file.
  5. Run kinit from your environment using the keytab and principal with the command sudo kinit -kt <keytab path in local> <principal name>.
  6. Ensure the ticket is issued using the command klist.

After kinit succeeds, complete the following steps to run a producer or consumer:

  1. Copy /usr/odh/current/kafka-broker/config/kafka_jaas.conf to local.
  2. Copy /usr/odh/current/kafka-broker/config/server.properties to local.
  3. Set env variable export KAFKA_OPTS="-Djava.security.auth.login.config=/local/path/to/kafka_jaas.conf".
  4. Ensure kinit is complete and a valid (unexpired) ticket is available: kinit -kt /local/keytab/path/kafka.service.keytab kafka/<principal>.

With the previous setup, you can follow the common commands to run Kafka clients from outside the Big Data Service cluster, or run any Java/Python clients.
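
For illustration, a minimal Java producer sketch for running outside the cluster; the keytab path, principal, broker host, and topic are the same placeholders used in the common commands and must be adapted to your environment:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExternalKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker_hostname>:6667");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // SASL/GSSAPI settings mirror the client.properties file from the common commands.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config",
            "com.sun.security.auth.module.Krb5LoginModule required "
            + "useKeyTab=true storeKey=true "
            + "keyTab=\"/local/keytab/path/kafka.service.keytab\" "
            + "principal=\"kafka/<hostname>@REALM\";");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("Test_Topic", "key", "hello from outside the cluster"));
        }
    }
}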