Kafka Bridge

The Kafka Bridge microservice takes messages embedded with JSON, XML, or plaintext from a configured input (Pulsar or Kafka) and enqueues them to the configured output (Pulsar, Kafka, or Standard Out).

Note:

This microservice was previously called Kafka Adapter (kafka-adapter in the Helm chart and commands). It is now Kafka Bridge (kafka-bridge).

This microservice is part of the Event microservice pipeline. See Understanding the Event Pipeline in Unified Assurance Concepts for conceptual information. It can also participate in other pipelines that need to connect to Kafka.

You can enable redundancy for this microservice when you deploy it. See Configuring Microservice Redundancy for general information.

You can enable autoscaling for this microservice when you deploy it when you are using an internal Pulsar topic as input. See Configuring Autoscaling and Kafka Bridge Autoscaling Configuration.

This microservice provides additional Prometheus monitoring metrics. See Kafka Bridge Self-Monitoring Metrics.

Kafka Bridge Prerequisites

Before deploying the microservice, confirm that the following prerequisites are met:

  1. A microservice cluster is set up. See Microservice Cluster Setup.

  2. The Apache Pulsar microservice is deployed. See Pulsar.

  3. You know the Kafka or Pulsar topic to send messages to or get them from.

  4. You have created the appropriate Kubernetes secrets for your environment, which you will use when deploying the microservice:

    • To connect to the internal Unified Assurance Kafka broker when using basic authentication, create a secret containing the internal Kafka username and password:

      a1k create secret generic <microservice-release-name>-secret --from-literal=username=<username> --from-literal=password=<password> -n <namespace>
      
    • To connect to the internal Unified Assurance Kafka broker when using OAuth, create a secret containing the internal OAuth client ID and secret:

      a1k create secret generic <microservice-release-name>-secret --from-literal=clientId=<clientID> --from-literal=clientSecret=<secret> -n <namespace>
      
    • To connect to an external Kafka server:

      1. Place the certificate file from your external Kafka server in the $A1BASEDIR/etc/ssl directory.

      2. Create secrets for the internal OAuth client ID and secret and the external Kafka certificate:

        a1k create secret generic <microservice-release-name>-secret --from-literal=clientId=<clientID> --from-literal=clientSecret=<secret> -n <namespace>
        a1k create secret generic <microservice-release-name>-certs  --from-file=$A1BASEDIR/etc/ssl/<KafkaCert>.crt -n <namespace>
        
    • To connect to an external Kafka server that is using SSL as well as OAuth:

      1. Place the certificate files from your external Kafka and OAuth servers in the $A1BASEDIR/etc/ssl directory.

      2. Create secrets for the internal OAuth client ID and secret, the external Kafka certificate, and the external OAuth certificate:

        a1k create secret generic <microservice-release-name>-secret --from-literal=clientId=<clientID> --from-literal=clientSecret=<secret> -n <namespace>
        a1k create secret generic <microservice-release-name>-certs  --from-file=$A1BASEDIR/etc/ssl/<KafkaCert>.crt --from-file=/opt/assure1/etc/ssl/<OAuthcert>.crt -n <namespace>
        

    In the commands:

    • <namespace> is the namespace where you will deploy the microservice. The default namespace is a1-zone1-pri.

    • <microservice-release-name> is the name you will use for the microservice instance. Oracle recommends using the microservice name (kafka-bridge) unless you are deploying multiple instances of the microservice to the same cluster.

Deploying Kafka Bridge

To deploy the microservice, run the following commands:

su - assure1
export NAMESPACE=<namespace>
export WEBFQDN=<WebFQDN> 
a1helm install <microservice-release-name> assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM=<SASL_mechanism> <Kafka_authentication_details>

In the commands:

You can also use the Unified Assurance UI to deploy microservices. See Deploying a Microservice by Using the UI for more information.

Kafka Bridge Authentication Configuration Parameters

The configuration parameters that you must set for the Kafka Bridge microservice depend on the SASL mechanism used by your Kafka server, whether you are connecting to the internal Unified Assurance Kafka broker or an external Kafka server, and whether the Kafka Server is using SSL.

The different combinations are described in the following table.

