18 Scalable Applications
You can build scalability into your application design with partitioning and parallel processing, and by taking high availability options into consideration. Oracle Stream Analytics enables you to use default or custom partitioning and parallel processing settings on channels and the upstream adapter. You can also partition an incoming JMS event stream and configure the JMS event stream group pattern matching.
18.1 Default Channel Scalability Settings
You can configure a channel to use the default event property-based event partitioner. With this default configuration, every time an incoming event arrives, the channel selects a listener and dispatches the event to that listener instead of broadcasting every event to every listener.
Note:
Batching is not supported when you configure a channel with an event partitioner.
Figure 18-1 shows an EPN that uses an event partitioner property to partition a channel. In this example, the inbound adapter sends events of type PriceEvent, which has two properties: stock symbol and stock price. The example partitions the channel on the symbol property and shows you how to add multithreading to either the channel or the upstream adapter.
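The dispatch behavior described above, where each event goes to one listener rather than to all of them, can be sketched in plain Java. This is only an illustration: the PropertyPartitionSketch class, its nested PriceEvent class, and the hash-based selectListener method are hypothetical, and the algorithm that the default partitioner actually uses is not stated in this chapter.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: dispatches each event to exactly one listener, chosen from a property value. */
final class PropertyPartitionSketch {

    /** Hypothetical stand-in for the PriceEvent type described in the text. */
    static final class PriceEvent {
        final String symbol;
        final double price;

        PriceEvent(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    /** Maps a property value to a listener index in the range [0, listenerCount). */
    static int selectListener(String propertyValue, int listenerCount) {
        // Assumption: hash-based selection. floorMod keeps the index non-negative
        // even when hashCode() is negative.
        return Math.floorMod(propertyValue.hashCode(), listenerCount);
    }

    public static void main(String[] args) {
        int listenerCount = 3;
        List<List<PriceEvent>> listeners = new ArrayList<>();
        for (int i = 0; i < listenerCount; i++) {
            listeners.add(new ArrayList<>());
        }

        PriceEvent[] events = {
            new PriceEvent("ORCL", 31.5), new PriceEvent("IBM", 140.2),
            new PriceEvent("ORCL", 31.7), new PriceEvent("MSFT", 55.0)
        };

        // Each event is delivered to one listener only; no event is broadcast.
        for (PriceEvent event : events) {
            listeners.get(selectListener(event.symbol, listenerCount)).add(event);
        }

        for (int i = 0; i < listenerCount; i++) {
            System.out.println("listener " + i + " received " + listeners.get(i).size() + " event(s)");
        }
    }
}
```

Because the listener index depends only on the symbol value, all events for one symbol reach the same listener, which is what lets a downstream processor keep per-symbol state.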
18.1.2 Configure Parallel Processing on the Channel
If you want the channel to allocate threads, set the max-threads property to the number of listeners in the EPN.
If you want to provide increased concurrency downstream from the channel, you can associate a thread pool with the channel by setting the max-threads property on the channel. The best value for the maximum number of threads depends on many factors, including the details of the Oracle CQL queries in downstream processors (for example, whether the queries allow parallel execution) and the behavior observed while running the application (for example, whether all the CPU cores are utilized). As a starting point in tuning the maximum number of threads, it is reasonable to set it equal to the number of listeners on the channel.
In this example, there are 3 listeners.
<wlevs:channel id="EventPartitionerChannel" event-type="PriceEvent" max-threads="3" >
  <wlevs:instance-property name="eventPartitioner" value="true" />
  <wlevs:listener ref="processor1" />
  <wlevs:listener ref="processor2" />
  <wlevs:listener ref="processor3" />
  <wlevs:source ref="inbound" />
</wlevs:channel>
18.1.4 Define a Local Partition Channel
You must configure the Oracle CQL Processor to support local partitioning.
Use the sample code given below to define local partitioning:
Example 18-1 Assembly File
<wlevs:channel id="LocalPartitionChannel" event-type="StockEvent" is-local-partitioner="true" max-threads="3">
  <wlevs:instance-property name="partitionByEventProperty" value="symbol" />
</wlevs:channel>
Important Channel Properties
The local partition channel has the following important properties:
- is-local-partitioner: Defines the channel to be a local partitioning channel.
- max-threads: Specifies the degree of parallelism.
- max-size: Determines the maximum number of buffered events per partition.
- Partitioning attribute: Specifies the attribute of the stream that is used to partition the stream.
Example 18-2 Configuration
<processor>
  <name>StockAggregateProcessor</name>
  <rules>
    <query id="helloworldRule">
      <![CDATA[
      select count(*) as symbolCount, symbol from LocalPartitionChannel group by symbol]]>
    </query>
  </rules>
</processor>
18.2 Partition an Incoming JMS Event Stream
You can add the ActiveActiveGroupBean class to the assembly file to partition an incoming JMS event stream by a selector in a multiserver domain.
18.2.1 Configure Partitioning without High Availability
- Create a multiserver domain.
  In this example, the deployment group name is MyDeploymentGroup.
- Configure the Oracle Stream Analytics server configuration file on each Oracle Stream Analytics server to add the appropriate ActiveActiveGroupBean notification group to the groups child element of the cluster element.
  The Oracle Stream Analytics server configuration file is located in /Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.
  Table 18-1 shows cluster elements for Oracle Stream Analytics servers ocep-server-1, ocep-server-2, ocep-server-3, and ocep-server-4. The deployment group is MyDeploymentGroup and the notification groups are defined using default ActiveActiveGroupBean notification group naming.
  Optionally, you can specify your own group naming convention as Notification Group Naming Conventions describes.
Table 18-1 Server Configuration File Groups Element Configuration
Partition | cluster Element
ocep-server-1 | <cluster> <server-name>ocep-server-1</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups> </cluster>
ocep-server-2 | <cluster> <server-name>ocep-server-2</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups> </cluster>
ocep-server-3 | <cluster> <server-name>ocep-server-3</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group3</groups> </cluster>
ocep-server-4 | <cluster> <server-name>ocep-server-4</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group4</groups> </cluster>
- Create an Oracle Stream Analytics application.
- Add an ActiveActiveGroupBean element to the assembly file as follows.
  <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
- Define a parameterized message-selector in the jms-adapter element for the JMS inbound adapters.
  - Edit the component configuration file to add group-binding child elements to the jms-adapter element for the JMS inbound adapters.
  - Add one group-binding element for each possible JMS message-selector value as shown.
  <jms-adapter>
    <name>JMSInboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic1</destination-jndi-name>
    <user>weblogic</user>
    <password>weblogic1</password>
    <work-manager>JettyWorkManager</work-manager>
    <concurrent-consumers>1</concurrent-consumers>
    <session-transacted>true</session-transacted>
    <message-selector>${CONDITION}</message-selector>
    <bindings>
      <group-binding group-id="ActiveActiveGroupBean_group1">
        <param id="CONDITION">acctid > 400</param>
      </group-binding>
      <group-binding group-id="ActiveActiveGroupBean_group2">
        <param id="CONDITION">acctid BETWEEN 301 AND 400</param>
      </group-binding>
      <group-binding group-id="ActiveActiveGroupBean_group3">
        <param id="CONDITION">acctid BETWEEN 201 AND 300</param>
      </group-binding>
      <group-binding group-id="ActiveActiveGroupBean_group4">
        <param id="CONDITION">acctid &lt;= 200</param>
      </group-binding>
    </bindings>
  </jms-adapter>
  In this configuration, when the application is deployed to an Oracle Stream Analytics server whose cluster element has a groups child element that contains ActiveActiveGroupBean_group1, the CONDITION parameter is defined as acctid > 400 and the application processes events whose acctid property is greater than 400.
  Note:
  Each in-bound JMS adapter must listen to a different topic. For more information, see Adapters.
- Deploy your application to the deployment group of your multiserver domain.
  At runtime, each Oracle Stream Analytics server configures its instance of the application with the message-selector that corresponds to its ActiveActiveGroupBean notification group. This partitions the JMS topic so that each instance of the application processes a subset of the total number of messages in parallel.
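The way a server's notification group determines its message selector can be sketched as a simple lookup and substitution. The sketch below is a plain-Java illustration under stated assumptions: SelectorBindingSketch and resolveSelector are hypothetical names, not part of the product, and the real resolution logic inside the server may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: resolves a parameterized message selector from a server's groups. */
final class SelectorBindingSketch {

    /**
     * Returns the selector template with ${CONDITION} replaced by the binding of the
     * first server group that has one, or null if no group has a binding.
     */
    static String resolveSelector(String template, Map<String, String> bindings, List<String> serverGroups) {
        for (String group : serverGroups) {
            String condition = bindings.get(group);
            if (condition != null) {
                return template.replace("${CONDITION}", condition);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The group-binding values from the component configuration example.
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("ActiveActiveGroupBean_group1", "acctid > 400");
        bindings.put("ActiveActiveGroupBean_group2", "acctid BETWEEN 301 AND 400");
        bindings.put("ActiveActiveGroupBean_group3", "acctid BETWEEN 201 AND 300");
        bindings.put("ActiveActiveGroupBean_group4", "acctid <= 200");

        // The groups element of ocep-server-1 in Table 18-1.
        List<String> serverGroups = Arrays.asList("MyDeploymentGroup", "ActiveActiveGroupBean_group1");

        System.out.println(resolveSelector("${CONDITION}", bindings, serverGroups));
    }
}
```

Note that the four conditions in the example cover every acctid value exactly once, so each message is processed by one partition only.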
18.2.2 Configure Partitioning with High Availability
This procedure uses the example application from Configure Precise Recovery With JMS. Figure 18-2 shows the EPN diagram, and Example 18-3 and Example 18-4 show the corresponding assembly and configuration files.
The procedure creates the Oracle Stream Analytics high availability configuration shown in Figure 18-3.
Figure 18-3 ActiveActiveGroupBean With High Availability

Configure Scalability in a JMS Application with High Availability
- Create a multiserver domain.
  In this example, the deployment group is named MyDeploymentGroup.
- Configure the Oracle Stream Analytics server configuration file on each Oracle Stream Analytics server to add the appropriate ActiveActiveGroupBean notification group to the groups child element of the cluster element.
  The Oracle Stream Analytics server configuration file is located in /Oracle/Middleware/my_oep/user_projects/domains/<domain_name>/<server_name>/config.
  Table 18-2 shows cluster elements for Oracle Stream Analytics servers ocep-server-1, ocep-server-2, ocep-server-3, and ocep-server-4. The deployment group is MyDeploymentGroup and notification groups are defined using default ActiveActiveGroupBean notification group names.
  Note that ocep-server-1 and ocep-server-2 use the same notification group name (ActiveActiveGroupBean_group1) and ocep-server-3 and ocep-server-4 use the same notification group name (ActiveActiveGroupBean_group2).
Table 18-2 Server Configuration File Groups Element Configuration
Partition | cluster Element
ocep-server-1 | <cluster> <server-name>ocep-server-1</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups> </cluster>
ocep-server-2 | <cluster> <server-name>ocep-server-2</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group1</groups> </cluster>
ocep-server-3 | <cluster> <server-name>ocep-server-3</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups> </cluster>
ocep-server-4 | <cluster> <server-name>ocep-server-4</server-name> ... <enabled>coherence</enabled> ... <groups>MyDeploymentGroup, ActiveActiveGroupBean_group2</groups> </cluster>
- Create an Oracle Stream Analytics high availability application.
  For more information, see High Availability Applications.
- Add an ActiveActiveGroupBean element to the assembly file as shown.
  <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean"> </bean>
- Edit the component configuration file to configure a jms-adapter element for the inbound JMS adapters as shown.
  You must set each inbound JMS adapter to listen to a different topic and set session-transacted to true.
  <?xml version="1.0" encoding="UTF-8"?>
  <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster">
    ...
    <jms-adapter>
      <name>JMSInboundAdapter</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic1</destination-jndi-name>
      <session-transacted>true</session-transacted>
      ...
    </jms-adapter>
    <jms-adapter>
      <name>JMSInboundAdapter2</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic2</destination-jndi-name>
      <session-transacted>true</session-transacted>
      ...
    </jms-adapter>
  </wlevs:config>
- Define a parameterized message-selector in the jms-adapter element for each JMS inbound adapter.
  - Edit the component configuration file to add group-binding child elements to the jms-adapter element for the JMS inbound adapters.
  - Add one group-binding element for each possible JMS message-selector value as shown.
  <jms-adapter>
    <name>JMSInboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic1</destination-jndi-name>
    <session-transacted>true</session-transacted>
    <message-selector>${CONDITION}</message-selector>
    <bindings>
      <group-binding group-id="ActiveActiveGroupBean_group1">
        <param id="CONDITION">acctid &lt;= 1000</param>
      </group-binding>
      <group-binding group-id="ActiveActiveGroupBean_group2">
        <param id="CONDITION">acctid > 1000</param>
      </group-binding>
    </bindings>
  </jms-adapter>
  In this configuration, when the application is deployed to an Oracle Stream Analytics server whose cluster element has a groups child element that contains ActiveActiveGroupBean_group1, the CONDITION parameter is defined as acctid <= 1000 and the application processes events whose acctid property is less than or equal to 1000. Similarly, when the application is deployed to an Oracle Stream Analytics server whose cluster element has a groups child element that contains ActiveActiveGroupBean_group2, the CONDITION parameter is defined as acctid > 1000 and the application processes events whose acctid property is greater than 1000.
- Edit the component configuration file to configure a jms-adapter element for the outbound JMS adapter as shown.
  Configure the out-bound JMS adapter with the same topic as the correlating in-bound adapter (in this example, JMSInboundAdapter2: ./Topic2), and set session-transacted to true.
  <?xml version="1.0" encoding="UTF-8"?>
  <wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster">
    ...
    <jms-adapter>
      <name>JMSInboundAdapter</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic1</destination-jndi-name>
      <session-transacted>true</session-transacted>
      ...
    </jms-adapter>
    <jms-adapter>
      <name>JMSInboundAdapter2</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic2</destination-jndi-name>
      <session-transacted>true</session-transacted>
      ...
    </jms-adapter>
    <jms-adapter>
      <name>JMSOutboundAdapter</name>
      <event-type>StockTick</event-type>
      <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
      <destination-jndi-name>./Topic2</destination-jndi-name>
      <session-transacted>true</session-transacted>
      ...
    </jms-adapter>
  </wlevs:config>
- Deploy your application to the deployment group of your multiserver domain.
  At runtime, each Oracle Stream Analytics server configures its instance of the application with the message-selector that corresponds to its ActiveActiveGroupBean notification group. This partitions the JMS topic so that each instance of the application processes a subset of the total number of messages in parallel.
  If the active Oracle Stream Analytics server in an ActiveActiveGroupBean group goes down, the Oracle Stream Analytics server performs an Oracle Stream Analytics high availability failover to the standby Oracle Stream Analytics server in that ActiveActiveGroupBean group.
Example 18-3 Precise Recovery With JMS EPN Assembly File
<?xml version="1.0" encoding="UTF-8"?>
<beans ... >
  <wlevs:event-type-repository>
    <wlevs:event-type type-name="StockTick">
      <wlevs:properties>
        <wlevs:property name="lastPrice" type="double" />
        <wlevs:property name="symbol" type="char" />
      </wlevs:properties>
    </wlevs:event-type>
  </wlevs:event-type-repository>
  <wlevs:adapter id="JMSInboundAdapter" provider="jms-inbound">
    <wlevs:listener ref="myHaInputAdapter"/>
  </wlevs:adapter>
  <wlevs:adapter id="myHaInputAdapter" provider="ha-inbound" >
    <wlevs:instance-property name="keyProperties" value="sequenceNo"/>
    <wlevs:instance-property name="timeProperty" value="inboundTime"/>
  </wlevs:adapter>
  <wlevs:channel id="channel1" event-type="StockTick">
    <wlevs:listener ref="processor1" />
    <wlevs:source ref="myHaInputAdapter"/>
    <wlevs:application-timestamped>
      <wlevs:expression>inboundTime</wlevs:expression>
    </wlevs:application-timestamped>
  </wlevs:channel>
  <wlevs:processor id="processor1">
    <wlevs:listener ref="channel2" />
  </wlevs:processor>
  <wlevs:channel id="channel2" event-type="StockTick">
    <wlevs:listener ref="myHaCorrelatingAdapter" />
  </wlevs:channel>
  <wlevs:adapter id="myHaCorrelatingAdapter" provider="ha-correlating" >
    <wlevs:instance-property name="correlatedSource" ref="clusterCorrelatingOutstream"/>
    <wlevs:instance-property name="failOverDelay" value="2000"/>
    <wlevs:listener ref="JMSOutboundAdapter"/>
  </wlevs:adapter>
  <wlevs:adapter id="JMSOutboundAdapter" provider="jms-outbound">
  </wlevs:adapter>
  <wlevs:adapter id="JMSInboundAdapter2" provider="jms-inbound">
  </wlevs:adapter>
  <wlevs:channel id="clusterCorrelatingOutstream" event-type="StockTick" advertise="true">
    <wlevs:source ref="JMSInboundAdapter2"/>
  </wlevs:channel>
</beans>
Example 18-4 Precise Recovery With JMS Component Configuration Assembly File
<?xml version="1.0" encoding="UTF-8"?>
<wlevs:config xmlns:wlevs="http://www.bea.com/ns/wlevs/config/application" xmlns:ha="http://www.oracle.com/ns/cep/config/cluster">
  <processor>
    <name>processor1</name>
    <rules>
      <query id="helloworldRule">
        <![CDATA[ select * from channel1 [Now] ]]>
      </query>
    </rules>
  </processor>
  <jms-adapter>
    <name>JMSInboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic1</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSInboundAdapter2</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
  <jms-adapter>
    <name>JMSOutboundAdapter</name>
    <event-type>StockTick</event-type>
    <jndi-provider-url>t3://ppurich-pc:7001</jndi-provider-url>
    <destination-jndi-name>./Topic2</destination-jndi-name>
    <session-transacted>true</session-transacted>
    ...
  </jms-adapter>
</wlevs:config>
18.3 Notification Group Naming Conventions
By default, the ActiveActiveGroupBean class creates notification groups with the following name, where X is a string.
ActiveActiveGroupBean_X
At runtime, ActiveActiveGroupBean scans the existing groups defined on the Oracle Stream Analytics server and applies the following default pattern match. When ActiveActiveGroupBean finds a match, it creates a notification group with that name.
ActiveActiveGroupBean_\\w+
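The effect of the default pattern can be checked with the standard java.util.regex classes. The sketch below is illustrative only: GroupPatternSketch and matchingGroups are hypothetical names, and it assumes that a group name must match the pattern in full, which this chapter does not state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustrative only: selects the server groups whose names match a notification group pattern. */
final class GroupPatternSketch {

    // The default pattern from the text. In a Java string literal the backslash is doubled;
    // the regular expression itself is ActiveActiveGroupBean_\w+
    static final Pattern DEFAULT_PATTERN = Pattern.compile("ActiveActiveGroupBean_\\w+");

    /** Returns the group names that match the pattern in full (an assumption of this sketch). */
    static List<String> matchingGroups(List<String> serverGroups, Pattern pattern) {
        List<String> matches = new ArrayList<>();
        for (String group : serverGroups) {
            String name = group.trim();
            if (pattern.matcher(name).matches()) {
                matches.add(name);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Group names as they appear in a cluster element's groups child element.
        List<String> serverGroups = Arrays.asList("MyDeploymentGroup", "ActiveActiveGroupBean_group1");
        System.out.println(matchingGroups(serverGroups, DEFAULT_PATTERN));
    }
}
```

Because \w+ requires at least one word character after the underscore, a group named only ActiveActiveGroupBean_ would not match under this assumption.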
Optionally, you can define your own group pattern to specify a different notification group naming pattern.
- Configure the assembly file to add a groupPattern attribute to your ActiveActiveGroupBean element as shown.
  <bean id="clusterAdapter" class="com.oracle.cep.cluster.hagroups.ActiveActiveGroupBean">
    <property name="groupPattern" value="MyNotificationGroupPattern*"/>
  </bean>
- Specify a value for the groupPattern attribute that matches the cluster group naming convention you want to use for notification groups.
18.4 Custom Channel Event Partitioner
Most channels use the default behavior, in which the channel sends events to all listeners when no partitioner is specified and the partitionByEventProperty element is not present. The partitionByEventProperty element provides a level of customization by partitioning on the specified event property with a default partitioning algorithm.
This section explains how you can further customize how events are dispatched to the channel listeners by programmatically configuring a custom partitioner that provides finer control than the default partitioning algorithm. For example, you can create an event partitioner that is based on a property range.
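As a rough illustration of partitioning on a property range, the following plain-Java sketch maps a price to a partition index by comparing it against ascending boundaries. RangePartitionerSketch and its methods are hypothetical. The class deliberately does not implement the product's EventPartitioner interface, so consult the API reference for the real method signatures before adapting it.

```java
/** Illustrative only: assigns a numeric property value to a partition by range. */
final class RangePartitionerSketch {

    // Ascending, exclusive upper bounds. Values at or above the last bound
    // fall into one extra, final partition.
    private final double[] upperBounds;

    RangePartitionerSketch(double... upperBounds) {
        this.upperBounds = upperBounds.clone();
    }

    /** Number of partitions: one per bound plus the final open-ended partition. */
    int partitionCount() {
        return upperBounds.length + 1;
    }

    /** Returns the index of the first range whose upper bound is greater than the price. */
    int partition(double price) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (price < upperBounds[i]) {
                return i;
            }
        }
        return upperBounds.length;
    }

    public static void main(String[] args) {
        // Three partitions: price < 10, 10 <= price < 100, price >= 100.
        RangePartitionerSketch partitioner = new RangePartitionerSketch(10.0, 100.0);
        double[] prices = {5.0, 31.5, 140.2};
        for (double price : prices) {
            System.out.println("price " + price + " goes to partition " + partitioner.partition(price));
        }
    }
}
```

In a real channel, the number of partitions would need to agree with the number of listeners, and the ranges should be chosen so that the expected load is spread evenly across them.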
18.4.1 EventPartitioner Interface
Use the com.bea.wlevs.channel.EventPartitioner interface to partition events across a channel to customize how events are dispatched to the channel listener.
Note:
When you implement custom partitioning and parallel processing, make sure to add code to preserve event order and to carefully manage multithreading.
Figure 18-4 shows an EPN that uses an event partitioner to partition a channel. In this example, the inbound adapter sends events of type PriceEvent, which has two properties: stock symbol and stock price. The example partitions the channel on the symbol property and shows you how to add multithreading to the channel or to the upstream adapter.