7 Deploying Unified Operations Message Bus
This chapter describes how to deploy Unified Operations Message Bus.
Unified Operations Message Bus Overview
The Oracle Communications Unified Operations Message Bus (OCUOMB) service is a distributed event store and stream-processing platform service. The Message Bus clients send or receive the events or messages to or from the Message Bus through a specific channel called Topic. This enables that the source and target clients or services are loosely coupled and asynchronous. Message Bus uses Apache Kafka in its platform to support the event store and stream-processing and for packaging. For deployment, Message Bus uses Strimzi.
Strimzi simplifies the process of running Apache Kafka in a Kubernetes cluster. Strimzi provides container images and operators for running Apache Kafka on Kubernetes. Strimzi operators are fundamental for the smooth running of Strimzi. These operators are software extensions to Kubernetes that make use of custom resources to manage applications and their components. These operators simplify the process of:
- Deploying, running, and upgrading the Kafka cluster and its components.
- Configuring and securing access to Kafka.
- Creating and managing Kafka topics.
Operators are a method of packaging, deploying, and managing a Kubernetes application. The Strimzi operators extend Kubernetes functionality and automate common and complex tasks related to a Kafka deployment. By implementing knowledge of Kafka operations in code, Kafka administration tasks are simplified and require less manual intervention. See https://strimzi.io/docs/operators/latest/overview.html for more details on the Strimzi operators. Strimzi has the following operators:
- Cluster Operator: Deploys and manages the Apache Kafka clusters, Kafka Connect, Kafka Mirror Maker, Kafka Bridge, Kafka Exporter, Cruise Control, and the Entity Operator.
- Entity Operator: Comprises the Topic Operator and User Operator
- Topic Operator: Manages Kafka topics
See the following webssites for more information on Strimzi and Apache Kafka:
- Strimzi: https://strimzi.io/
- Apache Kafka: https://kafka.apache.org/
The Message Bus service provides scripts and helm charts to deploy and manage the Apache Kafka cluster in Kubernetes by using the Strimzi operator and Kubernetes Custom Resources definitions. The Message Bus service does not provide any image builder toolkits to build the container images and by default, Helm charts pull the required container images from the quay.io/strimzi container repository.
Table 7-1 Container Images and Purposes
Container Image | Purpose |
---|---|
quay.io/strimzi/operator:<Strimzi_Operator_version> | Container Image with Strimzi Operator. |
quay.io/strimzi/kafka:<Strimzi_Operator_version>-kafka-<Kafka_version> |
Container Image with Apache Kafka and Strimzi distribution. In the following sections, the reference to the container image is named as STRIMZI_KAFKA_IMAGE_NAME |
Note:
See "UIM Software Compatibility" in UIM Compatibility Matrix for the latest versions of software.
Message Bus Cloud Native Architecture
The Message Bus service uses Apache Kafka as a distributed event store platform. To run an Apache Kafka cluster on Kubernetes, the Message Bus service uses the Strimzi operator. Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes.
Figure 7-1 Message Bus Cloud Native Architecture
Access to Message Bus
While deploying the Message Bus Service in Kubernetes namespace, the following Kubernetes service objects are created to access the Message Bus pods either internally or externally (through an ingress controller). The external access is provided through the ingress controller by IngressRouteTCP objects.
Note:
You can override the value of subDomainNameSeparator. The default separator is ".", This value can be modified as "-" to match the wild-card pattern of SSL certificates.
#subDomainNameSeparator: "."
#Example hostnames for "-" : quick-sr-messaging-bootstrap.uim.org
Figure 7-2 Process of Accessing the Message Bus
The external access to Message Bus service is supported with TCP+TLS+OAuth 2.0 Authentication through Traefik ingress controller or a Generic ingress controller. The internal access to Message Bus Service is also supported with TCP+TLS+OAuth 2.0 Authentication where TLS can be configurable. Access to Message Bus service is configured through the listeners section in applications.yaml file.
Note:
- If the client is in the same Kubernetes cluster, the internal listener is used.
- If the client is outside the Kubernetes cluster, the ingress listener is used.
The Message Bus is deployed using the scripts provided in Common CNTK. For deployment prerequisites, see "Planning UIM Installation".
The following steps need to be followed to deploy a Kafka cluster in a Kubernetes namespace in a cluster:
- Deploy the Strimzi operator to manage your Kafka cluster.
Note:
This is an administrative one-time activity where additional cluster roles are required.- Create a namespace to deploy Strimzi Operator.
- Deploy Strimzi Operator in the namespace. See "Deploying Strimzi Operator" for more information.
- Deploy the Message Bus that has Kafka cluster, ZooKeeper cluster, and entity
operator.
- Create a namespace to deploy the Kafka cluster.
- Register the namespace with Strimzi Operator. See "Register namespaces with Strimzi Operator" for more information.
- Register the namespace with Traefik. See "Registering the Namespaces with Strimzi Operator" for more information.
Note:
- The ingress controller (Traefik or Generic) has to be available.
- Register the namespace with Traefik ingress controller. If you use Generic Ingress controller, ensure that ingress.className is set in the applications.yaml file.
- Deploy Kafka Cluster in the namespace. See "Deploy Kafka Cluster and Kafka Topic" for more information.
- Validate the deployment with sample standalone producer and consumer clients. See the "Validating the Kafka cluster" and "Internal access - same namespace - plain" for more information.
Strimzi Operator
Export the Strimzi operator namespace environment variable to run the deployment script using the COMMON_CNTK:
export STRIMZI_NS=<STRIMZI_OPERATOR_NAMESPACE>
Configuration
The configurable parameters of the Strimzi Operator charts and their default values are listed in the corresponding subsections within this document.
To override the default values, copy the $COMMON_CNTK/samples/strimzi-operator-override-values.yaml file to the directory $SPEC_PATH/<STRIMZI_PROJECT>, where <STRIMZI_PROJECT> is the Kubernetes namespace where the Strimzi operator is being deployed.
Create Global Resources
While deploying multiple Strimzi operators in the same Kubernetes cluster, ensure
that one operator has createGlobalResources set to
true
in the
strimzi-operator-override-values.yaml file, and all other
operators are set to false
. While other operators can use
an earlier version of Strimzi Operator, Oracle recommends you to keep all
operators on the same version to avoid potential compatibility issues with
CRDs and the corresponding shared resources.
Private Container Repository
The Strimzi operator pulls the Strimzi component container images from
quay.io registry. If you want to maintain private container registry,
pull
the images from the quay.io registry and
push
them into the private container registry. It is
mandatory to push the images with same name and tag, the repository path can be
different. For Strimzi image and tag names, see "Unified Operations
Message Bus Overview" for more information.
See "About Container Image Management" section from UIM Cloud Native Deployment Guide for more information on private container repository management.
To use the private container registry, uncomment and modify the values in
$SPEC_PATH/<STRIMZI_PROJECT>/strimzi-operator-override-values.yaml file.
Provide the modified strimzi-operator-override-values.yaml file path as an
-f
option to the Strimzi operator create/upgrade
command.
If the private container registry requires authentication, create the Kubernetes secret in the namespace and provide the secret name as part of strimzi-operator-override-values.yaml file. Create the secret with same name in the namespace where the Kafka cluster is planned to deploy.
strimzi-operator-override-values.yaml file (Sample)
defaultImageRegistry: <Image registry>
defaultImageRepository: <Image Repository>
image:
imagePullSecrets: <Pull Secret>
The following is a sample command to create Kubernetes secret for the registry. Create the secret in the namespace where the Strimzi operator is being deployed. See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ for creating secret.
kubectl create secret docker-registry <secret-name> --docker-server=<Image Registry> \
--docker-username=<Username> \
--docker-password=<Password> \
-n <STRIMZI_OPERATOR_NAMESPACE>
ImagePullPolicy
The following sample of ImagePullPolicy for Strimzi Operator is provided. To create the policy using a different procedure, see https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy
strimzi-operator-override-values.yaml file (Sample)
image:
imagePullPolicy: IfNotPresent
Resources
These resources are used for configuring the virtual resources (limits and requests). Uncomment or add the blow resources section with new values in the strimzi-operator-override-values.yaml file.
resources:
requests:
memory: <Mi>
cpu: <m>
limits:
memory: <Gi>
cpu: <"1">
fullReconciliationIntervalMs: 120000
operationTimeoutMs: 300000
The default values are as follows:
resources.limits.memory: 500Mi
resources.limits.cpu: 500m
resources.requests.memory: 1Gi
resources.requests.cpu: 1
Along with the above resources, you can provide the following additional configurations:
# Full reconciliation interval in milliseconds
fullReconciliationIntervalMs: 120000
# Operation timeout in milliseconds
operationTimeoutMs: 300000
Deploying Strimzi Operator
Run the following script to deploy the Strimzi operator in the Kubernetes namespace:
$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c create
Optionally, run the following script to deploy the Strimzi operator in Kubernetes namespace with custom image registry and repository:
$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c create -f $SPEC_PATH/<STRIMZI_OPERATOR_NAMESPACE>/strimzi-operator-override-values.yaml
Upgrading Strimzi Operator
Run the following script to upgrade the Strimzi Operator in Kubernetes namespace:
$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c upgrade
Note:
While upgrading strimzi-cluster-operator to a newer version, use the old toolkit for the older version of strimzi (the one already deployed) and new toolkit for upgrading to a newer version, in the case of Create, Upgrade, Delete, Register and Unregister operations.
Optionally, run the following script to deploy the Strimzi operator in Kubernetes namespace with custom image registry and repository:
$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c upgrade -f $SPEC_PATH/<STRIMZI_OPERATOR_NAMESPACE>/strimzi-operator-override-values.yaml
Uninstalling Strimzi Operator
Run the following script to uninstall the Strimzi Operator from Kubernetes namespace:
$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c delete
Validating Strimzi Operator
Validate the Strimzi operator that is installed in the provided namespace by running the following command:
$kubectl get pod -n <STRIMZI_OPERATOR_NAMESPACE>
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-*******-*** 1/1 Running 0 6m55s
Validate the Helm release installed for the Strimzi operator in the provided namespace by running the following command:
$helm list -n <STRIMZI_OPERATOR_NAMESPACE>
NAME NAMESPACE REVISION STATUS CHART APP VERSION
strimzi-operator <STRIMZI_OPERATOR_NAMESPACE> 1 deployed strimzi-kafka-operator-x.y.z x.y.z
Restarting the Strimzi Operator
Run the following script to restart the Strimzi Operator:
$COMMON_CNTK/scripts/strimzi-operator.sh -p <STRIMZI_OPERATOR_NAMESPACE> -c restart
Registering the Namespaces with Strimzi Operator
To create and manage the Kafka cluster in a Kubernetes namespace, this namespace must be registered with the Strimzi operator to monitor the CRDs.
Run the following script to register the namespace(s) with the Strimzi operator to monitor and create or manage the Kafka cluster and its components:
$COMMON_CNTK/scripts/register-namespace.sh -p <Namespace to be monitored> -t strimzi
Configuring the applications.yaml File for Message Bus
Modify the values in the applications.yaml file and upgrade or create the Message Bus service. The following configurations are available for the Message Bus service:
- Image Pull Secrets
- Security Context
- Cluster Size
- Storage
- Broker Defaults
- JVM Options
- Kafka Topics
- Accessing Kafka Cluster
- Authentication
Using Image Pull Secrets
You use the Image Pull Secrets sample only while using the private container repository that requires authentication. These authentication details have to be provided as Kubernetes secret object in the namespace where the Kafka cluster is planned to be deployed. This process is also followed while deploying Strimzi Operator.
Note:
Provide the secret name in the kafka-cluster section, if using different secret name than in the Strimzi Operator's namespace.Image Pull Secrets (Sample)
imagePullSecret:
imagePullSecrets:
- name: <secret name>
The sample command to create secret object for registry authentication is as follows:
kubectl create secret docker-registry <secret-name> --docker-server=<Image Registry> \
--docker-username=<Username> \
--docker-password=<Password> \
-n <Kafka-Namespace>
See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ to create the secret object.
Security Context
The userSecurity section that has securityContext is applicable only when you want to define privilege and access control settings for a pod or container. The pod security context which is configured at the pod-level is provided as a sample and is applied to all containers in given pod.
Note:
If a value is commented, it cannot be used, To use a different key-value, uncomment the corresponding value in applications.yaml.
See https://strimzi.io/blog/2022/09/09/configuring-security-context-in-pods-managed-by-strimzi/ and https://kubernetes.io/docs/tasks/configure-pod-container/security-context/ for more information.
userSecurity:
securityContext:
runAsNonRoot: <true/false>
runAsUser: <userID>
runAsGroup: <groupID>
fsGroup: <fsGroup>
Cluster Size
The Message Bus cluster consists of Kafka Brokers and Zookeeper nodes. Modify the replicas count for the Kafka Brokers and Zookeeper nodes according to the usage. For high availability of Message Bus service, make sure the number of replicas is minimum 3 for Kafka and Zookeeper, in production instance and adjust Kafka Broker configuration accordingly:
kafka-cluster:
replicas:
kafka: 3
zookeeper: 3
Storage
The Message Bus uses Strimzi to deploy the Apache Kafka cluster in Kubernetes cluster. For Strimzi to work as required, an efficient data storage infrastructure is essential. Oracle recommends using a block storage as Strimzi is tested for using with block storage. For more information on data storage, see https://strimzi.io/docs/operators/latest/deploying#considerations-for-data-storage-str
The Message Bus Service stores the events (or messages) in block storage using the Kubernetes Persistent Volumes. Modify the values for class, size, and isDeleteClaim values in storage section under the Kafka cluster. The storage class must have dynamic persistent volume provision capability:
kafka-cluster:
#storage:
#When storage.type below is set as "persistent-claim", the storage class name & size are mandatory to be set
#type: persistent-claim
#class: psrnfsn1
#size: 1Gi
#isDeleteClaim: false
For development to use ephemeral (that is, temporary container storage), do not change the values. These values must be commented for ephemeral.
Broker Defaults
The following configuration is applied when the Topics are auto created. Modify the following settings in the kafkaConfig section under the Kafka cluster accordingly:
kafka-cluster:
kafkaConfig:
#The default replication factor for automatically created topics
defaultReplicationFactor: 2
offsetsTopicReplicationFactor: 2
transactionStateLogReplicationFactor: 2
transactionStateLogMinIsr: 2
minInsyncReplicas: 2
logRetentionMinutes: 30
numPartitions: 3
The values for replicationFactors and minimum in-sync replicas must be entered according to the values entered in the Kafka Cluster. These values must be less than or equal to the Kafka Cluster replica values.
For more information on the values, see the Kafka documentation at: https://kafka.apache.org/081/documentation.html#brokerconfigs
JVM Options
The Message Bus cluster consists of Kafka Brokers and Zookeeper nodes. Modify the jvmOptions for Kafka Brokers and Zookeeper nodes according to the usage. See https://strimzi.io/docs/operators/latest/full/configuring.html#con-common-configuration-jvm-reference for more details.
jvmOptions:
kafka:
-Xms: 1024m
-Xmx: 1024m
# javaSystemProperties:
# - name: <placeHolder>
# value: <value>
zookeeper:
-Xms: 1024m
-Xmx: 1024m
# javaSystemProperties:
# - name: <placeHolder>
# value: <value>
Kafka Topics
Add or update the Kafka Topics in the applications.yaml file in the kafkaTopics section which are required for the Message Bus service clients (producers or receivers).
For example:
kafka-topic:
#List of Kafka topics
kafkaTopics:
- name: <topic1>
partitions: <no_partitions>
replicas: <no_replicas>
config:
retention: 7200000
segmentBytes: 1073741824
The following topics are required for the ATA integration which are defined in the applications.yaml file within the Common CNTK samples. These topics are created during the deployment of Message Bus service using Common CNTK:
Table 7-2 Topic, producer, and consumer details.
Topic | Producer | Consumer | Additional Details |
---|---|---|---|
ora-uim-topology | UIM | ATA | See Unified Inventory Management System Administration Overview in UIM System Administrator’s Guide for more details. |
ora-alarm-topology | Assurance System | ATA | For more information, see "Deploying the Active Topology Automator Service" |
ora-retry-topology | ATA | ATA | For more information, see "Deploying the Active Topology Automator Service" |
ora-dlt-topology | ATA | ATA | For more information, see "Deploying the Active Topology Automator Service" |
ora-sol005-lcm | Sol Adapter | External sol5 consumer | |
ora-alarm-retry | ATA | ATA | For more information, see "Deploying the Active Topology Automator Service" |
ora-alarm-dlt | ATA | ATA | For more information, see "Deploying the Active Topology Automator Service" |
ora-test-topic | Standalone Test Client | Standalone test client |
Note:
Do not use the default topics (ora-uim-topology, ora-fault-topology, ora-retry-topology and ora-dlt-topology) for a standalone testing. Use only the ora-test-topic to test the deployment of Message Bus service.
Accessing Kafka Cluster
There are various listener type configurations available to access the Message Bus service internally and externally. The Authentication configuration is applied across all listener types. As part of Kafka cluster deployment, the Kubernetes service objects are created to provide access to Kafka cluster pods. This service objects are created based on the listener type configuration in the applications.yaml file for message-bus section. You can access the Message Bus service in any of the following ways:
- Accessing within the same cluster (Internal access)
- Accessing from outside of the cluster (External access)
Note:
When a Message Bus service is deployed, it autogenerates the certificates of TLS for server and client. You must use the custom certificates so that the certificates are retained when the service is terminated and created again. See "Using Wild Card Certificates" for more information.
Accessing the Message Bus service from within the same cluster (Internal access)
The internal listener configuration in the applications.yaml file is used when the client services are in the same Kubernetes cluster, which can be in the same namespace or a different namespace. This configuration is enabled by default.
kafka-cluster:
listeners:
#Plain is for internal access within the same k8s cluster.
internal:
# Enable the tls to true if encryption/decryption is needed for internal access
#tls: false
See "Configuring Message Bus Listeners" for more information.
Accessing the Message Bus service from outside of the cluster (External access)
The ingress listener configuration in the applications.yaml file is used when the client services are outside of the Kubernetes cluster. This access is achieved using the ingress controller.
# To expose the kafka-cluster to external kafka clients via ingress controller uncomment the following and modify accordingly.
# Valid values are TRAEFIK, GENERIC
ingressController: <INGRESS_CONTROLLER>
#ingress:
# #specify className field for ingressClassName of generic ingress controller.
# #In case of nginx the default values is nginx
# className: "nginx"
#provide loadbalancer port
# if TLS is enabled in global section, then loadbalancerport will be used as external port for Generic or Traefik.
loadbalancerport: <loadBalancer-port>
kafka-cluster:
listeners:
#To expose the kafka-cluster to external kafka clients via ingress controller (traefik or generic) uncomment the following and modify accordingly.
#ingress:
# #The secure port of either ingress controller or external load-balancer. If TLS is Disabled in global, then below ingressSslPort will be used as external port.
# ingressSslPort: <LOADBALANCER_PORT>
# #If using Generic Ingress controller, below given annotations are mandatory for Message-Bus external access.
# #These annotations are required for nginx ingress controller.
# annotations:
# nginx.ingress.kubernetes.io/ingress.allow-http: "false"
# nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
# ingress.kubernetes.io/ssl-passthrough: "true"
# nginx.ingress.kubernetes.io/ssl-passthrough: "true"
See "Configuring Message Bus Listeners" for more information.
Accessing the Message Bus service using a nodeport listener
The nodeport listener configuration in applications.yaml file configuration is also used when the client services are outside of the Kubernetes cluster. The access is directly with the Kubernetes work node’s port.
Note:
Oracle does not recommend this listener for production. It must be used only for debugging where ingress controller is not deployed.kafka-cluster:
listeners:
#To expose the kafka-cluster to external kafka clients without ingress controller, uncomment the following section and modify accordingly
#nodeport:
#default is true. can be turned off if needed
#tls: true
#if need to expose on a static nodeport, pease uncomment the below section and provide values
#nodePort: 32100
See "Configuring Message Bus Listeners" for more information.
Configuring Authentication
Kafka 2.0.0 or later supports an extensible OAuth 2.0 compatible token-based mechanism available, called SASL OAUTHBEARER. Strimzi has developed extensions that provide integration with OAuth 2.0 compliant authorization servers. That means, in principle, you can use any OAuth 2.0 compliant authorization server to enable centrally managed users for authentication with Kafka.
The Message Bus service uses a Strimzi operator to deploy Kafka brokers and in-turn use OAuth 2.0 token-based authentication while establishing a session to a Kafka broker. With this authentication, Message Bus clients (or Kafka clients) and Kafka brokers communicate with a central OAuth 2.0 compliant authorization server. These Kafka clients use the authorization server to obtain access tokens and are configured with access tokens issued by the server. Kafka brokers communicate with authorization server to validate the tokens presented by the clients, thus confirming their identities. You can perform the validation of access token using a fast local JWT validation or a token validation using an introspection endpoint.
To configure OAuth 2.0 support for Kafka Brokers in the Message Bus service, you need to update applications.yaml file and create or upgrade the service.
Prerequisites
- The Authorization server (OAuth 2.0 compliant) is up and running. See "Deploying OAM along with OHS for Authentication Service" in Authentication Service
- Configure the client for Kafka broker in the authorization server. See "Creating a Client" section in Authentication Service
- Configure the clients for Kafka producer or consumer application in the authorization server. See "Creating a Client" section in Authentication Service
- Kafka cluster is configured with oauth type Authentication. See the following sections.
Enable Authentication on Kafka Cluster:
This procedure describes how to configure Kafka brokers so that the broker listeners are enabled to use OAuth 2.0 authentication by using an authorization server.
Note:
Oracle recommends to use OAuth 2.0 over an encrypted interface through a listener with tls. Plain listeners are not recommended.To enable authentication on the Kafka cluster:
- In applications.yaml, un-comment or add the following
configurations:
- Set the authentication.enabled flag to true and update the loadbalancerhost, loadbalancerport and ohsHostname in $SPEC_PATH/sr/quick/applications.yaml file.
- To use fast local JWT validation, set
useFastLocalJWTvalidation value to true under
kafka-cluster.listeners.authentication. If not set, the
introspection endpoint is used for validation.
# The enabled flag is to enable or disable authentication authentication: enabled: true #Uncomment the below host aliases section and provide hostname to ipaddresss mappings #This will add entries to POD's /etc/hosts file for hostname resolution when DNS and other options are not applicable. #For more details see https://kubernetes.io/docs/tasks/network/customize-hosts-file-for-pods/ #hostAliases: #- ip: <ip-address> #hostnames: #- <hostname-1> # Ex. quick.sr.ohs.uim.org #Sample sub-section for using fast local jwt validation kafka-cluster: listeners: authentication: useFastLocalJWTvalidation: true
- The Message Bus service uses other configuration values from
Kubernetes Secret (<namespace>-<instance>-oauth-credentials)
and Config Map (<namespace>-<instance>-oauth-config-cm)
objects from the same namespace. This Secret and Configuration Map Kubernetes
objects have to be created before deploying the Message Bus service for
authentication. See "Adding Common OAuth Secret and ConfigMap" for creating the secret. The configuration values used are:
- clientID: The client ID to identify the client.
- clientSecret: The client secret used for authentication.
- validIssuerUri: The URI of the token issuer used for authentication.
- introspectionEndpointUri: The URI of the token introspection endpoint.
- jwksEndpointUri: The endpoint with public keys of authentication server that has to be used for fast local JWT validation.
- tlsTrustedCertificate: The trusted certificates for TLS connection to the authorization server.
The following optional values are supported for authentication. See Strimzi documentation https://strimzi.io/docs/operators/in-development/configuring.html#type-KafkaListenerAuthenticationOAuth-reference for details on each value. Add the following optional values as required, under the kafka-cluster.listeners.authentication section in applications.yaml file:
# Additional optional authentication values
kafka-cluster:
listeners:
authentication:
oauthConfig:
#Enable or disable audience checking
checkAudience:
#Enable or disable issuer checking. By default issuer is checked using the value configured by validIssuerUri
checkIssuer:
#The audience to use when making requests to the authorization server’s token endpoint
clientAudience:
#The scope to use when making requests to the authorization server’s token endpoint
clientScope:
#The connect timeout in seconds when connecting to authorization server
connectTimeoutSeconds:
#Enable or disable TLS hostname verification. Default value is false.
disableTlsHostnameVerification:
#The read timeout in seconds when connecting to authorization server.
readTimeoutSeconds:
#URI of the User Info Endpoint to use as a fallback to obtaining the user id
userInfoEndpointUri:
#Name of the claim from the JWT authentication token
userNameClaim:
Using GC Logs
By default, GC logs are disabled, you can enable it and view the logs on
stdout
by using kubectl logs
<kafka-cluster-pod-name>
.
To Enable GC logs, update $SPEC_PATH/<project>/<instance>/applications.yaml file as follows:
- Under
gcLogs
makeenabled
astrue
. - Uncomment the
gcLogs
option underkafka-cluster
to override common values.gcLogs: enabled: true
Note:
You do not have to configure fileSize and noOfFiles as the logs are printed on the stdout.
Deploying and Managing Message Bus
Kafka cluster consists of Kafka Brokers and Zookeeper nodes. Once the Strimzi operator is successfully installed in the Kubernetes cluster and a namespace for the Kafka cluster is registered to monitor, you can deploy and manage the Kafka cluster.
Update the applications.yaml file as per your requirement and verify the following configuration elements in the yaml file before deploying the Kafka cluster:
Note:
If applications.yaml is not copied from Common CNTK, copy the $COMMON_CNTK/samples/applications.yaml file to your local directory, for example: $SPEC_PATH/sr/quick, where the sr is the Kubernetes namespace and quick is the instance name.
- The Storage class name that is used to create persistent volumes.
- The Kafka cluster replicas, which is the number of Kafka Brokers and Zookeeper nodes.
- Virtual Resource sizing.
- The Kafka Broker default settings.
- The listeners to be exposed with authentication and TLS.
- Authentication details.
- Metrics enablement.
- Affinity settings
- Update partitions, replicas, and retention period values for the default Kafka Topics.
See "Configuring the applications.yaml File for Message Bus" for more details.
Deploying Message Bus
Run the following commands to deploy the Kafka cluster with Kafka Topics in a Kubernetes namespace:
$COMMON_CNTK/scripts/create-applications.sh \
-p <kafka cluster namespace> \
-i <kafka cluster instance name> \
-f <path to override values yaml file> \
-a messaging-bus
For example:
In the following command, sr is a namespace and quick an instance name:
$COMMON_CNTK/scripts/create-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus
Upgrading Message Bus
The Kafka cluster upgrade requires persistent storage enabled for rolling update. Oracle recommends you have multiple replicas so that the service is not down while upgrading.
Update the Kafka cluster configuration in the applications.yaml file:
$COMMON_CNTK/scripts/upgrade-applications.sh \
-p <kafka cluster namespace> \
-i <kafka cluster instance name> \
-f <path to override values yaml file> \
-a messaging-bus
For example, run the following command to upgrade the Kafka cluster and Kafka topic running in sr namespace with instance as quick:
$COMMON_CNTK/scripts/upgrade-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus
Deleting Message Bus
Run the following script to delete or uninstall the Kafka cluster and Kafka Topic from the Kubernetes namespace:
$COMMON_CNTK/scripts/delete-applications.sh \
-p <kafka cluster namespace> \
-i <kafka cluster instance name> \
-f <path to override values yaml file> \
-a messaging-bus
For example: Run the following command to delete the Kafka cluster with Kafka topic running in sr namespace with instance as quick:
$COMMON_CNTK/scripts/delete-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus
Validating Message Bus
Check the pods created for the Kafka cluster. The following sample output shows the internal listener configuration. If it has any external listener settings, the additional service objects appear:
$kubectl get svc -n sr
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
sr-quick-messaging-kafka-bootstrap ClusterIP <clusterIP> <none> 9091/TCP,9092/TCP 22m
sr-quick-messaging-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 22m
sr-quick-messaging-zookeeper-client ClusterIP <clusterIP> <none> 2181/TCP 23m
sr-quick-messaging-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 23m
Check the Service object created for the Kafka cluster. The following sample output shows the Kafka and ZooKeeper replica as 1.
$kubectl get pod -n sr
NAME READY STATUS RESTARTS AGE
sr-quick-messaging-entity-operator-*****-**** 3/3 Running 0 27h
sr-quick-messaging-kafka-0 1/1 Running 0 27h
sr-quick-messaging-zookeeper-0 1/1 Running 0 27h
Check the Helm release:
$helm list -n sr
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
sr-quick-messaging sr 1 ***** deployed kafka-cluster-<x.y.z> <x.y.z>
Check the persistent volume claims created:
$kubectl get pvc -n sr`
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
data-sr-quick-messaging-kafka-0 Bound <volume> 1Gi RWO sc 27h
data-sr-quick-messaging-zookeeper-0 Bound <volume> 1Gi RWO sc 27h
Run a standalone producer or consumer. See "Internal access - same namespace – plain" to run standalone producer and consumer pods in a Kafka cluster namespace.
Note:
As part of deploying, upgrading, and deleting the Message Bus, the Kafka topics are also created, upgraded, and deleted from the configuration provided in the input yaml file.
Restarting Message Bus
The restart-application.sh script with application name as messaging-bus restarts all the subcomponents such as Kafka, ZooKeeper, and Entity Operators of the Message Bus. Run the following command to restart:
$COMMON_CNTK/scripts/restart-applications.sh -p sr -i quick -f $SPEC_PATH/sr/quick/applications.yaml -a messaging-bus
Note:
The Message Bus service restart requires to have multiple replicas so that the service is not down while upgrading and the replica count should be greater than or equal to 2.
To validate the restart option, see "Validating Message Bus".
Alternate Configuration Options
There are various alternate options for configuring the Message Bus.
Log Level
Kafka uses Apache log4j. By default, it is enabled with INFO
. Update this in applications.yaml file for debugging with
the INFO
level:
logging:
kafka:
logLevel: INFO
zookeeper:
logLevel: INFO
Choosing Worker Nodes for Running Message Bus Service
Update the Message Bus service configuration section in the applications.yaml file to node affinity or pod affinity and anti-affinity to constrain which nodes your pod can be scheduled. Alternatively, co-locate the pods in same node (or separate) and run either create or upgrade script.
Node Affinity
Node affinity is conceptually similar to nodeSelector, that enables you to constrain which nodes your pod can be scheduled, based on the node labels.
There are two types of node affinities:
- Schedule a pod using required node affinity: The scheduler cannot schedule the pod unless the rule is met.
- Schedule a pod using preferred node affinity: The scheduler tries to find a node that meets the rule. If a matching node is not available, the scheduler continues to schedule the pod.
Preferred node affinity
The sample configuration for enabling preferred node affinity is as follows:
kafka-cluster:
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
preference:
matchExpressions:
- key: name
operator: In
values:
- south_zone
Kubernetes pod is scheduled on the node with label name as south_zone. If node with label name: south_zone is not available, pod will still be scheduled on another node.
Pod Affinity and Anti-Affinity
The Pod Affinity or anti-affinity allows you to constrain which node your pod is eligible to be scheduled, based on the labels on other pods.
Similar to node affinity, there are two types of pod affinity and anti-affinity:
requiredDuringSchedulingIgnoredDuringExecution
preferredDuringSchedulingIgnoredDuringExecution
Pod Affinity
Assign a Kubernetes pod to a node based on the labels on other pods using the Pod Affinity in a Kubernetes cluster. Modify the Kafka cluster override values yaml file.
The sample configuration for enabling the required pod affinity is as follows:
kafka-cluster:
affinity:
podAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
Kubernetes pod is scheduled on the node which contains a pod with label http://app.kubernetes.io/name: kafka.
Modify the Kafka cluster override values yaml file. The sample configuration for enabling the preferred pod affinity is as follows:
kafka-cluster:
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
The Kubernetes pod is scheduled on the node which contains a pod with label http://app.kubernetes.io/name: kafka. If the node is not available, pod will still be scheduled on another node.
Pod anti-affinity
Assign a Kubernetes pod to a node based on the labels on other pods using pod anti affinity in a Kubernetes cluster.
Modify the Kafka cluster override values yaml file. The sample configuration with required pod anti-affinity is as follows:
kafka-cluster:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
Kubernetes pod is scheduled on the node which does not contain a pod with label http://app.kubernetes.io/name: kafka.
Modify the Kafka cluster's override values yaml file. The sample configuration with preferred pod anti-affinity is follows:
kafka-cluster:
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
Kubernetes pod is scheduled on the node which does not contains a pod with label http://app.kubernetes.io/name: kafka. If node is not available, pod will still be scheduled on another node.
Managing Message Bus Metrics
Metrics in Message Bus are configured by enabling the JMX Exporter and Kafka Exporter. JMX Exporter can be enabled to get JVM metrics of Kafka cluster and Kafka Exporter can be enabled on a Kafka cluster to extract additional Prometheus metrics data from Kafka brokers, which is related to offsets, consumer groups, consumer lag, and topics.
See https://strimzi.io/docs/operators/latest/overview.html#metrics-overview_str for more information on metrics from Strimzi.
Enable metrics
Enable Kafka Exporter and JMX Exporter in the $SPEC_PATH/sr/quick/applications.yaml file and upgrade or create the Message Bus service. The sample content is as follows:
kafka-cluster:
metrics:
kafkaExporter:
enable: true
jmxExporter:
enable: true
The above configuration exposes the Prometheus metrics for Kafka Brokers, Topics, and Consumer Groups components on metrics end-point on the pods. You can view these details on Prometheus UI by configuring the Scrape job. You can view this information in the form of graphs using the Grafana dashboard.
See https://github.com/danielqsj/kafka_exporter#metrics to see the exposed metrics.
Prometheus and Grafana setup
See Setting Up Prometheus and Grafana for more information.Adding scrape Job in Prometheus
Add the following Scrape job in Prometheus Server. This can be added by editing the config map used by the Prometheus server:
- job_name: Message_bus
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- 'sr'
relabel_configs:
- separator: ";"
regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
replacement: $1
action: labelmap
- source_labels: [__meta_kubernetes_namespace]
separator: ";"
regex: (.*)
target_label: namespace
replacement: $1
action: replace
- source_labels: [__meta_kubernetes_pod_name]
separator: ";"
regex: (.*)
target_label: kubernetes_pod_name
replacement: $1
action: replace
- source_labels: [__meta_kubernetes_pod_node_name]
separator: ";"
regex: (.*)
target_label: node_name
replacement: $1
action: replace
- source_labels: [__meta_kubernetes_pod_host_ip]
separator: ";"
regex: (.*)
target_label: node_ip
replacement: $1
action: replace
Sample Grafana dashboards
Add the Prometheus data source and import the sample Grafana dashboards from Strimzi github.
The sample Grafana dashboard for Kafka and JMX Exporters can be downloaded from the following links:
Installing and Configuring Mirror Maker 2.0
This section describes the installation and configuration of Mirror Maker 2.0.
Configuring Source and Target Message Bus (Kafka cluster) Details
Update the $COMMON_CNTK/samples/messaging-bus/kafka-mirror-maker/values.yaml with source and target Kafka cluster details as follows:
sourceCluster:
#Source Kafka cluster
name: sr1-quick1-messaging
#Bootstarp server for connection to the source Kafka cluster
bootstrapServers: sr1-quick1-messaging-kafka-bootstrap:9092
targetCluster:
#Target Kafka cluster
name: sr2-quick2-messaging
#Bootstarp server for connection to the target Kafka cluster
bootstrapServers: sr2-quick2-messaging-kafka-bootstrap:9092
In the above command:
- sourceCluster.name is the helm release for source Kafka cluster (sr1-quick1-messaging)
- sourceCluster.bootstrapServers is the bootstrap server of source Kafka cluster (sr1-quick1-messaging-kafka-bootstrap:9092)
- targetCluster.name is the helm release for target Kafka cluster (sr2-quick2-messaging)
- targetCluster.bootstrapServers is the bootstrap server of target Kafka cluster (sr2-quick2-messaging-kafka-bootstrap:9092)
Note:
$COMMON_CNTK/samples/messaging/kafka-mirror-maker/values.yaml
If the sr1-quick1-messaging-kafka-bootstrap service is hosted in Strimzi namespace on 9092 port and the client application in another namespace, then the bootstrap-server URL should be used as sr1-quick1-messaging-kafka-bootstrap.strimzi.svc.cluster.local
If the target cluster is in another Kubernetes cluster, you must to use external listener for referring to the boostrap server.
While using Nodeport
, the worker node IP of the target cluster is
to be used as the target cluster bootstrap address along with the exposed
nodeport.
While using Ingress
, the hostname of the target cluster is to be
used as target cluster bootstrap address.
Installing Mirror Maker
helm install mirror-maker $COMMON_CNTK/samples/messaging/kafka-mirror-maker/ -n <namespace> --values $COMMON_CNTK/samples/messaging/kafka-mirror-maker/values.yaml
Validate that Mirror Maker is installed by running the following command:
kubectl get pods -n <namespace>
replication-mirror-maker-mirrormaker2-5c6d7dd7d7-r89cj 1/1 Running 0 67m
kubectl get svc -n <namespace>
replication-mirror-maker-mirrormaker2-api ClusterIP <clusterIP> <none> 8083/TCP 67m
Uninstalling Mirror Maker
Run the following command to uninstall Mirror Maker from specific namespace:
helm uninstall mirror-maker -n <namespace>
Delete topic mm2-offset-syncs.messaging-test.internal from the source cluster (dev1-messaging)
$kubectl -n <SourceKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <instance>-messaging-kafka-bootstrap:9092 --delete --topic mm2-offset-syncs.messaging-test.internal
Delete topics heartbeats, mirrormaker2-cluster-status, mirrormaker2-cluster-offsets, mirrormaker2-cluster-configs from the target cluster (dev2-messaging)
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic heartbeats
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic mirrormaker2-cluster-status
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server <namespace>-<instance>-messaging-kafka-bootstrap:9092 --delete --topic mirrormaker2-cluster-offsets
$kubectl -n <TargetKafkaClusterNamespace> run kafka-topic -ti --image=<STRIMZI_KAFKA_IMAGE_NAME> --rm=true --restar
Client Access
Accessing Message Bus in events producer and consumers clients.
Internal Access in the Same namespace for Plain
When the message producer or consumer applications are in same namespace as the Message Bus service then they can access the Kafka cluster using the Bootstrap Kubernetes service object name and port.
Run the following command to test the standalone producer. Here the project namespace is sr and instance is quick.
$kubectl -n sr run kafka-producer-plain -ti \
--image=<STRIMZI_KAFKA_IMAGE_NAME> \
--rm=true --restart=Never \
-- bin/kafka-console-producer.sh \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--topic ora-test-topic
Type a few lines of text and each ENTER sends a message to Kafka broker. Type CTRL-C to quit.
Run the following command to test the standalone consumer. Here the project namespace is sr and instance is quick.
$kubectl -n sr run kafka-consumer-plain -ti \
--image=<STRIMZI_KAFKA_IMAGE_NAME> \
--rm=true --restart=Never \
-- bin/kafka-console-consumer.sh \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--group ora-uim-consumer-test --isolation-level read_committed \
--topic ora-test-topic --from-beginning
You get responses after the validation is successful.
Internal Access in a Different namespace for Plain
When the massage producer or consumer applications are in different namespace than the Message Bus service then they can access the Kafka cluster using the bootstrap service name and port but need to suffix <namespace>.svc.cluster.local to the service name.
See "Internal access - same namespace - plain" section on running the standalone console test producer and consumer pods for testing. Replace the bootstrap-server url with sr-quick-messaging-kafka-bootstrap.sr.svc.cluster.local, where the namespace is sr and instance is quick.
Internal Access in the Same namespace for Authentication
When the message producer or consumer applications are in same namespace as the Message Bus service then they can access the Kafka cluster using the bootstrap Kubernetes service object name and port.
Create a test client pod definition.
- Copy the following YAML content into the bastion host (or worker node) as mb-test-client-deployment.yaml file.
- Update the hostAliases section according to your OAuth service environment.
- Update the STRIMZI_KAFKA_IMAGE_NAME.
- Update the OAUTH Endpoint, Client Id and Secret.
- Update the OAUTH Endpoint, Client Id, Client Secret, Scope, Audience, and anything else that are applicable to your client configuration
apiVersion: apps/v1
kind: Deployment
metadata:
name: mb-test-auth-client-deployment
labels:
app: mb-test-auth-client
spec:
replicas: 1
selector:
matchLabels:
app: mb-test-auth-client
template:
metadata:
labels:
app: mb-test-auth-client
spec:
# <Uncomment below and replace with your bootstrap and brokers DNS names>
#hostAliases:
#- ip: <LOADBALANCER_IP>
# hostnames:
# - "<OHS_HOSTNAME>"
containers:
- name: mb-test-client
image: <STRIMZI_KAFKA_IMAGE_NAME>
command:
- "tail"
- "-f"
- "/dev/null"
imagePullPolicy: IfNotPresent
env:
- name: OAUTH_TOKEN_ENDPOINT_URI
value: <Update the OAUTH_TOKEN_ENDPOINT_URI>
- name: OAUTH_CLIENT_ID
value: <Update the OAUTH_CLIENT_ID>
- name: OAUTH_CLIENT_SECRET
value: <Update the OAUTH_CLIENT_SECRET>
# - name: OAUTH_SCOPE
# value: <Uncomment and update OAUTH_SCOPE>
# - name: OAUTH_AUDIENCE
# value: <Uncomment and update OAUTH_AUDIENCE>
ports:
- containerPort: 9090
name: http
protocol: TCP
Create the authentication properties in a file (mb_test_client.properties).
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
Run the test client container and provide authentication properties
#Apply the test client pod definition in the namespace (say "sr").
$kubectl apply -f mb-test-client-deployment.yaml -n sr
#Get the newly created pod name
$kubectl get pod -n sr | grep mb-test-auth-client-deployment
#Sample Output
#mb-test-auth-client-deployment-******-**** 1/1 Running 0 98s
#Copy the mb_authentication.properties file into the pod
$kubectl -n sr cp mb_test_client.properties mb-test-auth-client-deployment-******-****:/home/kafka/mb_test_client.properties
Test for message bus producer client:
- Start an interactive shell process in the test client pod
- Export the environment variables needed for the authentication
- Run the console producer command.
- Enter some string messages
#Get the newly created pod name
kubectl get pod -n sr | grep mb-test-auth-client-deployment
#Exec into the newly created pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the following test console producer
bin/kafka-console-producer.sh \
--producer.config /home/kafka/mb_test_client.properties \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--topic ora-test-topic
Test for message bus consumer client:
- Start an interactive shell process in the test client pod
- Export the environment variables needed for the authentication
- Run the console consumer command.
- You will see the previous string messages of producer
#Get the newly created pod name
kubectl get pod -n sr | grep mb-test-auth-client-deployment
#Sample Output
#mb-test-auth-client-deployment-******-**** 1/1 Running 0 98s
#Exec into the newly created pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the following test console consumer
bin/kafka-console-consumer.sh \
--consumer.config /home/kafka/mb_test_client.properties \
--bootstrap-server sr-quick-messaging-kafka-bootstrap:9092 \
--topic ora-test-topic \
--from-beginning
External ingress access - SSL and Authentication
The external access to Message Bus is provided through Ingress controller (Traefik or Generic) with TLS enabled. The following must be performed in clients for testing:
- Export and import the Message Bus service (that is sr-quick-messaging-cluster-ca-cert, where sr is namespace and quick is instance) certificate into clients.
- Export and import the certificate of OAuth service into the clients.
Note:
This is optional and is required only if OAuth is enabled for SSL. - Update the bootstrap and brokers DNS names with load balancer IP in the etc/hosts file of clients (that is, event producer or consumer applications).
- Update the DNS name of OAuth service with load balancer IP in
/etc/hosts file of clients.
Note:
This is optional and is required only if the OAuth service requires DNS name to access. - Run the producer or consumer script with SSL and Authentication details
In the following section, the external ingress access test is provided with Strimzi Kafka container. If you want to test the client code without Kubernetes cluster then you can download the Apache Kafka and perform the same.
Add Message Bus service and OAuth service certifications to trust store. See Import/export of TLS certificates section.
#Run the below command to export and import the Message Bus service certificate into the trust store (mb-cert-keystore.jks) file.
$COMMON_CNTK/scripts/export-message-bus-cert.sh -p sr -i quick -l . -k ./mb-test-client-cert-keystore.jks -a mb-cert
#Get the OAuth (OAM) service certificate and import into trust store (mb-test-client-cert-keystore.jks) file (Optional, needed if OAuth is SSL)
keytool -importcert -alias oauth-server -file <Path to OAuth Server certificate, the .pem file> -keystore ./mb-test-client-cert-keystore.jk --trustcacerts -noprompt
Create the following authentication properties in a file (mb_test_client.properties).
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
ssl.endpoint.identification.algorithm=
Create a test client pod definition.
- Copy the following YAML content into the bastion host (or worker node) as "mb-test-client-deployment.yaml" file.
- Update the Strimzi Kafka image.
- Update the hostAliases section according to your OAuth and Message Bus service setup. This will add entries to /etc/hosts file.
- Update the OAuth Endpoint, Client Id, Client Secret, and Trust Store Password in env section.
Note:
You can override the value of subDomainNameSeparator. The default is .. This value can be changed as "-" to match the wild card pattern of SSL certificates.
To override, uncomment and change this value in applications.yaml. See "Using Wild Card Certificates" for more information.
apiVersion: apps/v1
kind: Deployment
metadata:
name: mb-test-client-deployment
labels:
app: mb-test-client
spec:
replicas: 1
selector:
matchLabels:
app: mb-test-client
template:
metadata:
labels:
app: mb-test-client
spec:
# <Uncomment below and replace with your bootstrap and brokers DNS names>
# hostAliases:
# - ip: <Replace with your LOADBALANCER_IP>
# hostnames:
# - "<INSTANCE.PROJECT.messaging.broker0.uim.org>"
# - "<INSTANCE.PROJECT.messaging.brokerN.uim.org>"
# - "<INSTANCE.PROJECT.messaging.bootstrap.uim.org>"
# - "<Replace with OHS_HOSTNAME>"
containers:
- name: mb-test-client
image: quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
command:
- "tail"
- "-f"
- "/dev/null"
imagePullPolicy: IfNotPresent
env:
- name: OAUTH_TOKEN_ENDPOINT_URI
value: <Replace with your OAUTH_TOKEN_ENDPOINT_URI>
- name: OAUTH_CLIENT_ID
value: <Replace with your OAUTH_CLIENT_ID>
- name: OAUTH_CLIENT_SECRET
value: <Replace with your OAUTH_CLIENT_SECRET>
#- name: OAUTH_SCOPE
# value: <Uncomment and replace with your OAUTH_SCOPE>
#- name: OAUTH_AUDIENCE
# value: <Uncomment and replace with yours OAUTH_AUDIENCE>
- name: KAFKA_OPTS
value: " \
-Djavax.net.ssl.trustStore=/home/kafka/mb-test-client-cert-keystore.jks \
-Djavax.net.ssl.trustStorePassword=<Replace with your store password> \
-Djavax.net.ssl.trustStoreType=JKS"
ports:
- containerPort: 9090
name: http
protocol: TCP
Run the test client container and apply readiness for authentication and SSL.
#Apply the test client pod definition in the namespace (say "sr").
$kubectl apply -f mb-test-client-deployment.yaml -n sr
#Get the newly created pod name
$kubectl get pod -n sr | grep mb-test-client-deployment
#Sample Output
#mb-test-client-deployment-******-**** 1/1 Running 0 98s
#Copy the certificate store into the newly created pod. Replace the pod name below
kubectl -n sr cp mb-test-client-cert-keystore.jks <Replace with mb-test-client-deployment pod name>:/home/kafka/mb-test-client-cert-keystore.jks
#Copy the mb_test_client.properties file into the POD
kubectl -n sr cp mb_test_client.properties <Replace with mb-test-client-deployment pod name>:/home/kafka/mb_test_client.properties
Start a shell session inside container for console test producer.
#Get the newly created pod name
$kubectl get pod -n sr | grep mb-test-auth-client-deployment
#Sample Output
#mb-test-auth-client-deployment-******-**** 1/1 Running 0 98s
#Exec into the newly created pod
kubectl exec -it<Replace with mb-test-client-deployment pod name> -n sr -- bash
#Run the following producer command
bin/kafka-console-producer.sh \
--producer.config /home/kafka/mb_test_client.properties \
--bootstrap-server quick.sr.messaging.bootstrap.uim.org:30443 \
--topic ora-test-topic
Start start a shell session inside container for console test consumer:
#Exec into the newly created pod
kubectl exec -it <Replace with mb-test-client-deployment pod name> -n sr -- bash
#Run the following producer command. Replace the bootstrap-server url accordingly to your environment
bin/kafka-console-consumer.sh \
--consumer.config /home/kafka/mb_test_client.properties \
--bootstrap-server sthatipa.sr.messaging.bootstrap.uim.org:30443 \
--consumer-property group.id=test-client-service \
--topic ora-test-topic --from-beginning
Clean-up the newly created test pod:
kubectl delete -f mb-test-client-deployment.yaml -n sr
External node port access
The nodeport listener type allows the external access from outside of the Kubernetes cluster using the load balancer or Kubernetes worker node ip address and nodePort(port of worker node).
The Bootstrap URL is constructed with worker node IP Address and node port of bootstrap service.
Get the host port of the external bootstrap service using the following command:
$kubectl get service sr-quick-messaging-kafka-nodeport-bootstrap -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}' -n sr
Output: 32100
Get the IP Address of the Kubernetes worker node. Replace the <NODE_NAME> in the following with your node name:
$kubectl get node <NODE_NAME> -o=jsonpath='{range .status.addresses[*]}{.type}{"\t"}{.address}{"\n"}' -n sr
Output:
InternalIP 100.xx.xx.142
Hostname *********
Update the Kafka cluster Bootstrap URL as 100.xx.xx.142:32100 in the events producer and consumer applications.
To access with plain, see "Internal access - same namespace - plain" section. Replace the bootstrap URL with above constructed one.
To access with Authentication, see "Internal access - same namespace - authentication" section. Replace the bootstrap URL with above constructed one.
To access with SSL and Authentication, see "External ingress access - SSL & Authentication" section. Replace the bootstrap URL with above constructed one.
Import/export of TLS certificates
To enable TLS encrypted access, the ca-certs of Kafka cluster is needed to be extracted and imported into key store and the location of that key store is used as the producer or consumer properties in events application.
Export the ca-certs of the Kafka cluster using the following command:
$COMMON_CNTK/scripts/export-message-bus-cert.sh -p <Namespace of kafka cluster> \
-i <instance name of kafka cluster> \
-l <directory to export clustercerts temporarily> \
-k <keystore-location> \
-a <alias for cert>
For example:
$COMMON_CNTK/scripts/export-message-bus-cert.sh -p sr -i quick -l . -k ./mb-cert-keystore.jks -a mb-sr-quick-cert
The export-cluster-cert.sh script creates JKS type truststore by default in the provided key store location. If any other truststore type is created, specify that as producer or consumer property while running the clients. These exported artifacts can be used in Kafka client applications.
Note:
If custom certificates were used during cluster creation, then these can be directly provided through a keystore than extracting the generated certs.
Using custom certificates
Custom certificates can be used while creating the Kafka cluster:
Prerequisites:
- Certificates and keys are to be in PEM format.
- Key should not be encrypted. Encrypted keys are not supported since they need user interaction for entering the passphrase during access.
Creating a custom certificate
To create a custom certificate, see "SSL Certificates".
Create Kubernetes secret
Run the following command by replacing the placeholders:
kubectl create secret generic <secret-name> --from-file=<key-file-name> --from-file=<certificate-file-name>
For example:
kubectl create secret generic myCustomCertSecret --from-file=commonkey.pem --from-file=commoncert.pem
Update Kafka Cluster configuration
Update the customCerts configuration section in Kafka cluster's override values yaml file:
kafka-cluster:
## to enable custom or owned certs for tls please create a kubernetes secret with the cert and key if not already present, uncomment the below section and add respective values.
## please be advised that encrypted keys are not supported since they require user interaction for the passphrase
customCerts:
# Secret in which cert and key are present
secretName: <secret-name created above>
certName: <certificate file used in the secret created above>
keyName: <key-file used in the secret created above>
Configuring Message Bus Listeners
Message Bus has three listeners (internal, ingress and nodeport) to access the service. These are described the in following sections.
Message Bus Internal Listener
The following is the configuration for internal listener type which can be commented or uncommented.
kafka-cluster:
listeners:
# plain is for internal access within the same k8s cluster.
internal:
From same namespace in cluster
This is an internal access method that is used by the message producer or consumer clients (or applications) when they are deployed in same namespace as the Message Bus service. This is enabled by default with internal listener type. To access the Message Bus, the producer or consumer applications must get the Bootstrap service URL of the Kafka cluster.
To get the Bootstrap service URL of the Kafka cluster run the following command:
kubectl get svc -n sr | grep sr-quick-messaging-kafka-bootstrap
sr-quick-messaging-kafka-bootstrap ClusterIP <clusterIP> <none> 9091/TCP,9092/TCP
Note:
The project namesapce is sr and instance is quick.Use the sr-quick-messaging-kafka-bootstrap:9092 URL in the producer and consumer client configuration in the applications.
From another namespace in cluster
This is an internal access method which is used by the producer or consumer client applications when they are deployed in different namespace than the message-bus service. This is enabled by default with internal listener type. To access the Message Bus, the producer or consumer client applications have to get the Bootstrap service URL of the Kafka cluster and convert the URL pattern as serviceName.namespace.svc.cluster.local.
If the sr-quick-messaging-kafka-bootstrap service is hosted in sr namespace on 9092 port and the client applications from different namespace can access the Kafka cluster with Bootstrap URL as sr-quick-messaging-kafka-bootstrap.sr.svc.cluster.local:9092
Message Bus Ingress Listener
This is an external access method which is used by message producer or consumer applications when they are deployed outside of the Kubernetes cluster. This is disabled by default and must be enabled in the applications.yaml. This external access is provided through the Traefik Ingress Controller and Generic Ingress Controller to the Kafka cluster. To enable this external access, the ingress listener type configuration must be enabled in the Kafka cluster configuration yaml file.
Ingress listener type
Un-comment the ingress lister type section in applications.yaml file to expose the Message Bus Service outside of Kubernetes cluster. Ingress controller (Traefik or Generic) should be deployed in order for this ingress listener type to work and Message Bus namespace must be registered with Traefik operator. In case of Generic Ingress, set ingress.className according to your Generic Ingress Controller.
In case of Generic Ingress controller (NGINX), annotations given under the kafka-cluster.listeners.ingress.annotations tag in applications.yaml are mandatory.
# To expose the kafka-cluster to external kafka clients via ingress controller uncomment the following and modify accordingly. # Valid values are TRAEFIK, GENERIC
ingressController: "TRAEFIK"
#ingress:
# #specify className field for ingressClassName of generic ingress controller.
# #In case of nginx the default values is nginx
# className: "nginx"
#provide loadbalancer port
# if TLS is enabled in global section, then loadbalancerport will be used as external port for Generic or Traefik
loadbalancerport: <loadBalancer-port>
kafka-cluster
listeners:
ingress:
# if TLS is Disabled in global, then ingressSslPort will be used as external port.
ingressSslPort: <LoadBalancer_SSL_Port>
# If using Generic Ingress controller, below given annotations are mandatory for Message-Bus external access.
# These annotations are required for nginx ingress controller in Message-Bus.
annotations:
nginx.ingress.kubernetes.io/ingress.allow-http: "false"
nginx.ingress.kubernetes.io/backend-protocol: "HTTPS"
ingress.kubernetes.io/ssl-passthrough: "true"
nginx.ingress.kubernetes.io/ssl-passthrough: "true"
In external producer or consumer messaging clients (or applications), the following must be done to access the Kafka cluster through Ingress controller.
- The Bootstrap server and advertised broker host names must be configured in DNS at client side.
- Import the TLS certificate and trust stores from the Kafka cluster into client configurations.
- Add required additional properties in Kafka producer or consumer client configuration.
DNS settings in client applications host
The Bootstrap server host name and advertised broker host names must be configured in /etc/hosts file in producer and consumer client applications with the Traefik or Load Balancer IP Address. Hostnames are pre-configured when deployed with ingress listener type enabled with the following pattern:
bootstrap-server: <kafka-cluster-instance-name>.<kafka-cluser-project-name>.messaging.bootstrap.uim.org
broker-0: <kafka-cluster-instance-name>.<kafka-cluser-project-name>.messaging.broker0.uim.org
broker-1: <kafka-cluster-instance-name>.kafka-cluser-project-name>.messaging.broker1.uim.org
For example if a instance is quick and namesapce is sr then the hostnames will be as follows:
bootstrap-server: quick.sr.messaging.bootstrap.uim.org
broker-0: quick.sr.messaging.broker0.uim.org
broker-1: quick.sr.messaging.broker1.uim.org
Note:
You can override the value of subDomainNameSeparator. The default value is ".", This value can be changed to "-" to match the wild card pattern of SSL certificates.
To override the value, uncomment and change it in applications.yaml as follows:
#subDomainNameSeparator: "."
#Example hostnames for "-" : quick-sr-messaging-bootstrap.uim.org
Importing certificates into client applications
See the "Import/export of TLS certificates" section in “Client Access” section for exporting the ca-certs of Kafka cluster to producer or consumer applications.
Message Bus NodePort Listener
This is another external access method which is used by events producer or consumer client applications when they are deployed out-side of the Kubernetes cluster and wants to access the message-bus service without ingress controller.
Node port
The following configuration in the application yaml file allows exposing the nodeport listener type to access the Message Bus externally with tls and OAuth 2.0 Authentication.
Kafka-cluster:
listeners:
#To expose the kafka-cluster to external kafka clients without ingress controller, uncomment the following section and modify accordingly.
nodeport:
tls: true
# if need to expose on a static nodeport, please uncomment the below nodePort key and provide values.
nodePort: 32100
authentication: true
When the tls is enabled the certificates of the Kafka cluster must be imported in the events producer and consumer clients to access the Kafka cluster.
See the "Import/export of TLS certificates" section in “Client Access” section for exporting the auto-generated ca-certs of Kafka cluster.
Geo Redundancy Support
The Geo Redundancy of Message Bus (which uses Kafka) is achieved with Mirror Maker tool. Apache Kafka Mirror Maker replicates data across two Kafka clusters, within or across data centers. See https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/ for more details.
The following diagram shows an example of how mirror maker replicates the topics from source Kafka cluster to target Kafka cluster.
The prerequisites are as follows:
- The Strimzi operator should be up and running
- The source Message Bus service should be up and running
- The target Message Bus service should be up and running
Strimzi Operator
Validate that the Strimzi operator is installed by running the following command:
$kubectl get pod -n <STRIMZI_NAMESPACE>
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-566948f58c-sfj7c 1/1 Running 0 6m55s
Validate installed helm release for Strimzi operator by running the following command:
$helm list -n <STRIMZI_NAMESPACE>
NAME NAMESPACE REVISION STATUS CHART APP VERSION
strimzi-operator STRIMZI_NAMESPACE 1 deployed strimzi-kafka-operator-0.X.0 0.X.0
Source Message Bus
The source Message Bus should be up and running (the Kafka cluster from which the topics should be replicated).
Validate the Kafka cluster is installed by running the following command:
$kubectl get pod -n sr1
NAME READY STATUS RESTARTS AGE
sr1-quick1-messaging-entity-operator-5f9c688c7-2jcjg 3/3 Running 0 27h
sr1-quick1-messaging-kafka-0 1/1 Running 0 27h
sr1-quick1-messaging-zookeeper-0 1/1 Running 0 27h
Validate the persistent volume claims created for the Kafka cluster by running the following command:
$kubectl get pvc -n sr1
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
data-sr1-quick1-messaging-kafka-0 Bound <volume> 1Gi RWO sc 27h
data-sr1-quick1-messaging-zookeeper-0 Bound <volume> 1Gi RWO sc
Target Message Bus
The target Message Bus should be up and running (the Kafka cluster to which the topics should be replicated).
Validate the Kafka cluster is installed by running the following command:
$kubectl get pod -n sr2
NAME READY STATUS RESTARTS AGE
sr2-quick2-messaging-entity-operator-5f9c688c7-2jcjg 3/3 Running 0 27h
sr2-quick2-messaging-kafka-0 1/1 Running 0 27h
sr2-quick2-messaging-zookeeper-0 1/1 Running 0 27h
Validate the persistent volume claims created for the Kafka cluster by running the following command:
$kubectl get pvc -n <kafka target namespace>`
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
data-sr2-quick2-messaging-kafka-0 Bound <volume> 1Gi RWO sc 27h
data-sr2-quick2-messaging-zookeeper-0 Bound <volume> 1Gi RWO sc 27h
Installing and configuring Mirror Maker 2.0
A sample is provided at $COMMON_CNTK/samples/messaging/kafka-mirror-maker.
Debugging and Troubleshooting
NotEnoughReplicasException
When you get the org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required. The reason could be that the topics replicas is not meeting the default minInsyncReplicas value configured in the Message Bus service.
Asynchronous auto-commit of offsets failed
When you get the following error in the logs (for example: ATA Consumer). To resolve this make sure that max.polling.interval.ms is always greater than the last poll or else reduce the max.poll.records.
[Consumer clientId=consumer-ora-uim-topology-service-2, groupId=ora-uim-topology-service] Asynchronous auto-commit of offsets failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.. Will continue to join group.
Add these additional properties in the YAML file under the mp.messaging.connector.helidon-kafka section with override values.
mp.messaging:
connector:
helidon-kafka:
# The following are default global configuration values which effects for all the consumer groups.
max.polling.interval.ms: 300000
max.poll.records: 500
# The following are channel specific configuration values
incoming:
# The toInventoryChannel effects only for ora-uim-topology-service consumer group
# uncomment and update the specific values
#toInventoryChannel:
#max.polling.interval.ms: 300000
#max.poll.records: 500
# The toFaultChannel effects only for ora-uim-topology-retry-service consumer group
# Uncomment and update the specific values
#toRetryChannel:
#max.polling.interval.ms: 300000
#max.poll.records: 200
# The toDltChannel effects only for ora-uim-topology-dlt-service consumer group
# uncomment and update the specific values
#toDltChannel:
#max.polling.interval.ms: 300000
#max.poll.records: 100
Performance Tuning: Consumer Configurations
The following are some consumer configuration properties in message consumers which are related to performance. See https://kafka.apache.org/documentation/#consumerconfigs for all available consumer config properties.
- max.poll.records (default=500) defines the maximum number of messages that a consumer can poll at once.
- max.partition.fetch.bytes (default=1048576) defines the maximum number of bytes that the server returns in a poll for a single partition.
- max.poll.interval.ms (default=300000) defines the time a consumer must process all messages from a poll and fetch a new poll afterward. If this interval is exceeded, the consumer leaves the consumer group.
- http://heartbeat.interval.ms (default=3000) defines the frequency with which a consumer sends heartbeats.
- http://session.timeout.ms (default=10000) defines the time a consumer must send a heartbeat. If no heartbeat was received in that timeout, the member is considered dead and leaves the group.
Managing Consumer Groups
For more list of options available on the consumer groups see the apache kafka managing consumer groups section. The following sub-sections list some significant operations. See "Message Bus Client Access" for more information.
List consumer groups
#Exec into running message bus test client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to list all the consumer groups
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--list
Describe consumer group
#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to describe specific consumer group to check topics, partitions, offsets
#Replace the command-config, bootstrap, group values accordingly
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--group test-client-service \
--describe
Reset offset of a consumer group
#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to reset offset for consumer group for topic to latest. See Apache Kafka documentation for other available options.
#Replace the command-config, bootstrap, group and topic values accordingly
bin/kafka-consumer-groups.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--group test-client-service \
--reset-offsets \
--topic ora-test-topic \
--to-latest \
--execute
Topics
For more detailed list of operations available on the topics see the "Apache Kafka Operations".The following sub-sections list some significant operations. See "Message Bus Client Access" for more information.
Create
Create a topic with three partitions and two replications.
#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to create a topic
bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--create \
--topic replicated-2 \
--replication-factor 2 \
--partitions 3
List
To list all topics:
#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to list all the topic
bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--list
Describe
Describes the topic and its partition count, replicas factory along with leaders for the partition.
#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to describe the topic
bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--topic replicated-2 \
--describe
#Sample output
Topic: replicated-2 TopicId: vyalpPOmR0CtYt7Sc-gbxA PartitionCount: 3 ReplicationFactor: 2 Configs: min.insync.replicas=1,message.format.version=3.0-IV1
Topic: replicated-2 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: replicated-2 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: replicated-2 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
Alter
You can alter a topic and increase the partitions to 2.
#Exec into running Kafka admin client pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash
#Run the below command to alter the topic bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--alter \
--topic <Your Topic Name> \
--partitions 1
Reassignment
The partition reassignment tool can also be used to selectively move replicas of a partition to a specific set of brokers. In the following example the partitions for topic (replicated-2) are reassigned to different brokers.
See the "Message Bus Client Access"section for more information on running the message bus test pod with required configuration such as Authentication and SSL.
Create a file called custom-reassignment.json file a terminal
{"version":"1", "partitions":[{"topic":"replicated-2","partition":"0","replicas":"[0,1]"},{"topic":"replicated-2","partition":1,"replicas":"[1,2]"},{"topic":"replicated-2","partition":"2","replicas":"[0,2]"}]}
Run the following commands for reassignment:
#Copy the custom-reassignment.json file into the newly created pod under /home/kafka directory
$kubectl cp custom-reassignment.json mb-test-auth-client-deployment-*****-****:/home/kafka/custom-reassignment.json -n kafka
#Exec into running test pod
kubectl exec -it mb-test-auth-client-deployment-******-**** -n sr -- bash #Cd directory to /home/kafka
#Validate the topic ("replicated-2"
/opt/kafka/bin/kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--topic replicated-2 --describe
Topic: replicated-2 TopicId: vyalpPOmR0CtYt7Sc-gbxA PartitionCount: 3 ReplicationFactor: 2 Configs: min.insync.replicas=1,message.format.version=3.0-IV1
Topic: replicated-2 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: replicated-2 Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: replicated-2 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
#Run reassign-partitions script to reassign the partitions according to the json file
$/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server dev-messaging-kafka-bootstrap:9092 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"replicated-2","partition":0,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"replicated-2","partition":1,"replicas":[0,1],"log_dirs":["any","any"]},{"topic":"replicated-2","partition":2,"replicas":[1,0],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for replicated-2-0,replicated-2-1,replicated-2-2
#Verfify the reassignment status
$/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server dev-messaging-kafka-bootstrap:9092 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition replicated-2-0 is complete.
Reassignment of partition replicated-2-1 is complete.
Reassignment of partition replicated-2-2 is complete.
Clearing broker-level throttles on brokers 0,1,2
Clearing topic-level throttles on topic replicated-2
# Validate the partition assignments
$/opt/kafka/bin//kafka-topics.sh \
--command-config /home/kafka/mb_test_client.properties \
--bootstrap-server <Your Bootstrap Server URL> \
--topic replicated-2 --describe
Topic: replicated-2 TopicId: vyalpPOmR0CtYt7Sc-gbxA PartitionCount: 3 ReplicationFactor: 2 Configs: min.insync.replicas=1,message.format.version=3.0-IV1
Topic: replicated-2 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: replicated-2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: replicated-2 Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2