Understand Event Subscription
In case of webhook triggers, to receive events from an external application, you must subscribe to those events.
The webhook configuration process involves subscribing to events and specifying the destination URI where the external application should send the event data. You can create a subscription for events in either of the following ways:
-
Manual: Create a webhook subscription through the administration page of the external application.
-
Programmatic: Execute an API call to the external application.
As part of the subscription set up, the external application may issue a ping to ensure that the endpoint can be reached. It may also require an explicit acknowledgment of the ping. See Validation During Webhook Registration.
In a similar way, you can unsubscribe to events when necessary.
In Oracle Integration, the registration of a webhook corresponds to the activation of an integration flow, which indicates that Oracle Integration is ready to receive events. A deregistration corresponds to the deactivation of an integration flow, which indicates that the integration flow is not receiving events.
The adapter definition document of the Rapid Adapter Builder supports both the Register
and Deregister
constructs. As shown in the following example, both these constructs take a flow. The purpose of each flow is to execute one or more API calls to programmatically subscribe to the webhook on the external service.
"subscription": {
"register": "flow:CreateSubscriptionFlow",
"deregister": "flow:DeleteSubscriptionFlow"
},
The Flow to Register and Deregister a Subscription
Here is an example flow for the Register
and Deregister
constructs.
Register
logic in the flow performs the following actions:
- Lists the subscriptions for the logged-in account by executing an API call to the external application.
- Determines if a subscription already exists by parsing the results and executing a jq expression. This is modeled as a CNCF function.
- If a subscription exists, it updates the subscription through an API call.
- If a subscription does not exist, it creates a new subscription through an API call.
The Deregister
logic in the flow just calls the API to delete the subscription, but it does not check if a subscription exists (to avoid additional API calls).
Note:
In the following example code, the Oracle Integration endpoint (represented as.integrationProperties.INTEGRATION_FLOW_URL
) helps find the details of the subscription registration.
"CreateSubscriptionFlow": {
"id": "CreateSubscriptionFlow",
"description": "CreateSubscriptionFlow",
"version": "0.1",
"start": "startState",
"specVersion": "0.8",
"functions": [
{
"name": "ResolveSubscriptionName",
"operation": "(.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-3]) as $intName | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-2]) as $flow | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[2]) as$host | (if (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-5])|test(\"project\") then $intName+\"_\"+\"project_\"+$flow+\"_\"+$host else $intName+\"_\"+$flow+\"_\"+$host end )",
"type": "expression"
},
{
"name": "generalRestFunc",
"operation": "connectivity::rest",
"type": "custom"
},
{
"name": "TransformSubName",
"operation": "[.integrationProperties.INTEGRATION_FLOW_URL==.subscriptionList.body.subscriptions[].pushConfig.pushEndpoint]|any",
"type": "expression"
}
],
"states": [
{
"actions": [
{
"functionRef": "ResolveSubscriptionName",
"actionDataFilter": {
"toStateData": "${ .subscriptionName }"
}
},
{
"name": "ListSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions\"}",
"method": "GET",
"parameters": {
"pageSize": 1000
}
}
},
"actionDataFilter": {
"results": "${{ body: .body, headers: .headers }}",
"toStateData": "${ .subscriptionList }"
}
},
{
"functionRef": "TransformSubName",
"actionDataFilter": {
"toStateData": "${ .subscriptionExists }"
}
},
{
"name": "UpdateSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions/\"+.subscriptionName}",
"method": "PATCH",
"body": "${{subscription:{ackDeadlineSeconds: .configuration.ackDeadline,pushConfig:{oidcToken: {serviceAccountEmail: .connectionProperties.serviceAccount}},deadLetterPolicy: (if .configuration.enableDeadLettering == \"true\" then {deadLetterTopic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.deadLetterTopic), maxDeliveryAttempts:.configuration.maxDeliveryAttempts} else null end),retainAckedMessages: (if .configuration.retainAckedMessages == \"true\" then .configuration.retainAckedMessages else null end),messageRetentionDuration: (if .configuration.messageRetentionDuration ==\"true\" then (.configuration |if .days!=null and .days!=\"7\" then((.days|tonumber * 24*60*60 )+ (.hours|tonumber * 60*60) + (.minutes|tonumber * 60) | tostring +\"s\") else ((.days|tonumber * 24*60*60 )| tostring +\"s\") end) else null end) }| with_entries(select(.value != null)),updateMask:(.configuration | \"retainAckedMessages,ackDeadlineSeconds,pushConfig.oidcToken.serviceAccountEmail\" + (if .enableDeadLettering == \"true\" then \",deadLetterPolicy.deadLetterTopic,deadLetterPolicy.maxDeliveryAttempts\" else \"\" end) + (if .messageRetentionDuration == \"true\" then \",messageRetentionDuration\" else \"\" end))}}"
}
},
"actionDataFilter": {
"results": "${ { body: .body, headers: .headers } }",
"toStateData": "${ .output }"
},
"condition": "${.subscriptionExists==true}"
},
{
"name": "CreateSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "${\"https://\"+.connectionProperties.hostName+\"/v1/projects/\"+.connectionProperties.projectID+\"/subscriptions/\"+.subscriptionName}",
"method": "PUT",
"body": "${{topic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.TopicName),retryPolicy:{minimumBackoff: \"30s\",maximumBackoff: \"300s\"},ackDeadlineSeconds: .configuration.ackDeadline,expirationPolicy:{}, pushConfig: {pushEndpoint: .integrationProperties.INTEGRATION_FLOW_URL,oidcToken: {serviceAccountEmail: .connectionProperties.serviceAccount}}, deadLetterPolicy: (if .configuration.enableDeadLettering == \"true\" then {deadLetterTopic: (\"projects/\"+.connectionProperties.projectID+\"/topics/\"+.configuration.deadLetterTopic), maxDeliveryAttempts:.configuration.maxDeliveryAttempts} else null end), filter: (if .configuration.filter then .configuration.filter else null end), enableMessageOrdering: (if .configuration.enableMessageOrdering == \"true\" then .configuration.enableMessageOrdering else null end),retainAckedMessages: (if .configuration.retainAckedMessages == \"true\" then .configuration.retainAckedMessages else null end),messageRetentionDuration: (if .configuration.messageRetentionDuration ==\"true\" then (.configuration |if .days!=null and .days!=\"7\" then((.days|tonumber * 24*60*60 )+ (.hours|tonumber * 60*60) + (.minutes|tonumber * 60) | tostring +\"s\") else ((.days|tonumber * 24*60*60 )| tostring +\"s\") end) else null end) } | with_entries(select(.value != null))}"
}
},
"actionDataFilter": {
"results": "${ { body: .body, headers: .headers } }",
"toStateData": "${ .output }"
},
"condition": "${.subscriptionExists==false}"
}
],
"name": "startState",
"type": "operation",
"end": true
}
]
},
"DeleteSubscriptionFlow": {
"id": "DeleteSubscriptionFlow",
"version": "0.1",
"start": "startState",
"specVersion": "0.8",
"functions": [
{
"name": "generalRestFunc",
"operation": "connectivity::rest",
"type": "custom"
}
],
"states": [
{
"actions": [
{
"name": "DeleteSubscriptionFunction",
"functionRef": {
"refName": "generalRestFunc",
"arguments": {
"uri": "https://{hostName}/v1/projects/{projectID}/subscriptions/{name}",
"method": "DELETE",
"parameters": {
"hostName": "${.connectionProperties.hostName}",
"name": "${(.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-3]) as $intName | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[-2]) as $flow | (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\") | .[2]) as$host | (if (.integrationProperties.INTEGRATION_FLOW_URL| split(\"/\")| .[-5])|test(\"project\") then $intName+\"_\"+\"project_\"+$flow+\"_\"+$host else $intName+\"_\"+$flow+\"_\"+$host end )}",
"projectID": "${.connectionProperties.projectID}"
}
}
},
"actionDataFilter": {
"results": "${ { body: .body, headers: .headers } }",
"toStateData": "${ .output }"
}
}
],
"name": "startState",
"type": "operation",
"end": true
}
]
}