SASL Mechanism Kafka Server Type SSL Enabled Required Parameters Deployment Command
PLAIN Internal N/A N/A a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="PLAIN"
OAUTH Internal N/A KAFKA_SASL_OAUTH_ENDPOINT_URI a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="OAUTH" --set KAFKA_SASL_OAUTH_ENDPOINT_URI=<token_URL>
Where <token_URL> is the OAuth token endpoint for the Kafka broker.
OAUTH External N KAFKA_SASL_OAUTH_ENDPOINT_URI
KAFKA_TLS_CA
a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="OAUTH" --set KAFKA_SASL_OAUTH_ENDPOINT_URI=<token_URL> --set KAFKA_TLS_CA="<KafkaCert>.crt"
Where <KafkaCert> is the name of your external Kafka server's certificate file.
OAUTH External Y KAFKA_SASL_OAUTH_ENDPOINT_URI
KAFKA_TLS_CA
KAFKA_SASL_OAUTH_SERVER_TLS_CA
a1helm install kafka-bridge assure1/kafka-bridge -n $NAMESPACE --set global.imageRegistry=$WEBFQDN --set KAFKA_SASL_MECHANISM="OAUTH" --set KAFKA_SASL_OAUTH_ENDPOINT_URI=<token_URL> --set KAFKA_TLS_CA="<KafkaCert>.crt" --set KAFKA_SASL_OAUTH_SERVER_TLS_CA="<OAuthcert>.crt"
Where <OAuthCert> is the name of your external OAuth server's certificate file.

Changing Kafka Bridge Configuration Parameters

When running the install command, you can optionally change default configuration parameter values by including them in the command with additional --set arguments. You can add as many additional --set arguments as you need.

For example:

Default Kafka Bridge Configuration

The following table describes the default configuration parameters found in the Helm chart under configData for the microservice.

Name Default Value Possible Values Notes
LOG_LEVEL INFO FATAL, ERROR, WARN, INFO, DEBUG The logging level for the microservice.
STREAM_INPUT "" Text, 255 characters The Pulsar or Kafka topic that the microservice subscribes to.
For Kafka topics, include tlsConfigPrefix=KAFKA_TLS and, if your Kafka server uses SSL, saslConfigPrefix=KAFKA_SASL. See Example Input and Output Stream Formats for examples.
STREAM_OUTPUT "" Text, 255 characters The Pulsar or Kafka topic that the microservice publishes to.
For Kafka topics, use tlsConfigPrefix=KAFKA_TLS and, if your Kafka server uses SSL, saslConfigPrefix=KAFKA_SASL. See Example Input and Output Stream Formats for examples.
REDUNDANCY_POLL_PERIOD 5 Integer greater than 0 The number of seconds between status checks from the secondary microservice to the primary microservice.
REDUNDANCY_FAILOVER_THRESHOLD 4 Integer greater than 0 The number of times the primary microservice must fail checks before the secondary microservice becomes active.
REDUNDANCY_FALLBACK_THRESHOLD 1 Integer greater than 0 The number of times the primary microservice must succeed checks before the secondary microservice becomes inactive.
KAFKA_SASL_MECHANISM "" OAUTH or PLAIN The Simple Authentication and Security Layer (SASL) mechanism to use to connect with the Kafka server.
KAFKA_SASL_OAUTH_ENDPOINT_URI "" Text The OAuth server token URL for Kafka.
KAFKA_TLS_CA "" Text The certificate to use for external Kafka authentication. Only required when connecting to an external Kafka topic.
KAFKA_SASL_OAUTH_SERVER_TLS_CA "" Text The certificate to use for the external Kafka OAuth server authentication. Only required when the OAuth server uses SSL.

Example Input and Output Stream Formats

You can specify Pulsar or Kafka topics in the STREAM_INPUT and OUTPUT_STREAM configuration parameters.

When you are communicating with a Kafka server that uses authentication, you must also include query parameters for the appropriate tlsConfigPrefix and saslConfigPrefix setting in the URL. Include tlsConfigPrefix when the server uses SSL and include saslConfigPrefix when the server uses OAuth or Basic authentication.

For example:

For Kafka, you can also include multiple topics in the URL and include the following query parameters:

For example, to use some of these query parameters, set the stream to:

