This chapter describes how to use Oracle Data Integrator's Changed Data Capture feature to detect changes occurring on the data and only process these changes in the integration flows.
Changed Data Capture (CDC) allows Oracle Data Integrator to track changes in source data caused by other applications. When running integration interfaces, thanks to CDC, Oracle Data Integrator can avoid processing unchanged data in the flow.
Reducing the source data flow to only changed data is useful in many contexts, such as data synchronization and replication. It is essential when setting up an event-oriented architecture for integration. In such an architecture, applications make changes in the data ("Customer Deletion", "New Purchase Order") during a business process. These changes are captured by Oracle Data Integrator and transformed into events that are propagated throughout the information system.
Changed Data Capture is performed by journalizing models. Journalizing a model consists of setting up the infrastructure to capture the changes (inserts, updates and deletes) made to the records of this model's datastores.
Oracle Data Integrator supports two journalizing modes:
Simple Journalizing tracks changes in individual datastores in a model.
Consistent Set Journalizing tracks changes to a group of the model's datastores, taking into account the referential integrity between these datastores. The group of datastores journalized in this mode is called a Consistent Set.
The journalizing components are:
Journals: Where changes are recorded. Journals only contain references to the changed records along with the type of changes (insert/update, delete).
Capture processes: Journalizing captures the changes in the source datastores either by creating triggers on the data tables, or by using database-specific programs to retrieve log data from data server log files. See the Oracle Fusion Middleware Connectivity and Knowledge Modules Guide for Oracle Data Integrator for more information on the capture processes available for the technology you are using.
Subscribers: CDC uses a publish/subscribe model. Subscribers are entities (applications, integration processes, etc.) that use the changes tracked on a datastore or on a consistent set. They subscribe to a model's CDC to have the changes tracked for them. Changes are captured only if there is at least one subscriber to the changes. When all subscribers have consumed the captured changes, these changes are discarded from the journals.
Journalizing views: Provide access to the changes and the changed data captured. They are used by the user to view the changes captured, and by integration processes to retrieve the changed data.
These components are implemented in the journalizing infrastructure.
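The publish/subscribe behavior described above can be sketched with a small in-memory model. This is an illustration only, not how a journalizing Knowledge Module is implemented: the `ToyJournal` class, its methods, and the subscriber names are hypothetical, and it assumes one journal entry is recorded per subscriber.

```python
from dataclasses import dataclass, field


@dataclass
class ToyJournal:
    """Illustrative journal: stores only keys and change types, per subscriber."""
    subscribers: set = field(default_factory=set)
    # Each entry is (subscriber, flag, primary_key); flag is "I" or "D".
    entries: list = field(default_factory=list)

    def subscribe(self, name: str) -> None:
        self.subscribers.add(name)

    def capture(self, flag: str, primary_key: int) -> None:
        # With no subscribers, the loop body never runs and nothing is recorded.
        for name in sorted(self.subscribers):
            self.entries.append((name, flag, primary_key))

    def consume(self, name: str) -> list:
        """Return this subscriber's changes and discard them from the journal."""
        mine = [(flag, key) for (sub, flag, key) in self.entries if sub == name]
        self.entries = [e for e in self.entries if e[0] != name]
        return mine


journal = ToyJournal()
journal.capture("I", 1)            # no subscriber yet: not recorded
journal.subscribe("SUB_A")
journal.subscribe("SUB_B")
journal.capture("I", 2)
journal.capture("D", 3)

changes_a = journal.consume("SUB_A")
remaining = len(journal.entries)   # SUB_B has not consumed its entries yet
changes_b = journal.consume("SUB_B")
```

Once both subscribers have consumed their changes, the toy journal is empty, which mirrors the rule that changes are discarded only after all subscribers have consumed them.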
Simple Journalizing enables you to journalize one or more datastores. Each journalized datastore is treated separately when capturing the changes.
This approach has a limitation, illustrated in the following example: You want to process changes in the ORDER and ORDER_LINE datastores (with a referential integrity constraint based on the fact that an ORDER_LINE record should have an associated ORDER record). If you have captured insertions into ORDER_LINE, you have no guarantee that the associated new records in ORDER have also been captured. Processing ORDER_LINE records with no associated ORDER records may cause referential constraint violations in the integration process.
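As a rough illustration of this limitation, the following sketch lists captured ORDER_LINE changes whose parent ORDER is neither captured nor already present in the target. The function name and the data shapes are hypothetical and chosen only for the example.

```python
def orphan_lines(captured_lines, captured_orders, target_orders):
    """Return captured ORDER_LINE changes whose parent ORDER is unavailable.

    captured_lines:  list of (line_id, order_id) captured from ORDER_LINE
    captured_orders: set of order_id values captured from ORDER
    target_orders:   set of order_id values already present in the target
    """
    known = captured_orders | target_orders
    return [line for line in captured_lines if line[1] not in known]


captured_lines = [(10, 1), (11, 2), (12, 3)]
captured_orders = {1}      # only ORDER 1 was captured so far
target_orders = {2}        # ORDER 2 was integrated earlier
missing = orphan_lines(captured_lines, captured_orders, target_orders)
```

Here line 12 refers to ORDER 3, which has not been captured yet, so loading it would violate the foreign key on the target.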
Consistent Set Journalizing provides the guarantee that when you have an ORDER_LINE change captured, the associated ORDER change has also been captured, and vice versa. Note that consistent set journalizing guarantees the consistency of the captured changes. The set of available changes for which consistency is guaranteed is called the Consistency Window. Changes in this window should be processed in the correct sequence (ORDER followed by ORDER_LINE) by designing and sequencing integration interfaces into packages.
Although consistent set journalizing is more powerful, it is also more difficult to set up. It should be used when referential integrity constraints need to be ensured when capturing the data changes. For performance reasons, consistent set journalizing is also recommended when a large number of subscribers are required.
It is not possible to journalize a model (or datastores within a model) using both consistent set and simple journalizing.
This section explains how to set up and start the journalizing infrastructure, and check that this infrastructure is running correctly. It also details the components of this infrastructure.
The basic process for setting up CDC on an Oracle Data Integrator data model is as follows:
Set the CDC parameters in the data model
Add the datastores to the CDC
For consistent set journalizing, set the datastores order
Add subscribers
Start the journals
Setting up the CDC parameters is performed on a data model. This consists of selecting or changing the journalizing mode and journalizing Knowledge Module used for the model.
To set up the CDC parameters:
In the Models tree in the Designer Navigator, select the model that you want to journalize.
Double-click this model to edit it.
In the Journalizing tab, select the journalizing mode you want to use: Consistent Set or Simple.
Select the Journalizing Knowledge Module (JKM) you want to use for this model. Only Knowledge Modules suitable for the data model's technology and journalizing mode, and that have been previously imported into at least one of your projects will appear in the list.
Set the Options for this KM. See the Oracle Fusion Middleware Connectivity and Knowledge Modules Guide for Oracle Data Integrator for more information about this KM and its options.
From the File menu, select Save All.
Note:
If the model is already being journalized, it is recommended that you stop journalizing with the existing configuration before modifying the data model journalizing parameters.
Add or remove datastores for the CDC:
You must flag the datastores that you want to journalize within the journalized model. A change in the datastore flag is taken into account the next time the journals are (re)started. When flagging a model or a sub-model, all of the datastores contained in the model or sub-model are flagged.
To add or remove datastores for the CDC:
In the Models tree in the Designer Navigator, select the model, sub-model or datastore that you want to add to or remove from the CDC.
Right-click, then select Changed Data Capture > Add to CDC or Changed Data Capture > Remove from CDC. This adds to or removes from the CDC the selected datastore, or all datastores in the selected model or sub-model.
The datastores added to CDC should now have a marker icon representing a small clock. The icon should be yellow, indicating that the journal infrastructure is not yet in place.
Note:
It is possible to add datastores to the CDC after the journal creation phase. In this case, the journals should be re-started.
If a datastore with journals running is removed from the CDC in simple mode, the journals should be stopped for this individual datastore. If a datastore is removed from CDC in Consistent Set mode, the journals should be restarted for the model (Journalizing information is preserved for the other datastores).
Set the datastores order (consistent set journalizing only):
You only need to arrange the datastores in order when using consistent set journalizing. You should arrange the datastores in the consistent set in an order which preserves referential integrity when using their changed data. For example, if an ORDER table has references imported from an ORDER_LINE datastore (i.e. ORDER_LINE has a foreign key constraint that references ORDER), and both are added to the CDC, the ORDER datastore should come before ORDER_LINE. If the PRODUCT datastore has references imported from both ORDER and ORDER_LINE (i.e. both ORDER and ORDER_LINE have foreign key constraints to the PRODUCT table), its order should be lower still.
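An order that preserves referential integrity is a topological order of the foreign key graph. The sketch below derives one with Python's standard `graphlib` module for the ORDER, ORDER_LINE and PRODUCT example. It does not claim to reproduce the algorithm behind the Reorganize button, and the `references` mapping and the numbering of the resulting order values are assumptions made for the example.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical foreign keys: each datastore maps to the datastores it references.
references = {
    "ORDER_LINE": {"ORDER", "PRODUCT"},
    "ORDER": {"PRODUCT"},
    "PRODUCT": set(),
}

# static_order() yields every datastore after all the datastores it references.
order = list(TopologicalSorter(references).static_order())

# Assign increasing order values, lowest first (numbering scheme is illustrative).
order_values = {name: position for position, name in enumerate(order, start=1)}
```

PRODUCT receives the lowest value, followed by ORDER and then ORDER_LINE, which matches the ordering described above.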
To set the datastores order:
In the Models tree in the Designer Navigator, select the model journalized in consistent set mode.
Double-click this model to edit it.
Go to the Journalized Tables tab.
If the datastores are not currently in any particular order, click the Reorganize button. This feature suggests an order for the journalized datastores based on the foreign keys defined in the model. Review the order suggested and edit the datastores order if needed.
Select a datastore from the list, then use the Up and Down buttons to move it within the list. You can also directly edit the Order value for this datastore.
Repeat the previous step until the datastores are ordered correctly.
From the File menu, select Save All.
Note:
Changes to the order of datastores are taken into account the next time the journals are (re)started.
If existing scenarios consume changes from this CDC set, you should regenerate them to take into account the new organization of the CDC set.
Each subscriber consumes, in a separate thread, the changes that occur on individual datastores for Simple Journalizing or on a model for Consistent Set Journalizing. Adding or removing a subscriber registers it with, or unregisters it from, the CDC infrastructure, which traps changes for each registered subscriber.
To add subscribers:
In the Models tree in the Designer Navigator, select the journalized data model if using Consistent Set Journalizing or select a data model or an individual datastore if using Simple Journalizing.
Right-click, then select Changed Data Capture > Subscriber > Subscribe. A window appears which lets you select your subscribers.
Type a Subscriber name, then click the Add Subscriber button. Repeat the operation for each subscriber you want to add.
Note:
Subscriber names cannot contain single quote characters.
Click OK.
In the Execution window, select the execution parameters:
Select the Context into which the subscribers must be registered.
Select the Logical Agent that will run the journalizing tasks.
Click OK.
The Session Started Window appears.
Click OK.
You can review the journalizing tasks in the Operator Navigator.
Removing a subscriber is a similar process. Select the Changed Data Capture > Subscriber > Unsubscribe option instead.
You can also add subscribers after starting the journals. Subscribers added after journal startup will only retrieve changes captured since they were added to the subscribers list.
Starting the journals creates the CDC infrastructure if it does not exist yet. It also validates the addition, removal and order changes for journalized datastores.
Dropping the journals deletes the entire journalizing infrastructure.
Note:
Dropping the journals deletes all captured changes as well as the infrastructure. For simple journalizing, starting the journal also deletes the journal contents. Consistent Set JKMs support restarting the journals without losing any data.
To start or drop the journals:
In the Models tree in the Designer Navigator, select the journalized data model if using Consistent Set Journalizing or select a data model or an individual datastore if using Simple Journalizing.
Right-click, then select Changed Data Capture > Start Journal if you want to start the journals, or Changed Data Capture > Drop Journal if you want to stop them.
In the Execution window, select the execution parameters:
Select the Context into which the journals must be started or dropped.
Select the Logical Agent that will run the journalizing tasks.
Click OK.
The Session Started Window appears.
Click OK.
A session begins to start or drop the journals. You can review the journalizing tasks in the Operator Navigator.
The journalizing infrastructure is implemented by the journalizing KM at the physical level. Consequently, Add Subscribers and Start Journals operations should be performed in each context where journalizing is required for the data model. It is possible to automate these operations using Oracle Data Integrator packages. Automating these operations is recommended to deploy a journalized infrastructure across different contexts.
For example, a developer manually configures CDC in the Development context. When the development phase is complete, the developer provides a package that automates the CDC infrastructure. CDC is automatically deployed in the Test context by using this package. The same package is also used to deploy CDC in the Production context.
An overview of designing such a package follows. See Chapter 10, "Working with Packages" for more information on package creation.
To automate CDC configuration:
Create a new package.
Drag and drop from the Models accordion the model or datastore you want to journalize into the package Diagram tab. A new package step appears.
Double-click the step icon in the package diagram. The properties inspector for this step opens.
In the Type list, select Journalizing Model/Datastore.
Check the Start box to start the journals.
Check the Add Subscribers box, then enter the list of subscribers into the Subscribers group.
Enter the first subscriber in the subscriber field, and click the Add button to add it to the Subscribers list. Repeat this operation for all your subscribers.
From the File menu, select Save.
When this package is executed in a context, it starts the journals according to the model configuration and creates the specified subscribers in this context.
It is possible to split subscriber and journal management into different steps and packages. Deleting subscribers and stopping journals can be automated in the same manner.
When the journals are started, the journalizing infrastructure (if not installed yet) is deployed or updated in the following locations:
When the journalizing Knowledge Module creates triggers, they are installed on the tables in the Data Schema for the Oracle Data Integrator physical schema containing the journalized tables. Journalizing trigger names are prefixed with the prefix defined in the Journalizing Elements Prefixes for the physical schema. The default value for this prefix is T$. For details about database-specific capture processes see the Oracle Fusion Middleware Connectivity and Knowledge Modules Guide for Oracle Data Integrator.
A CDC common infrastructure for the data server is installed in the Work Schema for the Oracle Data Integrator physical schema that is flagged as Default for this data server. This common infrastructure contains information about subscribers, consistent sets, etc. for all the journalized schemas of this data server. This common infrastructure consists of tables whose names are prefixed with SNP_CDC_.
Journal tables and journalizing views are installed in the Work Schema for the Oracle Data Integrator physical schema containing the journalized tables. The journal table and journalizing view names are prefixed with the prefixes defined in the Journalizing Elements Prefixes for the physical schema. The default value is J$ for journal tables and JV$ for journalizing views.
All components (except the triggers) of the journalizing infrastructure (like all Data Integrator temporary objects, such as integration, error and loading tables) are installed in the Work Schema for the Oracle Data Integrator physical schemas of the data server. These work schemas should be kept separate from the schema containing the application data (Data Schema).
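To make these components concrete, the following sketch builds a toy trigger-based capture in an in-memory SQLite database. It is not the code generated by any journalizing Knowledge Module. The object names simply reuse the default prefixes (T$, J$, JV$) followed by the table name, the `TOY_SUBSCRIBERS` table stands in for the common subscriber infrastructure, and SQLite has no separate Work Schema and Data Schema, so everything lives in one database. All of these are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ORDERS (ORDER_ID INTEGER PRIMARY KEY, STATUS TEXT);
CREATE TABLE TOY_SUBSCRIBERS (NAME TEXT PRIMARY KEY);

-- Journal table: key of the changed row plus change metadata only.
CREATE TABLE "J$ORDERS" (
    JRN_SUBSCRIBER TEXT, JRN_FLAG TEXT, JRN_DATE TEXT, ORDER_ID INTEGER);

-- One trigger per DML event; each writes one journal row per subscriber.
CREATE TRIGGER "T$ORDERS_INS" AFTER INSERT ON ORDERS BEGIN
    INSERT INTO "J$ORDERS"
    SELECT NAME, 'I', datetime('now'), NEW.ORDER_ID FROM TOY_SUBSCRIBERS;
END;
CREATE TRIGGER "T$ORDERS_UPD" AFTER UPDATE ON ORDERS BEGIN
    INSERT INTO "J$ORDERS"
    SELECT NAME, 'I', datetime('now'), NEW.ORDER_ID FROM TOY_SUBSCRIBERS;
END;
CREATE TRIGGER "T$ORDERS_DEL" AFTER DELETE ON ORDERS BEGIN
    INSERT INTO "J$ORDERS"
    SELECT NAME, 'D', datetime('now'), OLD.ORDER_ID FROM TOY_SUBSCRIBERS;
END;

-- Journalizing view: journal entries joined to the current data.
CREATE VIEW "JV$ORDERS" AS
SELECT J.JRN_SUBSCRIBER, J.JRN_FLAG, J.JRN_DATE, J.ORDER_ID, O.STATUS
FROM "J$ORDERS" J LEFT JOIN ORDERS O ON O.ORDER_ID = J.ORDER_ID;
""")

conn.execute("INSERT INTO ORDERS VALUES (1, 'NEW')")   # no subscriber: not captured
conn.execute("INSERT INTO TOY_SUBSCRIBERS VALUES ('SUB_A')")
conn.execute("UPDATE ORDERS SET STATUS = 'SHIPPED' WHERE ORDER_ID = 1")
conn.execute("INSERT INTO ORDERS VALUES (2, 'NEW')")
conn.execute("DELETE FROM ORDERS WHERE ORDER_ID = 2")

rows = conn.execute(
    'SELECT JRN_FLAG, ORDER_ID, STATUS FROM "JV$ORDERS" '
    "WHERE JRN_SUBSCRIBER = 'SUB_A' ORDER BY ORDER_ID, JRN_FLAG"
).fetchall()
```

The view returns the update of order 1 with its current STATUS, and both the insert and the delete of order 2 with an empty STATUS, because the journal holds only the key and the row itself no longer exists.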
Important:
The journalizing triggers are the only components for journalizing that must be installed, when needed, in the same schema as the journalized data. Before creating triggers on tables belonging to a third-party software package, please check that this operation is not a violation of the software agreement or maintenance contract. Also ensure that installing and running triggers is technically feasible without interfering with the general behavior of the software package.
Datastores in models or interfaces have an icon marker indicating their journalizing status in Designer's current context:
OK - Journalizing is active for this datastore in the current context, and the infrastructure is operational for this datastore.
No Infrastructure - Journalizing is marked as active in the model, but no appropriate journalizing infrastructure was detected in the current context. Journals should be started. This state may occur if the journalizing mode implemented in the infrastructure does not match the one declared for the model.
Remnants - Journalizing is marked as inactive in the model, but remnants of the journalizing infrastructure such as the journalizing table have been detected for this datastore in the context. This state may occur if the journals were not stopped and the table has been removed from CDC.
Once journalizing is started and changes are tracked for subscribers, it is possible to use the changes captured. These can be viewed or used when the journalized datastore is used as a source of an interface.
To view the changed data:
In the Models tree in the Designer Navigator, select the journalized datastore.
Right-click and then select Changed Data Capture > Journal Data....
The changes captured for this datastore in the current context appear in a grid with three additional columns describing the change details:
JRN_FLAG: Flag indicating the type of change. It takes the value I for an inserted/updated record and D for a deleted record.
JRN_SUBSCRIBER: Name of the Subscriber.
JRN_DATE: Timestamp of the change.
Journalized data is mostly used within integration processes. Changed data can be used as the source of integration interfaces. The way it is used depends on the journalizing mode.
Using changed data from simple journalizing consists of designing interfaces using journalized datastores as sources. See Chapter 11, "Working with Integration Interfaces" for detailed instructions for creating interfaces.
Designing Interfaces with Simple Journalizing
When a journalized datastore is inserted into an interface diagram, a Journalized Data Only check box appears in this datastore's property panel.
When this box is checked:
The journalizing columns (JRN_FLAG, JRN_DATE and JRN_SUBSCRIBER) become available for the datastore.
A journalizing filter is also automatically generated on this datastore. This filter will reduce the amount of source data retrieved to the journalized data only. It is always executed on the source. You can customize this filter (for instance, to process changes in a time range, or only a specific type of change). A typical filter for retrieving all changes for a given subscriber is: JRN_SUBSCRIBER = '<subscriber_name>'.
In simple journalizing mode all the changes taken into account by the interface (after the journalizing filter is applied) are automatically considered consumed at the end of the interface and removed from the journal. They cannot be used by a subsequent interface.
When processing journalized data, the SYNC_JRN_DELETE option of the integration Knowledge Module should be set carefully. It invokes the deletion from the target datastore of the records marked as deleted (D) in the journals and that are not excluded by the journalizing filter. If this option is set to No, integration will only process inserts and updates.
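The interplay between the journalizing filter, the consumption of changes, and the SYNC_JRN_DELETE option can be sketched as follows. This is a simplified model and not the behavior of any particular integration Knowledge Module. The `run_interface` function, the dictionaries standing in for datastores, and the tuple layout of the journal are hypothetical.

```python
def run_interface(source, target, journal, subscriber, sync_jrn_delete=True):
    """Toy simple-journalizing run: apply one subscriber's changes to target.

    source, target: dict mapping primary key -> row value
    journal: list of (JRN_SUBSCRIBER, JRN_FLAG, key) tuples, mutated in place
    Returns the number of journal entries selected by the filter.
    """
    # Journalizing filter, equivalent to JRN_SUBSCRIBER = '<subscriber>'.
    selected = [entry for entry in journal if entry[0] == subscriber]
    for _, flag, key in selected:
        if flag == "I" and key in source:
            target[key] = source[key]      # insert or update
        elif flag == "D" and sync_jrn_delete:
            target.pop(key, None)          # propagate the deletion
    # Simple mode: the filtered changes are consumed at the end of the run.
    journal[:] = [entry for entry in journal if entry[0] != subscriber]
    return len(selected)


source = {1: "SHIPPED", 3: "NEW"}
target = {1: "NEW", 2: "NEW"}
journal = [("SUB_A", "I", 1), ("SUB_A", "D", 2),
           ("SUB_A", "I", 3), ("SUB_B", "I", 3)]
processed = run_interface(source, target, journal, "SUB_A")

# Same deletion with the option disabled: the target row is left in place.
target_no_sync = {2: "NEW"}
run_interface({}, target_no_sync, [("SUB_A", "D", 2)], "SUB_A",
              sync_jrn_delete=False)
```

After the first run, the entries for SUB_A are gone from the journal while the entry for SUB_B remains, so a subsequent interface for SUB_A would find nothing left to process.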
Note:
Only one datastore per dataset can have the Journalized Data Only option checked.
Using changed data in consistent set journalizing is similar to simple journalizing for interface design. It requires extra steps before and after processing the changed data in the interfaces in order to enforce changes consistently within the set.
These operations can be performed either manually from the context menu of the journalized model or automated with packages.
Operations Before Using the Changed Data
The following operations should be undertaken before using the changed data when using consistent set journalizing:
Extend Window: The Consistency Window is a range of available changes in all the tables of the consistency set for which inserts, updates and deletes are possible without violating referential integrity. The extend window operation (re)computes this window to take into account new changes captured since the latest Extend Window operation. This operation is implemented using a package step with the Journalizing Model Type. This operation can be scheduled separately from other journalizing operations.
Lock Subscribers: Although the extend window is applied to the entire consistency set, subscribers consume the changes separately. This operation performs a subscriber-specific "snapshot" of the changes in the consistency window. This snapshot includes all the changes within the consistency window that have not yet been consumed by the subscriber(s). This operation is implemented using a package step with the Journalizing Model Type. It should always be performed before the first interface using changes captured for the subscriber(s).
The changed data in consistent set journalizing are also processed using interfaces sequenced into packages.
Designing interfaces when using consistent set journalizing is similar to simple journalizing, except for the following differences:
The changes taken into account by the interface (that is, those filtered with JRN_FLAG, JRN_DATE and JRN_SUBSCRIBER) are not automatically purged at the end of the interface. They can be reused by subsequent interfaces. The unlock subscriber and purge journal operations described below are required, respectively, to commit consumption of these changes and to remove useless entries from the journal.
In consistent mode, the JRN_DATE column should not be used in the journalizing filter. Using this timestamp to filter the changes consumed does not entirely ensure consistency in these changes.
Operations after Using the Changed Data
After using the changed data, the following operations should be performed:
Unlock Subscribers: This operation commits the use of the changes that were locked during the Lock Subscribers operations for the subscribers. It should be processed only after all the changes for the subscribers have been processed. This operation is implemented using a package step with the Journalizing Model Type. It should always be performed after the last interface using changes captured for the subscribers. If the changes need to be processed again (for example, in case of an error), this operation should not be performed.
Purge Journal: After all subscribers have consumed the changes they have subscribed to, entries still remain in the journalizing tables and should be deleted. This is performed by the Purge Journal operation. This operation is implemented using a package step with the Journalizing Model Type. This operation can be scheduled separately from the other journalizing operations.
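The four operations (Extend Window, Lock Subscribers, Unlock Subscribers, Purge Journal) can be pictured with a small in-memory model in which every captured change gets an increasing sequence number. This is a simplification for illustration and does not describe the tables or algorithms used by Consistent Set JKMs. The `ToyConsistentSet` class and all of its attributes are hypothetical.

```python
class ToyConsistentSet:
    """Simplified model of a consistency window shared by several subscribers."""

    def __init__(self, subscribers):
        self.changes = []                             # (seq, table, flag, key)
        self.last_seq = 0
        self.window_end = 0                           # last seq inside the window
        self.consumed = {s: 0 for s in subscribers}   # committed position
        self.locked = dict(self.consumed)             # snapshot position

    def capture(self, table, flag, key):
        self.last_seq += 1
        self.changes.append((self.last_seq, table, flag, key))

    def extend_window(self):
        # Take into account everything captured since the last extension.
        self.window_end = self.last_seq

    def lock(self, subscriber):
        # Snapshot of the window for this subscriber.
        self.locked[subscriber] = self.window_end

    def changes_for(self, subscriber):
        low, high = self.consumed[subscriber], self.locked[subscriber]
        return [c for c in self.changes if low < c[0] <= high]

    def unlock(self, subscriber):
        # Commit consumption of the locked changes.
        self.consumed[subscriber] = self.locked[subscriber]

    def purge(self):
        # Remove only what every subscriber has already consumed.
        floor = min(self.consumed.values())
        self.changes = [c for c in self.changes if c[0] > floor]


cdc = ToyConsistentSet(["SUB_A", "SUB_B"])
cdc.capture("ORDER", "I", 1)
cdc.capture("ORDER_LINE", "I", 10)
cdc.extend_window()
cdc.capture("ORDER", "I", 2)          # captured after the window was computed

cdc.lock("SUB_A")
first = cdc.changes_for("SUB_A")      # the two changes inside the window
again = cdc.changes_for("SUB_A")      # still available to a second interface
cdc.unlock("SUB_A")
cdc.purge()                           # SUB_B has consumed nothing yet
kept = len(cdc.changes)
```

If an interface fails between `lock` and `unlock`, skipping `unlock` leaves the consumed position untouched, so the same changes are returned on the next attempt.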
Note:
It is possible to perform an Extend Window or Purge Journal on a datastore. These operations process changes for tables that are in the same consistency set at different frequencies. These options should be used carefully, as consistency for the changes may no longer be maintained at the consistency set level.
Automate Consistent Set CDC Operations
To automate the consistent set CDC usage, you can use a package performing these operations.
Create a new package.
Drag and drop from the Models tree the journalized model into the package Diagram tab. A new package step appears.
Double-click the step icon in the package diagram. The properties inspector for this step opens.
In the Type list, select Journalizing Model/Datastore.
Check the consistent set operations you want to perform.
If you checked the Lock Subscriber or Unlock Subscriber operations, enter the first subscriber in the subscriber field, and click the Add button to add it to the Subscribers list. Repeat this operation for all the subscribers you want to lock or unlock.
From the File menu, select Save All.
Note:
Only one datastore per dataset can have the Journalized Data Only option checked.
Oracle Data Integrator provides a set of tools that can be used in journalizing to refresh information on the captured changes or trigger other processes:
OdiWaitForData waits for a number of rows in a table or a set of tables.
OdiWaitForLogData waits for a certain number of modifications to occur on a journalized table or a list of journalized tables. This tool calls OdiRefreshJournalCount to perform the count of new changes captured.
OdiWaitForTable waits for a table to be created and populated with a pre-determined number of rows.
OdiRetrieveJournalData retrieves the journalized events for a given table list or CDC set for a specified journalizing subscriber. Calling this tool is required if using Database-Specific Processes to load journalizing tables. This tool needs to be used with specific Knowledge Modules. See the Knowledge Module description for more information.
OdiRefreshJournalCount refreshes the number of rows to consume for a given table list or CDC set for a specified journalizing subscriber.
See Appendix A, "Oracle Data Integrator Tools Reference" for more information on these functions.
A number of templates may be used when designing packages to use journalized data. Below are some typical templates. See Chapter 10, "Working with Packages" for more information on package creation.
Template 1: One Simple Package (Consistent Set)
Step 1: Extend Window + Lock Subscribers
Step 2 to n-1: Interfaces using the journalized data
Step n: Unlock Subscribers + Purge Journal
This package is scheduled to process all changes every minute. This template is relevant if changes are made regularly in the journalized tables.
Template 2: One Simple Package (Simple Journalizing)
Step 1 to n: Interfaces using the journalized data
This package is scheduled to process all changes every minute. This template is relevant if changes are made regularly in the journalized tables.
Template 3: Using OdiWaitForLogData (Consistent Set or Simple)
Step 1: OdiWaitForLogData. If no new log data is detected after a specified interval, end the package.
Step 2: Execute a scenario equivalent to the template 1 or 2, using OdiStartScen
This package is scheduled regularly. Changed data will only be processed if new changes have been detected. This avoids useless processing if changes occur sporadically to the journalized tables (i.e. to avoid running interfaces that would process no data).
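The control flow of this template (wait for changes, then process them only if some arrived) can be sketched in plain Python. This is not the implementation of OdiWaitForLogData or OdiStartScen. The function names, parameters, and the injected `count_changes` and `process_changes` callables are hypothetical stand-ins.

```python
import time


def wait_for_log_data(count_changes, min_rows=1, timeout_s=5.0, poll_s=0.5,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll count_changes() until it reports min_rows or the timeout elapses.

    Returns True if enough changes were detected, False on timeout.
    """
    deadline = clock() + timeout_s
    while True:
        if count_changes() >= min_rows:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_s)


def run_package(count_changes, process_changes, **wait_options):
    """Step 1: wait for changes. Step 2: process them only if any arrived."""
    if not wait_for_log_data(count_changes, **wait_options):
        return "no changes"
    process_changes()
    return "processed"


# Fake change counter: two empty polls, then three pending changes.
counts = iter([0, 0, 3])
calls = []
result = run_package(lambda: next(counts), lambda: calls.append("run"),
                     min_rows=2, timeout_s=5.0, poll_s=0)

# With a zero timeout and no changes, the processing step is skipped.
idle_result = run_package(lambda: 0, lambda: calls.append("run"),
                          timeout_s=0, poll_s=0)
```

Keeping the wait step separate from the processing step means the more expensive interfaces run only when there is something to consume.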
Template 4: Separate Processes (Consistent Set)
This template dissociates the consistency window, the purge, and the changes consumption (for two different subscribers) in different packages.
Package 1: Extend Window
Step 1: OdiWaitForLogData. If no new log data is detected after a specified interval, end the package.
Step 2: Extend Window.
This package is scheduled every minute. Extend Window may be resource consuming. It is better to have this operation triggered only when new data appears.
Package 2: Purge Journal (at the end of week)
Step 1: Purge Journal
This package is scheduled once every Friday. We will keep track of the journals for the entire week.
Package 3: Process the Changes for Subscriber A
Step 1: Lock Subscriber A
Step 2 to n-1: Interfaces using the journalized data for subscriber A
Step n: Unlock Subscriber A
This package is scheduled every minute. Such a package is used for instance to generate events in a MOM.
Package 4: Process the Changes for Subscriber B
Step 1: Lock Subscriber B
Step 2 to n-1: Interfaces using the journalized data for subscriber B
Step n: Unlock Subscriber B
This package is scheduled every day. Such a package is used for instance to load a data warehouse during the night with the changed data.