116 DBMS_KAFKA_ADM
The DBMS_KAFKA_ADM
package provides a PL/SQL interface to
create cluster definitions, which you can then use to grant access to Kafka cluster
data for applications.
Administrators granted the OSAK_ADMIN_ROLE
can use
DBMS_KAFKA_ADM
package to create applications that
query Kafka data from Oracle Database views and tables.
DBMS_KAFKA_ADM Overview
The DBMS_KAFKA_ADM
package enables you to manage the Oracle
SQL access to a Kafka cluster.
To set up access to Kafka topics, you require the privileges granted with
OSAK_ADMIN_ROLE
. You use these privileges to provide a cluster definition
that helps you to manage resources when connecting to Kafka clusters. The ability of users to
share a cluster definition enables the underlying framework to share connections to the
clusters. Sharing connections conserves memory and network resources.
DBMS_KAFKA_ADM Security Model
Oracle recommends that you grant the OSAK_ADMIN_ROLE
in
the target Oracle Database to an administrator user for Oracle SQL Access to
Kafka.
Using the OSAK_ADMIN_ROLE
simplifies the process of
granting system privileges required to create and administer Oracle SQL access to Kafka
applications. Also, administrators with the OSAK_ADMIN_ROLE
grant
access for other users to the Oracle Database object that has the relevant Kafka view
and table for which they need access.
In addition to the OSAK_ADMIN_ROLE
, an administrator for
Oracle SQL access to Kafka applications must have the following system privileges:
CREATE SESSION
ALTER SESSION
CREATE CREDENTIAL,
to create a Kafka SASL-SSL (Simple Authentication and Security Layer) password or OSS (Oracle Streaming Service) authTokenCREATE ANY DIRECTORY
, to create cluster access and cluster configuration directoryDROP ANY DIRECTORY
, to drop cluster access and cluster configuration directoryREAD ON
privileges tosys.dbms_kafka_clusters
READ ON
privileges tosys.dbms_kafka_applications
READ ON
privileges tosys.dbms_kafka_messages
EXECUTE ON
privileges tosys.dbms_kafka_adm
DBMS_KAFKA_ADM Constants
DBMS_KAFKA_ADM
package is a stateful package
that uses the constants described here as part of the declared package state in the
package specification.
Kafka Providers
The DBMS_KAFKA_ADM
constants include constants
with literal initial values and constants that you specify in your package
build.
KAFKA_PROVIDER_APACHE CONSTANT VARCHAR2(6) := 'APACHE';
KAFKA_PROVIDER_OSS CONSTANT VARCHAR2(3) := 'OSS';
States of a cluster definition
STATE_CONNECTED CONSTANT INTEGER := 0;
STATE_MAINTENANCE CONSTANT INTEGER := 1;
STATE_BROKEN CONSTANT INTEGER := 2;
STATE_DEREGISTERED CONSTANT INTEGER := 3;
Kafka connection modes
Kafka connection modes specify one of the following connection mode constants:
-
High Throughput
If the connection mode (
connmode
) ishigh_throughput
, then the applications associated with this connection require all of the data to be delivered as fast as possible. This connection mode is optimal for use from STREAMING and LOADING applications.For example:
CONNECTION_MODE_HI_THRU CONSTANT VARCHAR2(15) := ‘high_throughput’;
-
Low Latency
If the connection mode (
connmode
) islow_latency
, then the underlying layer attempts to return the first rows as fast as possible. This connection mode is most useful for applications that use only a small amount of Kafka data for each load.For example:
CONNECTION_MODE_LO_LAT CONSTANT VARCHAR2(11) := ‘low_latency’;
-
Options
OPT_CONNECTION_MODE CONSTANT VARCHAR2(30) := ‘connmode’;
Summary of DBMS_KAFKA_ADM Procedures
DBMS_KAFKA_ADM
procedures enable you to create, delete,
and configure Oracle SQL access to Kafka.
Kafka providers
Kafka provider constants are used to define the Kafka cluster. Because the security models are different between an Apache Kafka-based cluster and an Oracle Streaming Service-based cluster, you must define to which type of Kafka environment you are connecting, so that the correct secure connection parameters can be passed.For example:
KAFKA_PROVIDER_APACHE CONSTANT VARCHAR2(6) := 'APACHE';
KAFKA_PROVIDER_OSS CONSTANT VARCHAR2(3) := 'OSS';
States of a cluster definition
STATE_CONNECTED CONSTANT INTEGER := 0;
STATE_MAINTENANCE CONSTANT INTEGER := 1;
STATE_BROKEN CONSTANT INTEGER := 2;
STATE_DEREGISTERED CONSTANT INTEGER := 3;
Kafka connection modes
Kafka connection modes specify the Kafka rules that Oracle SQL access to Kafka follows to read a batch of records. Depending on the requirements for your application, you can specify either a low latency rule or a high throughput rule, depending on which option is the more efficient use of resources. You should consider tradeoffs between throughput and latency when you create your connections. The latency between the time that a record is generated and the time that you want that data made available to your application should determine the whether you make low latency your priority, or high throughput your priority.
If your application requires data access in near real-time, so that your application can react to data within seconds or a minute of its being generated by the Kafka cluster, then you should choose low latency connections. With a low latency connection, the Oracle SQL Kafka connection buffers are set to a smaller size, and the reads to the Kafka topic are made frequently, so that the delay between the time a record is generated to the Kafka topic and consumed by the application using Oracle SQL access to Kafka is within the performance requirements of your application. Each read from the Kafka topic is expected to consist of a relatively few number of records, and the number of reads from the Kafka cluster are high. This form of Kafka connection is particularly suited to updating an application with near real-time concurrent transactions, or where the application must monitor and react to changes quickly. However, this configuration can be resource-intensive, as the number of transactions are relatively high for the amount of data received in each transaction.
If your application does not have near real-time requirements, so that your application can delay updates in favor of processing a large batch of records, in a cadence of 15 minutes, or an hour, or every 8 hours, then a high throughput connection can be a more efficient choice. With a connection configured for high throughput, each batch of Kafka data is streamed more infrequently, but the amount of data streamed in each transaction is larger. Accordingly, the Oracle SQL access to Kafka buffer size is larger, and the network I/O and memory allocations are configured for these larger batches. Because these transactions are streamed less frequently, the overall amount of server resources consumed by the Oracle SQL access to Kafka transaction is less
-
Low Latency
CONNECTION_MODE_LO_LAT CONSTANT VARCHAR2(11) := ‘low_latency’;
-
High Throughput
CONNECTION_MODE_HI_THRU CONSTANT VARCHAR2(15) := ‘high_throughput’;
-
Options
OPT_CONNECTION_MODE CONSTANT VARCHAR2(30) := ‘connmode’;
This table lists the DBMS_KAFKA_ADM procedures, and briefly describes them.
Table 116-1 DBMS_KAFKA_ADM Package Procedures
Subprogram | Description |
---|---|
Tests the Kafka cluster connectivity. |
|
Deregisters the Kafka cluster. |
|
Disables the Kafka cluster. |
|
Enables the Kafka cluster. |
|
Registers the Kafka cluster. |
|
Updates the host server, or Kafka cluster options, or both. |
CHECK_CLUSTER
DBMS_KAFKA_ADM
procedure
CHECK_CLUSTER
tests the Kafka cluster connectivity.
Syntax
Use this procedure to confirm that a connection can be established with the configured security information. The function returns the state of the cluster.
FUNCTION CHECK_CLUSTER(
cluster_name IN VARCHAR2
) RETURN INTEGER;
Parameters
Table 116-2 CHECK_CLUSTER procedure parameters for DBMS_KAFKA_ADM
Parameter | Description |
---|---|
|
Name of an existing Kafka cluster Case-insensitive. |
Usage Notes
As the Oracle SQL access to Kafka cluster administrator, you should
check the connectivity of the cluster. Checking the connectivity ensures that the
bootstrap server list and the security related configuration that you have provided
for the Kafka cluster is proper, and that a connection can be successfully
established. You can use this function to test the connection when you run
REGISTER_CLUSTER
, and the security configuration has been
established. After the cluster connection has been verified, you can make the
cluster definition available to be used by user applications.
You also use this function if users have reported issues when calling
DBMS_KAFKA.LOAD_TEMP_TABLE
and
DBMS_KAFKA.EXECUTE_LOAD_APP
, both of which select from Oracle
SQL access to Kafka views.
Examples
Suppose you have completed your initial cluster definition for the cluster ExampleCluster, and registered the cluster. Next, you use this procedure to check the configuration:
EXEC SYS.DBMS_OUTPUT.PUT_LINE (
SYS.DBMS_KAFKA_ADM.CHECK_CLUSTER (‘ExampleCluster’));
DEREGISTER_CLUSTER
DBMS_KAFKA_ADM
procedure
DEREGISTER_CLUSTER
deregisters the Kafka cluster.
Note:
This procedure will only deregister and remove the cluster if there are no user applications associated with the clusterSyntax
PROCEDURE DEREGISTER_CLUSTER (
cluster_name IN VARCHAR2,
forced IN BOOLEAN DEFAULT FALSE
);
Parameters
Table 116-3 DEREGISTER_CLUSTER Procedure Parameters for DBMS_KAFKA_ADM
Parameter | Description |
---|---|
|
Name of an existing Kafka cluster Case-insensitive. |
forced |
(Optional) Marks the cluster as deregistered even if there are
user applications still defined. Default:
Values: |
Usage Notes
When a cluster definition is no longer needed, the OSAK Administrator can remove the cluster definition.
If the forced
parameter is passed as TRUE
, then the
cluster's state is marked as deregistered. Applications that are still associated
with this cluster will no longer be functional. When the last application is
dropped, this deregistered cluster definition will also be removed.
Examples
In the following example, an unused cluster definition for the Kafka cluster ExampleCluster is deregistered:
EXEC SYS.DBMS_KAFKA_ADM.DEREGISTER_CLUSTER (‘ExampleCluster’);
Suppose that there are still user applications (views and tables) associated with the cluster ExampleCluster, but you want to deregister the cluster. In the following example, the cluster ExampleCluster is deregistered with the forced parameter set to TRUE:
exec DBMS_KAFKA_ADM.DEREGISTER_CLUSTER(
cluster_name => 'ExampleCluster',
forced => TRUE);
DISABLE_CLUSTER
DBMS_KAFKA_ADM
procedure
DISABLE_CLUSTER
disables the Kafka cluster.
Note:
When this procedure completes successfully, the state of the application changes toMAINTENANCE
.
Syntax
PROCEDURE DISABLE_CLUSTER(
cluster_name IN VARCHAR2
);
Parameters
Table 116-4 DISABLE_CLUSTER Procedure Parameters for DBMS_KAFKA_ADM
Parameter | Description |
---|---|
|
Name of an existing Kafka cluster Case-insensitive. |
Usage Notes
A disabled cluster will prevent the cluster security information from being retrieved, thus preventing Kafka connections from being created or accessed.
Before you begin maintenance on a Kafka cluster, an Oracle SQL access to Kafka
administrator should disable access to the cluster by using the
DBMS_KAFKA_ADM.DISABLE_CLUSTER
procedure. After the maintenance
is completed, you can reeanble the cluster access by using the
DBMS_KAFKA_ADM.ENABLE_CLUSTER
procedure.
This procedure raises an exception if the state cannot be changed to
MAINTENANCE
Examples
In the following example, the cluster definition for ExampleCluster is temporarily disabled. In this state, views cannot connect to or retrieve data from the Kafka cluster:
EXEC SYS.DBMS_KAFKA_ADM.DISABLE_CLUSTER (‘ExampleCluster’);
ENABLE_CLUSTER
DBMS_KAFKA_ADM
procedure
ENABLE_CLUSTER
enables the Kafka cluster.
Note:
When this procedure completes successfully, the state of the application changes toCONNECTED
.
Syntax
PROCEDURE ENABLE_CLUSTER(
cluster_name IN VARCHAR2
);
Parameters
Table 116-5 ENABLE_CLUSTER Procedure Parameters for DBMS_KAFKA_ADM
Parameter | Description |
---|---|
|
Name of an existing Kafka cluster Case-insensitive. |
Usage Notes
After you complete maintenance on a Kafka cluster, an Oracle SQL access
to Kafka administrator should enable access to the cluster by using the
DBMS_KAFKA_ADM.ENABLE_CLUSTER
procedure.
This procedure raises an exception if the state cannot be changed to
CONNECTED
.
Examples
Suppose that you have disabled the cluster ExampleCluster to perform maintenance, and you now want to reenable the ability of views to connect to and retrieve data from that Kafka cluster. Enter the following command:
EXEC SYS.DBMS_KAFKA_ADM.ENABLE_CLUSTER (‘ExampleCluster’);
REGISTER_CLUSTER
DBMS_KAFKA_ADM
procedure
REGISTER_CLUSTER
registers the Kafka cluster and makes it available
for access on Oracle Database.
Syntax
FUNCTION REGISTER_CLUSTER (
cluster_name IN VARCHAR2,
startup_servers IN VARCHAR2,
kafka_provider_provider IN VARCHAR2,
cluster_access_dir IN VARCHAR2,
credential_name IN VARCHAR2 DEFAULT NULL,
cluster_config_dir IN VARCHAR2 DEFAULT NULL,
cluster_description IN VARCHAR2 DEFAULT NULL,
options IN CLOB DEFAULT NULL
) RETURN INTEGER;
Parameters
Table 116-6 REGISTER_CLUSTER Procedure Parameters for DBMS_KAFKA_ADM
Parameter | Description |
---|---|
|
Name of an existing Kafka cluster Case-insensitive |
|
The startup servers of the Kafka cluster Case-sensitive |
|
The provider of the Kafka server, which can be
either Apache or Oracle Cloud Infrastructure (OCI) Oracle
Streaming Service (OSS).. Syntax:
Values for Case-insensitive See also the |
|
The Oracle directory object for determining access to this cluster Case-insensitive |
|
Credential name associated with the password to connect to Kafka.
See |
|
The Oracle directory object containing the cluster configuration
files required for secure clusters. The value is
Case-insensitive |
|
(Optional) A text description of the cluster. Maximum length: 400 characters. |
|
(Optional) Cluster options. The format is a JSON-formatted
document. For possible options, see
|
Usage Notes
REGISTER_CLUSTER
automatically runs a
CHECK_CLUSTER
call after the cluster definition has been
stored. The status returned from the CHECK_CLUSTER
call is the
returned value from the REGISTER_CLUSTER
. This procedure raises an
exception if the state cannot be determined.
At the end of the lifecycle for an Oracle SQL access to Kafka (OSAK) cluster, the
administrator can remove the OSAK cluster by using the
DBMS_KAFKA_ADM.DEREGISTER_CLUSTER
function.
Examples
In the following example, the function request with
REGISTER_CLUSTER
registers the Kafka cluster KAFKACLUS1 that is
on the server mykafkastartup
, using Apache as the provider. The
cluster access directory on the Oracle Database instance is
OSAK_KAFKACLUS1_ACCESS
, accessed with the credential
KAFKACLUS1CRED1
, using the cluster configuration directory
OSAK_KAFKACLUS1_CONFIG
. The description for this cluster (which
is optional), is "My test cluster kafkaclus1." A user
(examplekafka-user
) is granted READ
access to
the Kafka cluster data on OSAK_KAFKACLUS1_ACCESS
,
SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER (‘KAFKACLUS1’,
‘mykafkastartup-host:9092’,
DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
‘OSAK_KAFKACLUS1_ACCESS’
‘KAFKACLUS1CRED1’,
‘OSAK_KAFKACLUS1_CONFIG’,
'My test cluster kafkaclus1’) from dual;
Output: Successful registration return 0 (zero)
SQL> DBMS_KAFKA_ADM_RE...
0
SQL> grant read on directory OSAK_KAFKACLUS1_ACCESS to examplekafka-user;
You can also create non-secured cluster access. In the following example, a non-secure Kafka cluster access is registered using Oracle SQL access to Kafka (OSAK):
-
The cluster access database directory is created with an empty path. This directory is used to control which Oracle users can access the Kafka cluster:
SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_ACCESS AS '';
-
A cluster configuration operating system directory using the path structure
Oracle base/
osak/cluster_name/config is created, with the corresponding Oracle directory object, where the Oracle base is/u01/app/oracle,
and the Kafka cluster name iskafkaclus2
:$mkdir /u01/app/oracle/osak/kafkaclus2/config; . . . SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS 'u01/app/oracle/osak/kafkaclus2/config';
-
Create an empty
osakafka.properties
file, or create anosakafka.properties
file with OSAK tuning or debugging properties, as you decide is required for your enterprise. -
Register the Kafka cluster using
DBMS_KAKFA_ADM.REGISTER_CLUSTER()
. For example:SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER ( cluster_name => ‘KAFKACLUS2’, bootstrap_servers =>‘mykafkastartup-host:9092’, kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, cluster_access_dir => ‘OSAK_KAFKACLUS2_ACCESS’, credential_name => NULL, cluster_config_dir => ‘OSAK_KAFKACLUS2_CONFIG’, cluster_description => 'My test cluster kafkaclus2’, options => NULL) from dual; Output: Successful registration return 0 (zero) SQL> DBMS_KAFKA_ADM_RE….. 0
-
Grant read access to users. In this example, user examplekafka-user2 is granted access:
SQL> grant read on directory osak_kafkaclus2_access to examplekafka-user2;
UPDATE_CLUSTER_INFO
DBMS_KAFKA_ADM
procedure
UPDATE_CLUSTER_INFO
updates the Kafka host server, or the options
for the Kafka cluster, or both.
Syntax
PROCEDURE UPDATE_CLUSTER_INFO(
cluster_name IN VARCHAR2,
startup_servers IN VARCHAR2 DEFAULT NULL,
options IN CLOB DEFAULT NULL
);
Parameters
Table 116-7 UPDATE_CLUSTER_INFO Procedure Parameters for DBMS_KAFKA_ADM
Parameter | Description |
---|---|
|
Name of an existing Kafka cluster Case-insensitive |
|
The startup servers of the Kafka cluster Case-sensitive Because this is part of the Kafka source configuration, this string is treated as an opaque value (used as is), which is passed to the Kafka cluster connection logic. |
|
(Optional) Cluster options. The format is a JSON-formatted
document. For possible options, see
|
Usage Notes
The UPDATE_CLUSTER_INFO
procedure updates the Kafka
cluster definition, including the startup server list, or cluster options, or both.
It also disconnects from the Kafka server. As the Oracle SQL access to Kafka (OSAK)
Administrator, use this procedure if the Kafka cluster environment changes, and you
need to change the cluster definition and configuration.
If an update fails, then the procedure raises an exception.
Examples
In the following example, the Kafka administrator has reconfigured the cluster to use
the servers newhost
and host2
, and is now updating
the list of startup servers for the Kafka cluster
ExampleCluster
:
EXEC SYS.DBMS_KAFKA_ADM.UPDATE_CLUSTER_INFO
(‘ExampleCluster’,
‘newhost:9092,host2:9092’);