20 Creating Kafka Topics for ECE Integration
You create Apache Kafka topics so Oracle Communications Elastic Charging Engine (ECE) can support sending Notifications with HTTP Gateway, Diameter Gateway, RADIUS Gateway and BRM Gateway.
Topics in this document:
Setting Up ECE and Gateways for Apache Kafka Using Topic-Based Routing
Note:
This functionality requires ECE Interim Patch 15.1.0.0.1 (37951934).
You can use the enhanced Kafka integration in Oracle Communications Billing and Revenue Management (BRM) for improved real-time data processing, particularly in managing high data volumes. A dedicated message processor ensures efficient and reliable system functionality.
Note:
This is the recommended configuration for Kafka integration.
The message processor handles data in a controlled and sequential manner. Its checkpointing mechanism enables reliable recovery and uninterrupted data processing during system interruptions. If a gateway goes out of service, the system automatically redirects its partitions to another gateway without manual intervention. When the original gateway returns, traffic is automatically rerouted back.
Note:
For new multisite deployments, install and configure all sites at the same time, since there is no traffic in the system yet.
-
Install ECE Interim Patch 15.1.0.0.1 (37951934) or later on all sites.
-
Set the kafkaCustomPartitionAssignorEnabled ECE configuration MBean to true. See "Enabling the Topic-Based Kafka Architecture".
-
Wait until all ECS instances or pods have been restarted and any rebalancing has completed.
-
Perform a rolling restart of the Diameter Gateway, RADIUS Gateway, HTTP Gateway, EM Gateway, and BRM Gateway so that the consumers are restarted, and they subscribe to the new Kafka topics.
-
On multisite deployments, repeat steps 2 through 4 until all sites have been configured to use the Kafka topic-based routing feature.
-
On multisite deployments, restart the ReplicatedFederatedCache service between the sites in the Coherence cluster and verify that the ECE Configuration AppConfiguration MBeans in all the sites are in sync.
Enabling the Topic-Based Kafka Architecture
By default, ECE uses the Apache Kafka partition-based architecture to communicate with network clients, but you can configure ECE to use the topic-based architecture instead.
-
Do one of the following based on your system type:
-
Cloud native systems: Connect to a JMX editor, such as JConsole. See "JMX Connection to ECE Using JConsole" in BRM Cloud Native System Administrator’s Guide.
-
On-premises systems: Access the ECE configuration MBeans in a JMX editor, such as JConsole. See "Accessing ECE Configuration MBeans".
-
-
Expand the ECE Configuration node.
-
Expand charging.kafkaConfigurations.
-
Expand Attributes.
-
Set the kafkaCustomPartitionAssignorEnabled attribute to true.
Alternatively, you can use the charging-settings.xml file or your override-values.yaml file for oc-cn-ece-helm-chart to set the kafkaCustomPartitionAssignorEnabled attribute. If you make the change by updating the file, you must perform a rolling restart of your ECE application instances or pods for the change to take effect.
Generating Kafka Topics with Topic-Based Routing
You use the create_all_kafka_topics.sh script to create the required Kafka topics. When running the script, use the Kafka broker details for all sites.
To generate Kafka topics with topic-based routing:
-
Ensure you have installed the Apache Kafka command-line tools, such as kafka-topics.sh, and that their location is in the PATH environment variable or the KAFKA_HOME and KAFKA_BIN_DIR environment variables.
-
Do one of the following based on your system type:
-
Cloud native systems: Create an override-values.yaml file for oc-cn-ece-helm-chart and configure the keys under kafkaConfigurationList to meet your system requirements.
-
On-premises systems: In your charging-settings.xml file, configure the kafkaConfigurationList entries to meet your system requirements.
-
-
In your configuration file, configure the Kafka brokers to allow automatic topic creation. Set the default number of partitions for new topics to either 20 or the product of the number of Kafka broker instances and the number of gateways that will consume notifications for a topic, whichever is greater.
For example, if you have 3 Kafka brokers and 8 Diameter Gateways, set the default to 24 (3 x 8) partitions. This configuration ensures that there are more partitions than consumers, allowing each consumer to have its own partition and distributing partitions evenly across Kafka brokers for optimal resource utilization.
Note:
For multisite deployments, the configuration file should have the details for all sites. This ensures that the create_all_kafka_topics.sh script creates all necessary topics for all Kafka brokers across all sites, supporting automatic high availability and disaster recovery.
-
Run the create_all_kafka_topics.sh script:
create_all_kafka_topics.sh overrideValuesFile --bootstrap-server kafkaHost:port --partitions numPartitions [--ssl enabled | disabled] [--truststore-location trustStorePath] [--truststore-password trustStorePassword] [--replication-factor replicationNum]
where:
-
overrideValuesFile specifies the name and path to the override-values.yaml file you created in step 2.
-
kafkaHost and port specify the host and port that the Kafka client will connect to in a bootstrap Kafka cluster the first time it starts.
-
numPartitions specifies the number of partitions to create in the topic.
-
trustStorePath specifies the path to the TrustStore file. This option is required only if SSL is enabled.
-
trustStorePassword specifies the password for the TrustStore. This option is required only if SSL is enabled.
-
replicationNum specifies how many copies of each topic’s partition are stored across different brokers in the Kafka cluster. The default is 1, which disables the replication.
-
Migrating Existing ECE Systems Gateways to Apache Kafka Using Topic-Based Routing
-
For multisite deployments, stop the federation between the site about to be migrated and the other sites.
-
Install ECE Interim Patch 15.1.0.0.1 (37951934) or later on all sites.
-
Set the kafkaCustomPartitionAssignorEnabled ECE configuration MBean to true. See "Enabling the Topic-Based Kafka Architecture".
This updates the charging-settings.xml file in the cache and updates the ECE charging-settings-namespace ConfigMap.
-
During the ECE postinstallation process, run the create_all_kafka_topics.sh script to create the required Kafka topics. See "Generating Kafka Topics with Topic-Based Routing".
Wait until all ECS instances or pods have been restarted and any rebalancing has completed.
-
Drain the remaining notifications from the Kafka brokers at the site being migrated. Using the existing site's gateways, any remaining notifications in the old EceNotifications topics at the Kafka broker will be consumed.
Run this step until all notifications on the old topics are consumed. Wait until the old topics have been drained of all remaining notifications so that the consumers are restarted and subscribe to the new Kafka topics.
-
Perform a rolling restart of the Diameter Gateway, RADIUS Gateway, HTTP Gateway, EM Gateway, and BRM Gateway so that the consumers are restarted, and they subscribe to the new Kafka topics.
Note:
Ensure that BRM is pointed to another site’s EM Gateways prior to bringing down the EM Gateway at the site being migrated.
-
On multisite deployments, repeat 2 through 6 for all the sites until they have been migrated to using the new Kafka redesign feature.
-
Restart the configuration federation of the ReplicatedFederatedCache service between the sites in the Coherence cluster and verify that the ECE Configuration AppConfiguration MBeans in all the sites are in sync.
Note:
For ECS pods, run the helm upgrade with the job to update app configuration enabled.
Setting Up ECE and Gateways for Apache Kafka Using Partition-Based Routing
Note:
This configuration is recommended for backward compatibility only.
To set up ECE with your Kafka topics:
-
When you install ECE:
-
Specify to use Apache Kafka topics and enter the details for your ECE notification topics, Suspense topic, ECE failure topic, and ECE overage topic.
-
(HTTP Gateway Only) Enable Network Repository Function (NRF) registration in one of your HTTP Gateway servers.
-
(BRM Gateway Only) Specify the details for connecting to the BRM Gateway.
For more information, see "Installing Elastic Charging Engine" in ECE Installation Guide.
-
-
During the ECE postinstallation process:
-
Run the kafka_post_install.sh script to create your ECE notification topics, Suspense topic, ECE failure topic, and ECE overage topic. See "Creating Kafka Topics for ECE" in ECE Installation Guide.
-
Run the post_Install.sh script and choose to create only the Acknowledgment queue. See "Creating WebLogic JMS Queues for BRM" in ECE Installation Guide.
Note:
It is strongly recommended that you use the kafka_post_install.sh script to create all ECE notification topics in advance instead of relying on topic auto-creation. This ensures that ECE notification consumers will be able to subscribe to topics even before the first notification message is produced.
All ECE consumer groups are prefixed with Ece. Your consumers should never use a consumer group that starts with this prefix to avoid collisions.
-
-
Connect ECE to your Kafka Server topics. See "Connecting ECE and Gateways to Kafka Topics".
-
(Optional) Configure ECE to send information about failed usage requests to the ECE failure topic. See "Recording Failed ECE Usage Requests".
-
(Optional) Configure ECE to generate CDRs for any prepaid usage overage and send them to the ECE overage topic. See "Configuring ECE to Support Prepaid Usage Overage".
Connecting ECE and Gateways to Kafka Topics
You connect ECE to the following Kafka topics so that ECE can publish notifications, failed usage requests, and CDRs with usage overage information to them.
-
ECE notification topics: Stores notifications from ECE.
-
Suspense topic: Stores failed notifications from ECE.
-
ECE failure topic: (HTTP Gateway and Diameter Gateway only) Stores details about failed ECE usage requests, such as the user ID and request payload.
-
ECE overage topic: (HTTP Gateway and Diameter Gateway only) Stores overage records, which contain details about usage overage amounts for prepaid customers.
You can also connect the HTTP Gateway, Diameter Gateway, and RADIUS Gateway to these Kafka topics so they can retrieve notifications, failed usage requests, and CDRs with usage overage information from them.
Note:
If you are using an ECE system with a topic-based Kafka architecture, ECE splits the ECE notifications topic into multiple granular topics based on factors such as cluster, gateway, notification type, and client.
ECE creates each notification topic using this naming convention:
Notify.clusterName.consumerGroup[.notificationName][.networkClientId][.eceGatewayId]
where:
-
clusterName is the string that represents the site name from your charging-settings.xml file. The allowable values are BRM1, BRM2, ECE1, ECE2, and so on.
-
consumerGroup is the name of the consumer group. The possible values are DiamNetwork, RadNetwork, ChfNetwork, Brm, Ext, Suspense, and Overage.
-
notificationName is the notification event type. This is included for DiamNetwork, RadNetwork, ChfNetwork, Brm, and Ext consumer groups.
-
networkClientId is used to differentiate the CHF Network (for the HTTP Gateway) from the Diameter Client (for the Diameter Gateway). This is included for DiamNetwork and ChfNetwork consumer groups.
-
eceGatewayId is the value taken from the Diameter Gateway ID. This is included for DiamNetwork consumer groups.
To connect ECE and gateways to your Kafka topics:
-
Access the ECE configuration MBeans in a JMX editor, such as JConsole. See "Accessing ECE Configuration MBeans".
-
Expand the ECE Configuration node.
-
Expand charging.kafkaConfigurations.
-
Expand Attributes.
-
Specify the Kafka configuration values for the attributes in Table 20-1.
Table 20-1 kafkaConfiguration Properties
Property Name Description name
The name of your ECE cluster.
hostname
The host name and port number of the machine in which Apache Kafka is installed.
If it contains multiple Kafka brokers, create a comma-separated list.
topicName
The name of the Kafka topic where ECE will publish notifications.
Note: This is for partition-based routing only.
suspenseTopicName
The name of the Kafka topic where failed notifications are published.
Note: This is for partition-based routing only.
failureTopicName
The name of the Kafka topic where ECE will publish details about failed usage requests.
Note: This is for partition-based routing only.
overageTopicName
The name of the Kafka topic where ECE will publish overage records with information about your prepaid customer's usage overage during online sessions.
Note: This is for partition-based routing only.
partitions
The number of Kafka partitions in your topics ECE notification topic.
The recommended number to create is calculated as follows:
[(Max HTTP Gateway Nodes) + (Max Diameter Gateway Nodes * Max Diameter Clients) + (1 for BRM Gateway) + (1 for Internal Notifications)]
For example, if you have 2 HTTP Gateway nodes, 4 Diameter Gateway nodes, 10 Diameter Gateway clients, and a BRM Gateway, you would need [(2 + (4 * 10) + 1 + 1) = 44 Kafka partitions.
Arbitrarily, you can set this to a maximum value.
Note: This is for partition-based routing only.
externalPartitions
The total number of Kafka partitions in the ECE External Notification topic.
Note: This is for partition-based routing only.
externalTopicEnabled
The parameter that specifies, whether ECE External Notification topic is created and in use. Set to true to enable.
externalTopicName
The name of the ECE External Notification topic.
Note: This is for partition-based routing only.
failurePartitions
The number of Kafka partitions in your ECE failure topic.
Note: This is for partition-based routing only.
kafkaCustomPartitionAssignorEnabled
(Requires ECE Interim Patch 15.1.0.0.1 (37951934))
The parameter to enable topic-based routing for Kafka notifications which uses a custom partition assignor for automated support for high availability and disaster recovery.
If this parameter is set to false, the partition-based routing will be used.
kafkaProducerReconnectionInterval
The amount of time, in milliseconds, the ECE Notification Publisher waits before attempting to reconnect to a Kafka topic.
kafkaProducerReconnectionMax
The maximum amount of time, in milliseconds, the ECE Notification Publisher waits before attempting to reconnect to a broker that has repeatedly failed to connect.
The kafkaProducerReconnectionInterval will increase exponentially for each consecutive connection failure, up to this maximum.
kafkaHTTPReconnectionInterval
The amount of time, in milliseconds, HTTP Gateway waits before attempting to reconnect to the Kafka topic.
kafkaHTTPReconnectionMax
The maximum amount of time, in milliseconds, HTTP Gateway waits before attempting to reconnect to a broker that has repeatedly failed to connect.
The kafkaHTTPReconnectionInterval will increase exponentially for each consecutive connection failure, up to this maximum.
kafkaDGWReconnectionInterval
The amount of time, in milliseconds, Diameter Gateway waits before attempting to reconnect to the Kafka topic.
kafkaDGWReconnectionMax
The maximum amount of time, in milliseconds, Diameter Gateway waits before attempting to reconnect to a broker that has repeatedly failed to connect.
The kafkaDGWReconnectionInterval will increase exponentially for each consecutive connection failure, up to this maximum.
kafkaBRMReconnectionInterval
The amount of time, in milliseconds, BRM Gateway waits before attempting to reconnect to the Kafka topic.
kafkaBRMReconnectionMax
The maximum amount of time, in milliseconds, BRM Gateway waits before attempting to reconnect to a broker that has repeatedly failed to connect.
The kafkaBRMReconnectionInterval will increase exponentially for each consecutive connection failure, up to this maximum.
kafkaFederationIssuesTopicName
The Kafka topic name for federation-related issues.
kafkaFederationIssuesReconnectionInterval
The reconnection interval, in milliseconds, of the Kafka producer to the federation issues topic.
kafkaFederationIssuesReconnectionMax
The maximum reconnection time, in milliseconds, of the Kafka producer to the federation issues topic
kafkaRGWTopicName
The Kafka topic that is used for Radius Gateway notifications.
kafkaRGWReconnectionIntervalMillis
The amount of time, in milliseconds, RADIUS Gateway waits before attempting to reconnect to the Kafka topic.
kafkaRGWReconnectionMaxMillis
The maximum amount of time, in milliseconds, RADIUS Gateway waits before attempting to reconnect to a broker that has repeatedly failed to connect.
The kafkaRGWReconnectionIntervalMillis will increase exponentially for each consecutive connection failure up to this maximum.
walletLocation
The path of the wallet where certificates are stored when connecting to the Kafka brokers using SSL.
Recording Failed ECE Usage Requests
ECE may occasionally fail to process usage requests. For example, a data usage request could fail because a customer has insufficient funds. You can configure ECE to publish details about failed usage requests, such as the user ID and request payload, to the ECE failure topic in your Kafka server. Later on, you can reprocess the usage requests or view the failure details for analysis and reporting.
To enable the recording of failed ECE usage requests:
-
Access the ECE configuration MBeans in a JMX editor, such as JConsole.
-
Expand the ECE Configuration node.
-
Expand charging.kafkaConfigurations.
-
Expand Attributes.
-
Set the persistFailedRequestsToKafkaTopic property to true.
Note:
You can also examine the gateway log files to determine why a usage request failed. See "Troubleshooting Failed Usage Requests" in BRM System Administrator's Guide for more information.
Configuring ECE to Support Prepaid Usage Overage
You can configure ECE to capture any overage amounts by prepaid customers during an active session, which can help you prevent revenue leakage. If the network reports that the number of used units during a session is greater than a customer's available allowance and credit limit, ECE charges the customer up to the available allowance. It then creates an overage record with information about the overage amount and sends it to the ECE Overage topic. You can create a custom solution for reprocessing the overage amount later on.
Note:
If your system does not contain a Kafka server, you can configure ECE to publish overage usage amounts to a log file instead. See "Logging Prepaid Usage Overage".
For example, assume a customer has a prepaid balance of 100 minutes, but uses 130 minutes during a session. ECE would charge the customer for 100 minutes, create an overage record for the remaining 30 minutes of usage, and write the overage record to the ECE Overage topic.
To configure ECE to support prepaid usage overage, do the following:
-
Ensure that you created an ECE Overage topic and connected ECE to your Kafka Server. See "Connecting ECE and Gateways to Kafka Topics".
-
Enable ECE to check for and capture any usage overage:
-
Access the ECE configuration MBeans in a JMX editor, such as JConsole. See "Accessing ECE Configuration MBeans".
-
Expand the ECE Configuration node.
-
Expand charging.server.
-
Expand Attributes.
-
Set the checkReservationOverImpact attribute to true. (The default is false.)
-
You can also customize ECE to include any existing unrated quantity in a CDR or notification using the ECE SDK post charging extension (postCharagingExtension). This can be done using the OverageDetailsImpl class. The unrated quantity can be then captured by the getOverageDetailsMap() method that is available in the extension.