Kafka Bridge
The Kafka Bridge microservice takes messages with embedded JSON, XML, or plaintext payloads from a configured input (Pulsar or Kafka) and enqueues them to the configured output (Pulsar, Kafka, or standard output).
Note:
This microservice was previously called Kafka Adapter (kafka-adapter in the Helm chart and commands). It is now Kafka Bridge (kafka-bridge).
This microservice is part of the Event microservice pipeline. See Understanding the Event Pipeline in Unified Assurance Concepts for conceptual information. It can also participate in other pipelines that need to connect to Kafka.
You can enable redundancy for this microservice when you deploy it. See Configuring Microservice Redundancy for general information.
You can enable autoscaling for this microservice when you deploy it, if you are using an internal Pulsar topic as input. See Configuring Autoscaling and Kafka Bridge Autoscaling Configuration.
This microservice provides additional Prometheus monitoring metrics. See Kafka Bridge Self-Monitoring Metrics.
Kafka Bridge Prerequisites
Before deploying the microservice, confirm that the following prerequisites are met:
- A microservice cluster is set up. See Microservice Cluster Setup.
- The Apache Pulsar microservice is deployed. See Pulsar.
- You know the Kafka or Pulsar topic to send messages to or get them from.
- You have created the appropriate Kubernetes secrets for your environment, which you will use when deploying the microservice:
  - To connect to the internal Unified Assurance Kafka broker when using basic authentication, create a secret containing the internal Kafka username and password:

    a1k create secret generic <microservice-release-name>-secret --from-literal=username=<username> --from-literal=password=<password> -n <namespace>

  - To connect to the internal Unified Assurance Kafka broker when using OAuth, create a secret containing the internal OAuth client ID and secret:

    a1k create secret generic <microservice-release-name>-secret --from-literal=clientId=<clientID> --from-literal=clientSecret=<secret> -n <namespace>
  - To connect to an external Kafka server:

    1. Place the certificate file from your external Kafka server in the $A1BASEDIR/etc/ssl directory.

    2. Create secrets for the internal OAuth client ID and secret and the external Kafka certificate:

       a1k create secret generic <microservice-release-name>-secret --from-literal=clientId=<clientID> --from-literal=clientSecret=<secret> -n <namespace>
       a1k create secret generic <microservice-release-name>-certs --from-file=$A1BASEDIR/etc/ssl/<KafkaCert>.crt -n <namespace>
  - To connect to an external Kafka server that is using SSL as well as OAuth:

    1. Place the certificate files from your external Kafka and OAuth servers in the $A1BASEDIR/etc/ssl directory.

    2. Create secrets for the internal OAuth client ID and secret, the external Kafka certificate, and the external OAuth certificate:

       a1k create secret generic <microservice-release-name>-secret --from-literal=clientId=<clientID> --from-literal=clientSecret=<secret> -n <namespace>
       a1k create secret generic <microservice-release-name>-certs --from-file=$A1BASEDIR/etc/ssl/<KafkaCert>.crt --from-file=$A1BASEDIR/etc/ssl/<OAuthCert>.crt -n <namespace>
In the commands:
- <namespace> is the namespace where you will deploy the microservice. The default namespace is a1-zone1-pri.
- <microservice-release-name> is the name you will use for the microservice instance. Oracle recommends using the microservice name (kafka-bridge) unless you are deploying multiple instances of the microservice to the same cluster.
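As an illustration, the basic-authentication secret command can be composed with concrete values. In this sketch the release name, namespace, username, and password are hypothetical examples; because the a1k wrapper exists only on the Unified Assurance cluster host, the script only prints the fully substituted command for you to run there:

```shell
# Hypothetical example values; replace with your own.
RELEASE=kafka-bridge
NAMESPACE=a1-zone1-pri
KAFKA_USER=kafkauser
KAFKA_PASS=changeme

# Print the fully substituted command; run the printed line on the
# cluster host, where the a1k wrapper is available.
echo "a1k create secret generic ${RELEASE}-secret" \
     "--from-literal=username=${KAFKA_USER}" \
     "--from-literal=password=${KAFKA_PASS}" \
     "-n ${NAMESPACE}"
```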
Deploying Kafka Bridge
To deploy the microservice, run the following commands:
su - assure1
export NAMESPACE=<namespace>
export WEBFQDN=<WebFQDN>
a1helm install <microservice-release-name> assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM=<SASL_mechanism> <Kafka_authentication_details>
In the commands:

- <namespace> is the namespace where you are deploying the microservice. The default namespace is a1-zone1-pri, but you can change the zone number and, when deploying to a redundant cluster, change pri to sec.
- <WebFQDN> is the fully qualified domain name of the primary presentation server for the cluster.
- <microservice-release-name> is the name to use for the microservice instance. Oracle recommends using the microservice name (kafka-bridge) unless you are deploying multiple instances of the microservice to the same cluster.
- <SASL_mechanism> is the Simple Authentication and Security Layer (SASL) mechanism used by the Kafka server you are connecting to. This can be either OAUTH or PLAIN.
- <Kafka_authentication_details> are any additional required Kafka authentication configuration parameters. See Kafka Bridge Authentication Configuration Parameters for information about the different requirements.
You can also use the Unified Assurance UI to deploy microservices. See Deploying a Microservice by Using the UI for more information.
Kafka Bridge Authentication Configuration Parameters
The configuration parameters that you must set for the Kafka Bridge microservice depend on the SASL mechanism used by your Kafka server, whether you are connecting to the internal Unified Assurance Kafka broker or an external Kafka server, and whether the Kafka Server is using SSL.
The different combinations are described in the following table.
SASL Mechanism | Kafka Server Type | SSL Enabled | Required Parameters | Deployment Command |
---|---|---|---|---|
PLAIN | Internal | N/A | N/A | a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="PLAIN" |
OAUTH | Internal | N/A | KAFKA_SASL_OAUTH_ENDPOINT_URI | a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="OAUTH" --set KAFKA_SASL_OAUTH_ENDPOINT_URI=<token_URL> Where <token_URL> is the OAuth token endpoint for the Kafka broker. |
OAUTH | External | N | KAFKA_SASL_OAUTH_ENDPOINT_URI, KAFKA_TLS_CA | a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="OAUTH" --set KAFKA_SASL_OAUTH_ENDPOINT_URI=<token_URL> --set KAFKA_TLS_CA="<KafkaCert>.crt" Where <KafkaCert> is the name of your external Kafka server's certificate file. |
OAUTH | External | Y | KAFKA_SASL_OAUTH_ENDPOINT_URI, KAFKA_TLS_CA, KAFKA_SASL_OAUTH_SERVER_TLS_CA | a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="OAUTH" --set KAFKA_SASL_OAUTH_ENDPOINT_URI=<token_URL> --set KAFKA_TLS_CA="<KafkaCert>.crt" --set KAFKA_SASL_OAUTH_SERVER_TLS_CA="<OAuthCert>.crt" Where <OAuthCert> is the name of your external OAuth server's certificate file. |
Changing Kafka Bridge Configuration Parameters
When running the install command, you can optionally change default configuration parameter values by including them in the command with additional --set arguments. You can add as many additional --set arguments as you need.
For example:
- Set a parameter described in Default Kafka Bridge Configuration by adding --set configData.<parameter_name>=<parameter_value>. For example, --set configData.LOG_LEVEL=DEBUG.
- Enable redundancy for the microservice by adding --set redundancy=enabled. Enabling redundancy is valid when you are using a Unified Assurance Pulsar topic as the input for the microservice.
- Enable autoscaling for the microservice by adding --set autoscaling.enabled=true. Enabling autoscaling is valid when you are using an internal Pulsar topic as the input for the microservice.
- Set an autoscaling parameter described in Kafka Bridge Autoscaling Configuration by adding --set autoscaling.<parameter_name>=<parameter_value>. For example, --set autoscaling.thresholds.totalEventsProcessed=500.
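For instance, assuming the default namespace and a hypothetical presentation server FQDN, an install command that also enables DEBUG logging and redundancy could be assembled as follows. The sketch only prints the command; run the printed line on the cluster host, where a1helm is available:

```shell
# Hypothetical values; replace with your own.
NAMESPACE=a1-zone1-pri
WEBFQDN=presentation.example.com

# Print the install command with two extra --set overrides appended.
echo "a1helm install kafka-bridge assure1/kafka-bridge -n ${NAMESPACE}" \
     "--set global.imageRegistry=${WEBFQDN}" \
     "--set KAFKA_SASL_MECHANISM=PLAIN" \
     "--set configData.LOG_LEVEL=DEBUG" \
     "--set redundancy=enabled"
```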
Default Kafka Bridge Configuration
The following table describes the default configuration parameters found in the Helm chart under configData for the microservice.
Name | Default Value | Possible Values | Notes |
---|---|---|---|
LOG_LEVEL | INFO | FATAL, ERROR, WARN, INFO, DEBUG | The logging level for the microservice. |
STREAM_INPUT | "" | Text, 255 characters | The Pulsar or Kafka topic that the microservice subscribes to. For Kafka topics on servers that use authentication, include saslConfigPrefix=KAFKA_SASL and, if the server uses SSL, tlsConfigPrefix=KAFKA_TLS. See Example Input and Output Stream Formats for examples. |
STREAM_OUTPUT | "" | Text, 255 characters | The Pulsar or Kafka topic that the microservice publishes to. For Kafka topics on servers that use authentication, include saslConfigPrefix=KAFKA_SASL and, if the server uses SSL, tlsConfigPrefix=KAFKA_TLS. See Example Input and Output Stream Formats for examples. |
REDUNDANCY_POLL_PERIOD | 5 | Integer greater than 0 | The number of seconds between status checks from the secondary microservice to the primary microservice. |
REDUNDANCY_FAILOVER_THRESHOLD | 4 | Integer greater than 0 | The number of times the primary microservice must fail checks before the secondary microservice becomes active. |
REDUNDANCY_FALLBACK_THRESHOLD | 1 | Integer greater than 0 | The number of times the primary microservice must succeed checks before the secondary microservice becomes inactive. |
KAFKA_SASL_MECHANISM | "" | OAUTH or PLAIN | The Simple Authentication and Security Layer (SASL) mechanism to use to connect with the Kafka server. |
KAFKA_SASL_OAUTH_ENDPOINT_URI | "" | Text | The OAuth server token URL for Kafka. |
KAFKA_TLS_CA | "" | Text | The certificate to use for external Kafka authentication. Only required when connecting to an external Kafka topic. |
KAFKA_SASL_OAUTH_SERVER_TLS_CA | "" | Text | The certificate to use for the external Kafka OAuth server authentication. Only required when the OAuth server uses SSL. |
Example Input and Output Stream Formats
You can specify Pulsar or Kafka topics in the STREAM_INPUT and STREAM_OUTPUT configuration parameters.
When you are communicating with a Kafka server that uses authentication, you must also include query parameters for the appropriate tlsConfigPrefix and saslConfigPrefix setting in the URL. Include tlsConfigPrefix when the server uses SSL and include saslConfigPrefix when the server uses OAuth or Basic authentication.
For example:
- To subscribe to the TMF688 Event Processor microservice's Pulsar topic, set STREAM_INPUT to:

  persistent://assure1/event/tmf688-event

- To specify the server URL for a Pulsar topic, set STREAM_INPUT to:

  pulsar+ssl://<host:port>/<topic>

- To publish to a Kafka topic on a server that does not use SSL or authentication, set STREAM_OUTPUT to:

  kafka://<host:port>/<topic>

- To publish to a Kafka topic on a server that uses OAuth or Basic authentication and SSL, set STREAM_OUTPUT to:

  kafka://<host:port>/<topic>?tlsConfigPrefix=KAFKA_TLS&saslConfigPrefix=KAFKA_SASL
For Kafka, you can also include multiple topics in the URL, as well as the following query parameters:

- General query parameters:
  - version: The Kafka version.
- Input-only query parameters:
  - offset: The Kafka offset. Either oldest or newest.
  - consumerGroup: The Kafka consumer group.
  - maxMessageSize: An integer representing the maximum message size in bytes.
- Output-only query parameters:
  - compressionCodec: Possible values are none, gzip, snappy, lz4, and zstd.
  - balanceStrategy: Possible values are range, roundrobin, and sticky.

For example, to use some of these query parameters, set the stream to:

kafka://<host:port>/topic1,topic2?version=2.1.1&tlsConfigPrefix=KAFKA_TLS&offset=newest&consumerGroup=grp1&maxMessageSize=1024
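The anatomy of such a URL can be pulled apart with plain POSIX parameter expansion. This is a minimal sketch using a hypothetical single-broker URL; it shows where the broker list, the comma-separated topics, and the query parameters sit:

```shell
# Hypothetical Kafka stream URL modeled on the example above.
STREAM="kafka://broker1.example.com:9092/topic1,topic2?offset=newest&consumerGroup=grp1&maxMessageSize=1024"

rest=${STREAM#kafka://}        # strip the scheme
brokers=${rest%%/*}            # everything before the first slash
path=${rest#*/}                # topics plus query string
topics=${path%%\?*}            # comma-separated topic list
query=${path#*\?}              # query parameters

echo "brokers: $brokers"
echo "topics:  $topics"
echo "query:   $query"
```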
Kafka Bridge Autoscaling Configuration
Autoscaling is supported for the Kafka Bridge microservice when you are using an internal Pulsar topic as the input. See Configuring Autoscaling for general information and details about the standard autoscaling configurations.
The Kafka Bridge microservice also supports the additional configurations described in the following table.
Name | Default Value | Possible Values | Notes |
---|---|---|---|
thresholds.backlogSize | 1000 | Integer | The number of items that must be in the backlog before autoscaling starts additional pods. |
thresholds.totalEventsProcessed | 400 | Integer | The threshold for total events processed by the microservice. If the average number of events processed during the configured pollingInterval exceeds this threshold, additional pods are started. |
Kafka Bridge Self-Monitoring Metrics
The Kafka Bridge microservice exposes the self-monitoring metrics described in the following table to Prometheus.
Metric Name | Type | Description |
---|---|---|
received_events_total | Counter | Number of events received. |
sent_events_total | Counter | Number of events sent. |
avg_time | Gauge | The average time to process events. |
reception_success_per_second | Gauge | The rate of successfully received messages per second. |
sending_success_per_second | Gauge | The rate of successfully sent messages per second. |
failed_events_total | Counter | The total number of failed events. |
outstanding_metric | Gauge | The number of pending events. |
starts_per_second | Gauge | The rate of messages started per second. |
fails_per_second | Gauge | The rate of messages failed per second. |
finished_per_second | Gauge | The rate of messages finished per second. |
Note:
Metric names in the database include a prefix that indicates the service that inserted them. The prefix is prom_ for metrics inserted by Prometheus. For example, received_events_total is stored as prom_received_events_total in the database.
Example Kafka Messages
Example text message:
{
"_agents": [
{
"@timestamp": "2020-03-18T18:41:00.000Z",
"host": "assure1host.pod.dns.com",
"id": "assure1host.pod.dns.com",
"app": "app-name",
"version": "1.0.0-1",
"input": "persistent://assure1/kafka/sink",
"output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
}
],
"_domain": "...",
"_type": "...",
"_version": "5.0.0",
"@timestamp": "2020-03-18T18:41:00.000Z",
"message": {
"_type": "text",
"text": "A plaintext message"
}
}
Example XML message:
{
"_agents": [
{
"@timestamp": "2020-03-18T18:41:00.000Z",
"host": "assure1host.pod.dns.com",
"id": "assure1host.pod.dns.com",
"app": "app-name",
"version": "1.0.0-1",
"input": "persistent://assure1/kafka/sink",
"output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
}
],
"_domain": "...",
"_type": "...",
"_version": "5.0.0",
"@timestamp": "2020-03-18T18:41:00.000Z",
"message": {
"_type": "xml",
"xml": "<?xml version=\"1.0\" encoding=\"UTF-8\"?><note><to>Tove</to><from>Jani</from><heading>Reminder</heading><body>Don't forget me this weekend!</body></note>"
}
}
Example JSON message:
{
"_agents": [
{
"@timestamp": "2020-03-18T18:41:00.000Z",
"host": "assure1host.pod.dns.com",
"id": "assure1host.pod.dns.com",
"app": "app-name",
"version": "1.0.0-1",
"input": "persistent://assure1/kafka/sink",
"output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
}
],
"_domain": "fault",
"_type": "event",
"_version": "5.0.0",
"@timestamp": "2020-03-18T18:41:00.000Z",
"message": {
"_type": "json",
"json": {
"event": {
"EventID": 0,
"EventKey": "",
"EventCategory": 3,
"EventType": "",
"Ack": 0,
"Action": "",
"Actor": "",
"Count": 1,
"Customer": "",
"Department": "",
"Details": {},
"DeviceType": "",
"Duration": 0.000,
"EscalationFlag": 0,
"ExpireTime": 0,
"FirstReported": "2006-01-02T15:04:05.999999999Z",
"GeoPath": {
"type": "LineString",
"coordinates": [[0,0],[0,0]]
},
"GeoLocation": {
"type": "Point",
"coordinates": [0,0]
},
"IPAddress": "",
"LastChanged": "2006-01-02T15:04:05.999999999Z",
"LastReported": "2006-01-02T15:04:05.999999999Z",
"Location": "",
"Method": "",
"Node": "",
"OrigSeverity": 1,
"OwnerName": "",
"RootCauseFlag": 0,
"RootCauseID": 0,
"Score": 0,
"Service": "",
"ServiceImpact": 0,
"Severity": "Unknown",
"SubDeviceType": "",
"SubMethod": "",
"SubNode": "",
"Summary": "",
"TicketFlag": 0,
"TicketID": "",
"ZoneID": 0
}
}
}
}
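Before wiring a producer into the pipeline, it can be worth confirming that a hand-built envelope is well-formed. This sketch writes a stripped-down message (hypothetical field values, structure mirroring the text-message example above) and validates it with Python's standard-library JSON tool, assuming python3 is on the path:

```shell
# Write a minimal envelope modeled on the example above (values are
# hypothetical) and confirm it parses as JSON.
cat > /tmp/kafka-bridge-msg.json <<'EOF'
{
  "_domain": "fault",
  "_type": "event",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "text",
    "text": "A plaintext message"
  }
}
EOF

python3 -m json.tool /tmp/kafka-bridge-msg.json > /dev/null && echo "valid JSON"
```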