12 Using Oracle Cloud Infrastructure (OCI) Queue with EDQ
This document describes how to get started using the Oracle Cloud Infrastructure (OCI) Queue service with Oracle Enterprise Data Quality (EDQ). This documentation is intended for system administrators responsible for installing and maintaining EDQ applications.
This chapter includes the following sections:
12.1 Introduction to OCI Queue and EDQ
EDQ realtime provider and consumer buckets can be configured to use OCI Queue service for reading and publishing. This queue service is a much better fit for realtime processing as compared to OCI streams.
12.2 Configuring EDQ to Read and Write OCI Queue Messages
OCI Queue interfaces to and from EDQ are configured using XML interface files that define:
- The path to the queue of messages
- Properties that define how to work with the specific OCI Queue technology
- How to decode the message payload into a format understood by an EDQ process (for Message Providers – where EDQ reads messages from a queue), or convert messages to a format expected by an external process (for Message Consumers – where EDQ writes messages to a queue).
The XML files are located in the EDQ Local Home directory (formerly known as the config directory), in the following paths:
- buckets/realtime/providers (for interfaces ‘in’ to EDQ)
- buckets/realtime/consumers (for interfaces ‘out’ from EDQ)
Once the XML files have been configured, Message Provider interfaces are available in Reader processors in EDQ and to map to Data Interfaces as ‘inputs’ to a process, and Message Consumer interfaces are available in Writer processors, and to map from Data Interfaces as ‘outputs’ from a process.
12.3 Defining the Interface Files
An interface file in EDQ consists of a realtimedata
element which defines the message framework. For OCI Queue interfaces, use the following:
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="ociqueues">
...
</realtimedata>
The realtimedata
element contains three subsections:
- The
<attributes>
section, defining the shape of the interface as understood by EDQ - The
<messengerconfig>
section, defining how to connect to OCI Queue. - A message format section defining how to extract contents of a message (e.g. from XML) into attribute data readable by EDQ (for inbound interfaces), or how to convert attribute data from EDQ to message data (e.g. in XML). For provider interfaces, the element is
<incoming>
; for consumer interfaces, the element is<outgoing>
.
12.3.1 Understanding the <attributes> section
The <attributes> section defines the shape of the interface. It constitutes the attributes that are available in EDQ when configuring a Reader or Writer. For example, the following excerpt from the beginning of an interface file configures string and number attributes that can be used in EDQ:
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="ociqueues">
<attributes>
<attribute type="string" name="messageID"/>
<attribute type="string" name="name"/>
<attribute type="number" name="AccountNumber"/>
</attributes>
[file continues]...
EDQ supports all the standard attribute types and they are:
- string
- number
- date
- stringarray
- numberarray
- datearray
12.3.2 Understanding the <messengerconfig> section
The following properties can be set in the <messengerconfig>
section:
Property | Description |
---|---|
queue |
The queue OCID (required). |
credentials |
Stored credentials name used to connect to stream; if omitted, platform (instance) authentication is used. |
interval |
Interval in milliseconds between polls for message reception. The default is 0 (long polls are used). |
proxy |
host:port proxy server for HTTPS calls. |
deletemode |
Controls message deletion after reception. Valid values are:
|
limit |
Maximum number of messages to receive in one call. The default is 20. |
visibilityInSeconds |
The duration (in seconds) that the received messages are hidden from subsequent receive requests. The default is set in the queue definition. |
timeoutInSeconds |
Wait time in seconds for long poll receive requests. If omitted the OCI default of 30s is used. |
Defaults can be set in realtime.properties
with the prefix "ociqueues.".
12.3.3 Understanding the <incoming> or <outgoing> section
The <incoming>
or <outgoing>
section defines how message metadata and values are converted to/from EDQ attributes. It consists of the following two subsections:
12.3.3.1 Understanding the <messageheaders> section
The <messageheaders>
section allows attributes to be mapped to additional sending properties for transmission, and attributes to be set from message metadata on reception. Refer to the OCI Queue Documentation for more details on each value. A non empty message header value will override a <messengerconfig>
property with the same name.
The following standard headers are available:
Header name | Settable | Readable | Type | Description |
---|---|---|---|---|
deliveryCount |
no |
yes |
number |
The number of times the message has been delivered to a consumer. |
expireAfter |
no |
yes |
date |
The time after which the message will be automatically deleted. |
id |
no |
yes |
string |
The internal message ID. |
receipt |
no |
yes |
string |
The Receipt token that is required for manual deletion of messages. |
visibleAfter |
no |
yes |
date |
The time after which the message will be visible to other consumers. |
12.3.3.2 Understanding the <messagebody> section
This section uses JavaScript to parse message payloads into attributes that EDQ can use for inbound interfaces, and perform the reverse operation (convert EDQ attribute data into message payload data) for outbound interfaces. A function named ‘extract’ is used to extract data from XML into attribute data for inbound interfaces, and a function named ‘build’ is used to build XML data from attribute data.
For more details, refer to Illustrations, which provides an example of a complete provider bucket which can receive JSON messages from a case management filter reporting trigger.
12.3.4 Illustrations
The following XML is a simple example of a complete provider bucket which can receive JSON messages from a case management filter reporting trigger. The internal message ID is also returned.
<?xml version="1.0" encoding="UTF-8"?>
<realtimedata messenger="ociqueues">
<attributes>
<attribute name="filter" type="string"/>
<attribute name="type" type="string"/>
<attribute name="xaxis" type="string"/>
<attribute name="yaxis" type="string"/>
<attribute name="server" type="string"/>
<attribute name="userid" type="number"/>
<attribute name="user" type="string"/>
<attribute name="userdisplay" type="string"/>
<attribute name="start" type="date" format="iso"/>
<attribute name="duration" type="number"/>
<attribute name="status" type="string"/>
<attribute name="sql" type="string"/>
<attribute name="arg.type" type="stringarray"/>
<attribute name="arg.value" type="stringarray"/>
<attribute name="mgid" type="string"/>
</attributes>
<messengerconfig>
queue = ocid1.queue.oc1.phx.amaaaaaa7u6obfiaotsz7yuj2zcbc5uhc45evvv7wfwedgertw3543we
credentials = OCI 1
</messengerconfig>
<incoming>
<messageheaders>
<header name="id" attribute="mgid"/>
</messageheaders>
<messagebody>
<script>
<![CDATA[
var simple = ["filter", "type", "xaxis", "yaxis", "server", "userid", "user", "userdisplay", "duration", "status", "sql"]
function extract(str) {
var obj = JSON.parse(str)
var rec = new Record()
for (let x of simple) {
rec[x] = obj[x]
}
rec.start = obj.start && new Date(obj.start)
if (obj.args) {
rec['arg.type'] = obj.args.map(a => a.type)
rec['arg.value'] = obj.args.map(a => a.value && a.value.toString())
}
return [rec];
}
]]>
</script>
</messagebody>
</incoming>
</realtimedata>