kafka://<host:port>/topic1,topic2?version=2.1.1&tlsConfigPrefix=KAFKA_TLS&offset=newest&consumerGroup=grp1&maxMessageSize=1024

Kafka Bridge Autoscaling Configuration

Autoscaling is supported for the Kafka Bridge microservice when you are using an internal Pulsar topic as the input. See Configuring Autoscaling for general information and details about the standard autoscaling configurations.

The Kafka Bridge microservice also supports the additional configurations described in the following table.

Name Default Value Possible Values Notes
thresholds.backlogSize 1000 Integer The number of items that need to be in the backlog before the autoscaling starts additional processes.
thresholds.totalEventsProcessed 400 Integer Total events processed by the microservice. If the average of total events processed in the configured pollingInterval exceeds the threshold, pods will be scaled.

Kafka Bridge Self-Monitoring Metrics

The Kafka Bridge microservice exposes the self-monitoring metrics described in the following table to Prometheus.

Metric Name Type Description
received_events_total Counter Number of events received.
sent_events_total Counter Number of events sent.
avg_time Gauge The average time to process events.
reception_success_per_second Gauge The rate of successfully received messages per second.
sending_success_per_second Gauge The rate of successfully sent messages per second.
failed_events_total Counter The total number of failed events.
outstanding_metric Gauge The number of pending events.
starts_per_second Gauge The rate of messages started per second.
fails_per_second Gauge The rate of messages failed per second.
finished_per_second Gauge The rate of messages finished per second.

Note:

Metric names in the database include a prefix that indicates the service that inserted them. The prefix is prom_ for metrics inserted by Prometheus. For example, received_events_total is stored as prom_received_events_total in the database.

Example Kafka Messages

Example text message:

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "text",
    "text": "A plaintext message"
  }
}

Example XML message:

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.something.com:9092,broker2.something.com:9092/kafka-topic"
    }
  ],
  "_domain": "...",
  "_type": "...",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "xml",
    "xml": "<?xml version=\"1.0\" encoding=\"UTF-8\"?><note><to>Tove</to><from>Jani</from><heading>Reminder</heading><body>Don't forget me this weekend!</body></note>"
  }
}

Example JSON message:

{
  "_agents": [
    {
      "@timestamp": "2020-03-18T18:41:00.000Z",
      "host": "assure1host.pod.dns.com",
      "id": "assure1host.pod.dns.com",
      "app": "app-name",
      "version": "1.0.0-1",
      "input": "persistent://assure1/kafka/sink",
      "output": "kafka://broker1.example.com:9092,broker2.example.com:9092/kafka-topic"
    }
  ],
  "_domain": "fault",
  "_type": "event",
  "_version": "5.0.0",
  "@timestamp": "2020-03-18T18:41:00.000Z",
  "message": {
    "_type": "json",
    "json": {
      "event": {
        "EventID": 0,
        "EventKey": "",
        "EventCategory": 3,
        "EventType": "",
        "Ack": 0,
        "Action": "",
        "Actor": "",
        "Count": 1,
        "Customer": "",
        "Department": "",
        "Details": {},
        "DeviceType": "",
        "Duration": 0.000,
        "EscalationFlag": 0,
        "ExpireTime": 0,
        "FirstReported": "2006-01-02T15:04:05.999999999Z",
        "GeoPath": {
          "type": "LineString",
          "coordinates": [[0,0],[0,0]]
        },
        "GeoLocation": {
          "type": "Point",
          "coordinates": [0,0]
        },
        "IPAddress": "",
        "LastChanged": "2006-01-02T15:04:05.999999999Z",
        "LastReported": "2006-01-02T15:04:05.999999999Z",
        "Location": "",
        "Method": "",
        "Node": "",
        "OrigSeverity": 1,
        "OwnerName": "",
        "RootCauseFlag": 0,
        "RootCauseID": 0,
        "Score": 0,
        "Service": "",
        "ServiceImpact": 0,
        "Severity": "Unknown",
        "SubDeviceType": "",
        "SubMethod": "",
        "SubNode": "",
        "Summary": "",
        "TicketFlag": 0,
        "TicketID": "",
        "ZoneID": 0
      }
    }
  }
}