An Oracle CQL Processor processes incoming events from various input channels and other data sources. You use Oracle CQL to write the business logic in the form of continuous queries that process incoming events. Oracle CQL filters, aggregates, correlates, and processes events in real time.
Note:
You can create a Java class with methods that enhance the functionality available in Oracle CQL. Within Oracle CQL you reference the compiled class by name and call its methods from a SELECT
statement. See SELECT Clause in Oracle CQL Language Reference.
This chapter presents an overview of Oracle CQL with examples to help you understand the basic concepts. See Cached Event Data for information about performing CQL queries on caches. See also Oracle CQL Queries, Views, and Joins in Oracle CQL Language Reference.
Oracle JDeveloper provides Oracle CQL Pattern components that provide templates for creating common Oracle CQL queries. See Using Oracle CQL Patterns in Getting Started with Event Processing for Oracle Stream Analytics.
This chapter describes some of the assembly and configuration file settings for the Oracle CQL processor. For a complete reference, see Processor in Schema Reference for Oracle Stream Analytics.
Oracle CQL queries can define one or more statements to process incoming event data from one or more input sources and send the outgoing event data to one or more output channels.
Each channel (input or output) and data source has an associated event type.
For example, one input can be a channel and another input can be a Coherence cache. The channel and Coherence cache have different event types because the Coherence cache provides additional information to the Oracle CQL processor query that is related to, but not the same as, the event data coming from the input channel.
If you configure an Oracle CQL processor with more than one query, by default, all queries output their results to all of the output channels. You can control which queries output their results to which output channels by putting a selector
element on the downstream channel or channels. Use the selector
element to specify a space delimited list of one or more query names that can output their results to that channel. The Oracle CQL query assigned to the output channel has the correct attributes to match the event type defined on the output channel. For more information, see Control Which Queries Output to a Downstream Channel.
When you add an Oracle CQL processor to your EPN, the assembly file shows the following entry.
<wlevs:processor id="processor"/>
When you add an Oracle CQL Pattern such as the Averaging Rule to the EPN in Oracle JDeveloper, the assembly file shows the following entries.
<wlevs:processor id="processor"/>
<wlevs:processor id="averaging-rule"/>
Configuration File
When you add the Oracle CQL processor to your EPN, the configuration file shows the following entry. By default, you get a template for rules that contains a template for one query.
<processor>
  <name>processor</name>
  <rules>
    <query id="ExampleQuery"><![CDATA[
      select * from MyChannel [now]
    ]]></query>
  </rules>
</processor>
The rules
element groups the Oracle CQL statements that this processor executes.
The query
element contains Oracle CQL select statements. The query
element id
attribute defines the query name.
The XML CDATA
type indicates where to put the Oracle CQL rule.
The select
statement is the actual query. The template provides the [now]
operator so that you can perform a now
operation as described in NOW and Last Event Windows.
The following sections show how to perform basic Oracle CQL processor queries on stock trade events.
Objective
The objective for this section is to understand how to use windows, slides, and views in Oracle CQL queries.
Windows convert event streams to time-based event relations to make it possible to perform Oracle CQL operations on the events. See Time-Based Relations (Windows).
Slides enable you to batch events to control the rate at which the CQL processor outputs events. See Processor Output Control (Slides).
Views enable you to create an Oracle CQL statement that can be reused by other Oracle CQL queries. See Views.
Event Type Definition
The stock trade events used in the examples for this section are type StockTradeEventType
with the following field and type definitions:
tickerSymbol: String
price: Double
dailyHigh: Double
dailyLow: Double
closingValue: Double
A stream channel inserts events into a collection and sends the stream to the next EPN stage. Events in a stream flow continuously, can never be deleted from the stream, and have no end. You can perform queries on the continuous stream of events flowing into your application.
A query on the input stream channel, StockTradeIStreamChannel
, to retrieve all stock trade events with the Oracle ticker symbol follows.
SELECT tickerSymbol FROM StockTradeIStreamChannel WHERE tickerSymbol = 'ORCL'
The following configuration file entry shows the query. ISTREAM
is a relation to stream operator described in Relation to Stream Operators.
<processor>
  <rules>
    <query id="rule1"><![CDATA[
      ISTREAM (SELECT tickerSymbol FROM StockTradeIStreamChannel WHERE tickerSymbol = 'ORCL')
    ]]></query>
  </rules>
</processor>
A relation channel inserts events into a collection and sends the relation to the next EPN stage. A relation is a window of time on the stream that has a beginning and an end. Events in a relation can be inserted into, deleted from, and updated in the relation. For insert, delete, and update operations, events in a relation must be referenced to a particular point in time to ensure the operation takes place on the correct event. All operations on a relation are time based.
Most applications do not use relation channels. You can put a window of time on events coming from a stream channel to create a relation for time-based processing operations. To find the average price for a particular stock, you must determine a time frame (window) in which to calculate the average. When you define a window on a stream, you have a collection of data that is not flowing, and unlike a stream, has a beginning and an end. The window is an in-memory relation on which you can apply a function such as AVG
and also perform insert, update, and delete operations.
Operators that put a window of time on a stream are called stream to relation operators. The output of a stream to relation operation is a relation. You use relation to stream operators to convert a relation back to a stream to output a stream that contains every event, only inserted events, or only deleted events.
Oracle CQL processor output typically goes to a stream channel and on to the next stage in the EPN.
The stream to relation operators are RANGE
and ROWS
.
RANGE Operator
You can specify a window of time with the time-based window operator, RANGE
, as follows:
SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 MINUTE]
To keep this example easy to understand, the range is 1 minute, time ticks in seconds, and one input event arrives every second. The query starts averaging the prices contained in the events at zero seconds and outputs a value of 0 because there is no event in the relation at zero seconds. When the first event arrives at 1 second, the average price is the price in the first event. When the next event arrives at 2 seconds, the average price is the average of the two events in the relation. This continues through 59 seconds, the end of the first minute.
An important concept with time-based window operators is that the window shifts over the event stream based on time. When 60 seconds have elapsed, the window shifts by one second to average the prices in the events from 1 to 60 seconds, and when 61 seconds have elapsed, the window shifts by one more second to average the prices in the events from 2 to 61 seconds. This window-shifting behavior continues as long as the application runs.
The following configuration file entry shows the query. ISTREAM
is a relation to stream operator described in Relation to Stream Operators.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      ISTREAM (SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 MINUTE])
    ]]></query>
  </rules>
</processor>
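The window shifting described above can be sketched outside the CQL engine. The following Python fragment is only a simplified model of a time-based window, not how Oracle CQL is implemented; the function name range_avg and the sample prices are hypothetical, and it assumes one event per second with timestamps in whole seconds.

```python
from collections import deque

def range_avg(events, range_seconds=60):
    """Yield (timestamp, average) for a time-based sliding window.

    events: iterable of (timestamp_seconds, price) in timestamp order.
    At time t the window holds events with timestamp > t - range_seconds.
    """
    window = deque()   # (timestamp, price) pairs currently in the window
    total = 0.0        # running sum of the prices in the window
    for ts, price in events:
        window.append((ts, price))
        total += price
        # Expire events that have fallen out of the time range.
        while window and window[0][0] <= ts - range_seconds:
            _, old_price = window.popleft()
            total -= old_price
        yield ts, total / len(window)

# Hypothetical input: one event per second, price equal to the timestamp.
averages = dict(range_avg((t, float(t)) for t in range(1, 62)))
```

In this model, the average at 60 seconds covers the events from 1 to 60 seconds, and the average at 61 seconds covers the events from 2 to 61 seconds.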
Note:
Very large numbers must be suffixed. Without the suffix, Java treats very large numbers like an integer and the value might be out of range for an integer, which throws an error.
Add a suffix as follows:
l or L for long
f or F for float
d or D for double
n or N for big decimal
For example: SELECT * FROM channel0[RANGE 1368430107027000000l nanoseconds]
ROWS Operator
You can specify a tuple-based window with the ROWS
operator as follows:
SELECT AVG(price) FROM StockTradeIStreamChannel [ROWS 3]
A tuple is an event, so the ROWS 3
operation means to average the price on three events in the relation starting when the first event arrives. The way it works is that the average operation is performed on the first event that enters the relation. When the second event enters the relation, the average operation is performed on the two events. When the third event enters the relation, the average operation is performed on the three events. No averaging occurs again until the fourth event enters the relation. When the fourth event enters the relation, the second, third, and fourth events are averaged. Likewise, when the fifth event enters the relation, the third, fourth, and fifth events are averaged.
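As a rough illustration of this behavior, the following Python sketch keeps the last three prices and recomputes the average on each arrival. It is a simplified model rather than the CQL engine's implementation; the function name rows_avg and the prices are hypothetical.

```python
from collections import deque

def rows_avg(prices, rows=3):
    """Return the average over the last `rows` prices after each arrival."""
    window = deque(maxlen=rows)  # the oldest price drops out automatically
    averages = []
    for price in prices:
        window.append(price)
        averages.append(sum(window) / len(window))
    return averages

# Hypothetical prices for five consecutive events.
result = rows_avg([10.0, 20.0, 30.0, 40.0, 50.0])
```

The first three results average one, two, and three events; the fourth averages events 2 through 4, and the fifth averages events 3 through 5.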
The prior examples have averaged the price for all stocks. To compute the average for specific stocks in the stream, the following query uses a partitioned window.
SELECT AVG(price), tickerSymbol FROM StockTradeIStreamChannel [PARTITION by tickerSymbol ROWS 3] GROUP BY tickerSymbol
A partitioned window creates separate relation-windows for each partition. So in this example with the PARTITION by tickerSymbol
clause, stocks with the same ticker symbol are grouped by three events and averaged. Without the partition and using only the GROUP BY
clause, the window keeps the last three events as expected, but the ticker symbols of those events do not always match, which introduces averaging errors.
The following is the configuration file entry for this query. ISTREAM
is a relation to stream operator described in Relation to Stream Operators.
<processor>
  <rules>
    <query id="Example"><![CDATA[
      ISTREAM (SELECT tickerSymbol, AVG(price)
               FROM StockTradeIStreamChannel [PARTITION by tickerSymbol ROWS 3]
               GROUP BY tickerSymbol)
    ]]></query>
  </rules>
</processor>
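To see why the partition matters, here is a minimal Python sketch that keeps a separate three-event window per ticker symbol. It only models the idea of a partitioned window; the function name partitioned_rows_avg, the ACME symbol, and the prices are hypothetical.

```python
from collections import defaultdict, deque

def partitioned_rows_avg(events, rows=3):
    """Yield (symbol, average) using a separate last-`rows` window per symbol."""
    windows = defaultdict(lambda: deque(maxlen=rows))
    for symbol, price in events:
        window = windows[symbol]   # each symbol gets its own window
        window.append(price)
        yield symbol, sum(window) / len(window)

# Hypothetical interleaved trades for two symbols.
events = [("ORCL", 10.0), ("ACME", 100.0), ("ORCL", 20.0),
          ("ACME", 200.0), ("ORCL", 30.0), ("ORCL", 40.0)]
out = list(partitioned_rows_avg(events))
```

Because each symbol has its own window, an ACME price never enters an ORCL average, which is the averaging error a single shared three-event window would introduce.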
The relation to stream operators are ISTREAM
, DSTREAM
, and RSTREAM
.
ISTREAM Operator
The ISTREAM
operator puts an insert event from the relation into the output stream. Events that were deleted or updated in the relation are ignored. When the average changes, the query sends a delete
event to the relation to remove the previous average and then sends an insert
event to the relation to add the new average into the relation. The following example uses the ISTREAM
operator to update the output stream when a new average is calculated.
ISTREAM (SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 MINUTE])
The following configuration file entry shows the ISTREAM
operator.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      ISTREAM (SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 MINUTE])
    ]]></query>
  </rules>
</processor>
DSTREAM Operator
Use the DSTREAM
operator to find out when a situation is no longer useful such as when a stock has been delisted from the exchange. The following example uses the DSTREAM
operator to update the output stream with the old average after the new average is calculated in the relation.
DSTREAM (SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 MINUTE])
The following configuration file entry shows the DSTREAM
operator.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      DSTREAM (SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 MINUTE])
    ]]></query>
  </rules>
</processor>
RSTREAM Operator
The RSTREAM
operator inserts all events into the output stream regardless of whether events were deleted or updated. Use this operator when you need to take downstream action on every output. The following example uses the RSTREAM
operator to select all events in the input stream, wait for two events to arrive in the relation, and put the two events from the relation into the output stream.
RSTREAM (SELECT * FROM StockTradeIStreamChannel [ROWS 2])
The following configuration file entry shows the RSTREAM
operator.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      RSTREAM (SELECT * FROM StockTradeIStreamChannel [ROWS 2])
    ]]></query>
  </rules>
</processor>
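One way to compare the three relation to stream operators is to look at two consecutive states of a relation. The following Python sketch is a simplified model under that assumption, not the engine's implementation; the function name relation_to_stream and the sample averages are hypothetical.

```python
from collections import Counter

def relation_to_stream(previous, current):
    """Compare two consecutive relation states (lists of hashable events).

    Returns (istream, dstream, rstream) as sorted lists:
      istream: events in `current` but not in `previous` (inserts)
      dstream: events in `previous` but not in `current` (deletes)
      rstream: every event in `current`
    """
    prev, curr = Counter(previous), Counter(current)
    istream = sorted((curr - prev).elements())
    dstream = sorted((prev - curr).elements())
    rstream = sorted(curr.elements())
    return istream, dstream, rstream

# Hypothetical case: the relation holds one average that changes from 20.0 to 30.0.
inserted, deleted, everything = relation_to_stream([20.0], [30.0])
```

In this model the ISTREAM output carries the new average, the DSTREAM output carries the old average, and the RSTREAM output carries whatever the relation currently holds.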
A NOW
window contains the events that happened at the last tick of the system. With the NOW
operator, the last input event can be deleted in the next time tick (the new NOW
), so you might not capture what you want. If you truly need the last input event, use a last event window. The following example shows how to construct a NOW
window.
SELECT * FROM StockTradeIStreamChannel [NOW]
The following configuration file entry shows the NOW
operator.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      ISTREAM (SELECT * FROM StockTradeIStreamChannel [NOW])
    ]]></query>
  </rules>
</processor>
A last event window captures the last event received. The following example shows how to construct a last event window.
SELECT * FROM StockTradeIStreamChannel [ROWS 1]
The following configuration file entry shows a last event window.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      ISTREAM (SELECT * FROM StockTradeIStreamChannel [ROWS 1])
    ]]></query>
  </rules>
</processor>
Instead of outputting query results as they happen, you can use the SLIDE
operator in a subclause to batch the output events. You can batch the events based on the number of events when you use the ROWS
operator or an amount of time (time window) when you use the RANGE
operator.
Note:
When a slide value is not specified, the query assumes the default value of 1 row for tuple-based windows, and 1 time tick for time-based windows.
Batch by Number of Events
The following example outputs every 2 events (2, 4, 6, 8, ...).
SELECT * FROM StockTradeIStreamChannel[ROWS 3 SLIDE 2]
The output from the SLIDE
operator includes deleted events. When the first two events arrive in the relation, the query outputs both events to the stream. When the next event arrives, there are three events in the relation, but output happens next at the fourth event. When the fourth event arrives, the first event is deleted and output with the third and fourth events.
The following example shows how to use a slide with the RSTREAM
operator. In this case, when the fourth event arrives, events 2, 3, and 4 are sent to the output stream. The RSTREAM
operator sends all events to the output stream regardless of whether events were deleted or updated.
RSTREAM(SELECT * FROM StockTradeIStreamChannel[ROWS 3 SLIDE 2])
The following configuration file entry uses an RSTREAM
to batch by numbers.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      RSTREAM (SELECT * FROM StockTradeIStreamChannel [ROWS 3 SLIDE 2])
    ]]></query>
  </rules>
</processor>
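The batching described for RSTREAM with ROWS 3 SLIDE 2 can be modeled in a few lines of Python. This is a sketch under simplifying assumptions, not the engine's implementation; the function name rows_slide_rstream is hypothetical and the events are represented by their arrival numbers.

```python
from collections import deque

def rows_slide_rstream(events, rows=3, slide=2):
    """Return one batch per slide boundary: the window's contents after
    every `slide`-th event, keeping at most `rows` events."""
    window = deque(maxlen=rows)
    batches = []
    for count, event in enumerate(events, start=1):
        window.append(event)
        if count % slide == 0:          # output only on every slide-th event
            batches.append(list(window))
    return batches

# Six events, identified by their arrival number.
batches = rows_slide_rstream([1, 2, 3, 4, 5, 6])
```

The second batch contains events 2, 3, and 4, which matches the output described for the arrival of the fourth event.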
Batch by Time Window
With a time window, Oracle Stream Analytics batches events by a time interval (RANGE
operator). When you specify the time interval, Oracle CQL sends the events to the output stream at a time that is a multiple of the number you specified. For example, if you specify 5 seconds, the events are sent at 5, 10, 15, 20, and so on seconds. In the case where the first event arrives at 1, 2, or 3 seconds into the interval, the first output will be smaller than the others.
The following example specifies a range of 5 minutes with a slide every 30 seconds.
SELECT * FROM StockTradeIStreamChannel [RANGE 5 MIN SLIDE 30 SECONDS]
The following configuration file entry shows a time-based slide.
<processor>
  <rules>
    <query id="rule2"><![CDATA[
      RSTREAM (SELECT * FROM StockTradeIStreamChannel [RANGE 5 MIN SLIDE 30 SECONDS])
    ]]></query>
  </rules>
</processor>
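The timing rule for time-based slides (output at multiples of the slide interval) can be illustrated with a short Python sketch. It models only the tick at which an event is first output and ignores how long the event then stays in the range window; the function names and arrival times are hypothetical, and it assumes an event that arrives exactly on a tick is output at that tick.

```python
import math

def output_tick(arrival_seconds, slide_seconds):
    """Return the first multiple of `slide_seconds` at or after the arrival time."""
    return math.ceil(arrival_seconds / slide_seconds) * slide_seconds

def batch_by_slide(arrivals, slide_seconds):
    """Group arrival times by the tick at which they would first be output."""
    batches = {}
    for t in arrivals:
        batches.setdefault(output_tick(t, slide_seconds), []).append(t)
    return batches

# Hypothetical arrival times in seconds, with a 5-second slide.
batches = batch_by_slide([3, 4, 6, 7, 9, 10, 12], slide_seconds=5)
```

Because the first event arrives 3 seconds into the first interval, the batch output at 5 seconds is smaller than the batch output at 10 seconds.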
Views enable you to create an Oracle CQL statement that can be reused by other Oracle CQL queries. A view
element contains Oracle CQL subquery statements. The view
element id
attribute defines the view name. A top-level SELECT
statement that you create in a view
element is called a view.
Note:
Subqueries are used with binary set operators such as union, union all, and minus. You must use parentheses in the subqueries so that the right precedence is applied to the query.
The following example shows view v1
and query q1
on the view. The view selects from stream S1
of xmltype
stream elements. The view v1
uses the XMLTABLE
clause to parse data from the xmltype
stream elements with XPath
expressions. The query q1
selects from view v1
as it would from any other data source. The XMLTABLE
clause also supports XML name spaces.
An xmltype
stream contains XML data. With the Oracle CQL XMLTABLE
clause, you can parse data from an xmltype
stream into columns using XPath expressions and access the data by column name. XPath expressions enable you to navigate through elements and attributes in an XML document.
Note:
The data types in the view's schema match the data types of the parsed data in the COLUMNS
clause.
<view id="v1" schema="orderId LastShares LastPrice"><![CDATA[
  SELECT X.OrderId, X.LastShares, X.LastPrice
  FROM S1,
    XMLTABLE ( "FILL" PASSING BY VALUE S1.c1 as "."
      COLUMNS
        OrderId char(16) PATH "fn:data(../@ID)",
        LastShares integer PATH "fn:data(@LastShares)",
        LastPrice float PATH "fn:data(@LastPx)"
    ) as X
]]></view>
<query id="q1"><![CDATA[
  IStream(
    select orderId, sum(LastShares * LastPrice), sum(LastShares * LastPrice) / sum(LastShares)
    from v1[now]
    group by orderId
  )
]]></query>
Oracle CQL supports aggregate functions such as AVG, COUNT, and SUM, which are calculated incrementally, and MAX and MIN, which are not incremental.
The aggregate functions aggregate events into a Java collection so that you can use the Collection APIs to manipulate the events.
You can check for conditions on the aggregated results with the HAVING
clause. In the following example only averages higher than 50 are output.
SELECT AVG(price) FROM StockTradeIStreamChannel [RANGE 1 HOUR] HAVING AVG(price) > 50
Oracle CQL provides a variety of built-in single-row functions and aggregate functions based on the Colt open source libraries for high performance scientific and technical computing. The functions that are available as part of the Colt library do not support the Big Decimal data type or NULL input values. Also, the values these functions compute are not calculated incrementally. See the COLT website for details.
You can access a relational database table from an Oracle CQL query by creating a table component with an associated data source. Oracle Stream Analytics relational database table event sources are pull data sources, which means that Oracle Stream Analytics periodically polls the event source.
You can join a stream only with a NOW
window and only to a single database table.
Because changes in the table source are not coordinated in time with stream data, you can only join the table source to an event stream with a Now
window, and you can only join to a single database table.
With Oracle JDBC data cartridge, you can integrate arbitrarily complex SQL queries and multiple tables and data sources with your Oracle CQL queries. See Oracle JDBC Data Cartridge in Developing Applications with Oracle CQL Data Cartridges.
Note:
Oracle recommends the Oracle JDBC data cartridge for accessing relational database tables from an Oracle CQL statement.
Whether you use the NOW
window or the data cartridge, you must define data sources in the Oracle Stream Analytics server file as described in Define Data Sources in Administering Oracle Stream Analytics.
The following assembly file entry shows the setting for a table source with an id
attribute of Stock
.
<wlevs:table id="Stock" event-type="TradeEvent" data-source="StockDs"/>
Oracle Stream Analytics uses the event type and the data source to map a relational table row to the event type. The TradeEvent
event type is created from a Java class that has the following five private fields that map to columns in the relational database: symbol
, price
, lastPrice
, percChange
, and volume
.
Note:
The XMLTYPE
property is not supported for table sources.
<data-source>
  <name>StockDs</name>
  <connection-pool-params>
    <initial-capacity>1</initial-capacity>
    <max-capacity>10</max-capacity>
  </connection-pool-params>
  <driver-params>
    <url>jdbc:derby:</url>
    <driver-name>org.apache.derby.jdbc.EmbeddedDriver</driver-name>
    <properties>
      <element>
        <name>databaseName</name>
        <value>db</value>
      </element>
      <element>
        <name>create</name>
        <value>true</value>
      </element>
    </properties>
  </driver-params>
  <data-source-params>
    <jndi-names>
      <element>StockDs</element>
    </jndi-names>
    <global-transactions-protocol>None</global-transactions-protocol>
  </data-source-params>
</data-source>
After configuration, you can define Oracle CQL queries that access the Stock
table as if it were another event stream.
In the following example, the query joins the StockTradeIStreamChannel
event stream to the Stock
table:
SELECT StockTradeIStreamChannel.symbol, StockTradeIStreamChannel.price,
       StockTradeIStreamChannel.lastPrice, StockTradeIStreamChannel.percChange,
       StockTradeIStreamChannel.volume, Stock
FROM StockTradeIStreamChannel [Now], Stock
WHERE StockTradeIStreamChannel.symbol = Stock.symbol
For improved performance, you can enable a CQL query to execute in parallel rather than serially, as it does by default.
When the CQL code supports it, you can configure a query so that it can process incoming events in parallel when multiple threads are available to the CQL processor.
You should enable parallel query execution only in cases where the relative order of the query output events is unimportant to the query's downstream client. For example, event ordering probably is not important if your query is intended primarily to filter events, such as to deliver to clients a set of stock transactions involving a particular company, where the transaction sequence is irrelevant.
By default (without enabling parallel execution), queries process events from a channel serially. For events routed through a channel that uses a system time stamp, event order is the order in which events are received; through a channel that is time stamped by an application, event order is the order determined by a time stamp value included in the event. Relaxing the total order constraint allows the configured query to not consider event order for that query, processing events in parallel where possible.
While specifying support for parallel query execution is at its core a simple configuration task, be sure to follow the other steps below so that you get the most out of the feature.
Use the ordering-constraint
attribute to support parallel execution.
Make sure you have enough threads calling into the processor to meet your performance goals. The maximum amount of parallel query execution is constrained by the number of threads available to the CQL processor. For example, if an adapter upstream of the processor supports the number of threads you need and there is a channel between the adapter and the processor, try configuring the channel with a max-threads
count of 0 so that it acts as a pass-through.
If you don't want a pass-through, be sure to configure the query's upstream channel with a max-threads
value greater than 1. (To make a max-threads
value setting useful, you'll need to also set the max-size
attribute to a value greater than 0.) For more information, see Channels.
Follow other guidelines related to setting the max-threads
attribute value. For example, to make a max-threads
value setting useful, you'll need to also set the max-size
attribute to a value greater than 0.
Ensure, if necessary, that a bean receiving the query results is thread-aware, such as by using synchronized blocks. For example, you might need to do so if the bean's code builds a list from results received from queries executed on multiple threads.
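The last point, making the receiving bean thread-aware, can be illustrated with a lock-guarded collector. The following Python sketch is only an analogy to Java synchronized blocks; the class ResultCollector and its methods are hypothetical and are not part of any Oracle Stream Analytics API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class ResultCollector:
    """Hypothetical downstream listener that builds a list from query results."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results = []

    def on_event(self, event):
        # Guard the shared list so concurrent callers cannot interleave updates.
        with self._lock:
            self._results.append(event)

    def snapshot(self):
        # Return a copy so callers never see the list while it is being changed.
        with self._lock:
            return list(self._results)

collector = ResultCollector()
with ThreadPoolExecutor(max_workers=4) as pool:
    # Simulate query output arriving on several threads at once.
    list(pool.map(collector.on_event, range(1000)))

received = collector.snapshot()
```

Every event is collected, but the arrival order is not guaranteed, which is the trade-off parallel execution makes.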
You enable parallel query execution by relaxing the default ordering constraint that ensures that events are processed serially. You do this by setting the ordering-constraint
attribute on a query
or view
element.
In the following example, the ordering-constraint
attribute is set to UNORDERED
so that the query will execute in parallel whenever possible:
<query id="myquery" ordering-constraint="UNORDERED"> SELECT symbol FROM S WHERE price > 10 </query>
The ordering-constraint
attribute supports the following three values:
ORDERED
means that the order of output events (as implied by the order of input events) is important. The CQL engine will process events serially. This is the default behavior.
UNORDERED
means that the order of the output events is not important to the consumer of the output events. This gives the CQL processor the freedom to process events in parallel on multiple threads whenever possible.
PARTITION_ORDERED
means that the order of output events within a partition is preserved (as implied by the order of input events), while the order of output events across different partitions is not important to the consumer of the output events. This relaxation gives the CQL engine some freedom to process events across partitions in parallel (when possible) on multiple threads.
Use the PARTITION_ORDERED
value when you want to specify that events conforming to a given partition are processed serially, but that order can be disregarded across partitions and events belonging to different partitions may be processed in parallel. When using the PARTITION_ORDERED
value, you must also add the partition-expression
attribute to specify which expression for partitioning should be the basis for relaxing the cross-partition ordering constraint.
In the following example, the GROUP BY
clause partitions the output based on symbol values. The partition-expression
attribute specifies that events in a given subset of events corresponding to a particular symbol value should be handled serially. Across partitions, on the other hand, order can be disregarded.
<query id="myquery" ordering-constraint="PARTITION_ORDERED" partition-expression="symbol">
  SELECT COUNT(*) as c, symbol FROM S[RANGE 1 minute] GROUP BY symbol
</query>
In general, you will probably see improved performance for queries by making more threads available and setting the ordering-constraint
attribute so that they're able to execute in parallel when possible. As with most performance tuning techniques, a little trial and error with these settings should yield a combination that gets better results.
However, in some cases where your queries use partitioning -- and you've set the ordering-constraint
attribute to PARTITION_ORDERED
-- you might not see the amount of scaling you'd expect. For example, consider a case in which running with four threads doesn't improve performance very much over running with two threads. In such a case, you can try using the partition-order-capacity
value to get the most out of CQL engine characteristics at work with queries that include partitions.
The partition-order-capacity
value specifies the maximum amount of parallelism that will be permitted within a given processor instance when processing a PARTITION_ORDERED
query. When available threads are handling events belonging to different partitions, the value sets a maximum number of threads that will be allowed to simultaneously run in the query.
As with other aspects of performance tuning, getting the most out of partition-order-capacity
may take a bit of experimentation. When tuning with partition-order-capacity
, a good starting point is to set it equal to the maximum number of threads you expect to have active in any CQL processor instance. In some cases (for example, at high data rates or with expensive processing downstream from the CQL processor), it may be helpful to set the partition-order-capacity
value even higher than the available number of threads. However, you should only do this if performance testing confirms that it's helpful for a given application and load.
The partition-order-capacity
value is set from one of four places, two of which you can fall back on when you do not explicitly set it yourself. For information about the settings, see Schema Reference for Oracle Stream Analytics.
These are, in order of precedence:
The partition-order-capacity
element set on a channel configuration. If you specify this on the input channel for a processor, it takes effect for any PARTITION_ORDERED
queries in that processor.
The partition-order-capacity
property in server configuration. This value will be used for all PARTITION_ORDERED
queries running on the server unless the value is set on a channel.
The max-threads
value set on a channel configuration. If you specify this on the input channel for a processor, it takes effect for any PARTITION_ORDERED
queries in that processor.
A system default value (currently set to 4) is used if you don't specify either a partition-order-capacity
value or max-threads
value, or if the max-threads
value is set to 0 (meaning it's a pass-through channel).
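The precedence order above can be summarized as a small lookup function. This Python sketch only restates the four rules as listed; the function and parameter names are hypothetical and do not correspond to any Oracle Stream Analytics API.

```python
SYSTEM_DEFAULT_CAPACITY = 4  # system default stated in the text above

def effective_partition_order_capacity(channel_capacity=None,
                                       server_capacity=None,
                                       channel_max_threads=None):
    """Pick the capacity from the first source that supplies a value."""
    if channel_capacity is not None:
        return channel_capacity        # 1. partition-order-capacity on the channel
    if server_capacity is not None:
        return server_capacity         # 2. partition-order-capacity in server configuration
    if channel_max_threads:            # 3. max-threads on the channel (0 means pass-through)
        return channel_max_threads
    return SYSTEM_DEFAULT_CAPACITY     # 4. system default
```

For example, a pass-through input channel (max-threads of 0) with no explicit capacity anywhere resolves to the system default.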
When using partition-order-capacity
, keep in mind the following:
The partition-order-capacity
value is only useful when you're setting the ordering-constraint
attribute to PARTITION_ORDERED
.
Increasing partition-order-capacity
generally increases parallelism and scaling. For example, if your profiling reveals lock contention bottlenecks, you might find it helpful to increase partition-order-capacity
to see if contention is reduced.
Setting partition-order-capacity
even higher than the number of available threads can be helpful in some cases because of the particular way partitioning is done in the CQL processor.
There is some resource cost in memory used by specifying very high values.
Tuning this parameter is very dependent on details of the application and the input rate. Tuning by experimentation may be necessary to determine an optimal value.
Think of parallel query execution as a performance enhancement feature that you specify support for so that the CQL processor can use it whenever possible. Not all queries can be executed in parallel. This includes queries using certain CQL language features.
For example, if your query uses some form of aggregation -- such as to find the maximum value from a range of values -- the CQL processor may not be able to fully execute the query in parallel (this is needed to guarantee the correct result considering the ordering constraint). Some query semantics in themselves also constrain the query to ordered processing. Such queries will be executed serially regardless of whether you specify support for parallel execution.
Also, the IStream
, RStream
and DStream
operators maintain the state of their operand for processing, making it necessary for the CQL processor to synchronize threads in order to execute the query.
Note that the CQL processor always respects the semantic intention of your query. In cases where the ordering-constraint
attribute would change this intention, the attribute is coerced to a value that keeps the intention intact.
If you're using the partition-expression
attribute, keep in mind that the attribute supports a single expression only. Entering multiple property names for the value is not supported.
You can write code to handle faults that occur in code that does not have an inherent fault handling mechanism. This includes Oracle CQL code and multithreaded EPN channels.
By default, the CQL language has no mechanism for handling errors that occur, as does Java with its try/catch structure. To handle faults that occur in CQL, you can write a fault handler, then connect the handler to the EPN stage for which it handles faults, such as an Oracle CQL processor.
You can also associate a fault handler with a multithreaded channel, which is a channel whose max-threads setting is greater than 0. This provides fault handling in the case of exceptions that are thrown to the channel from a stage that is downstream of the channel. Note that channels whose max-threads setting is 0 are pass-through channels that already rethrow exceptions to their upstream stages. For additional information specific to fault handlers for channels, see Fault Handling.
A fault handler is a Java class that implements the com.bea.wlevs.ede.api.FaultHandler interface. You connect the class to an EPN stage by registering your fault handler as an OSGi service and associating it with the stage. For more information about OSGi, see Spring Framework.
Without a custom fault handler, you get the following default fault handling behavior:
When an exception occurs in Oracle CQL, the CQL engine catches the exception and stops the query processor.
If an exception occurs in a stage that is downstream of the processor, then that stage is dropped as a listener.
Exceptions are logged (under the CQLServer category) and the events that are part of the exception clause are discarded.
Upstream stages are not notified of the failure.
When using custom fault handlers you write, you can:
Associate a fault handler with an Oracle CQL processor or multithreaded channel so that faults in those stages are thrown as exceptions to the handler. There, you can handle or rethrow the exception.
Allow query processing to continue as your code either handles the exception or rethrows it to the stage that is next upstream.
Save event data from being lost while handling a fault. For example, if you have configured a connection to a data source, you could save event data there.
Log fault and event information when faults occur.
Use multiple fault handlers where needed in an EPN so that exceptions thrown upstream are handled when they reach other Oracle CQL processors and channels.
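As a rough illustration of the logging and saving capabilities listed above, the following Java sketch records each fault it receives. To stay self-contained, it declares a local stand-in interface with the same handleFault signature shown in this chapter. In an application you would implement com.bea.wlevs.ede.api.FaultHandler instead. The RecordingFaultHandler class and its in-memory list are hypothetical; a real handler might write to a configured data source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public class RecordingFaultHandlerSketch {

    // Local stand-in that mirrors the handleFault signature; an assumption
    // made so that the sketch compiles without the product libraries.
    interface FaultHandler {
        void handleFault(Throwable fault, String[] catchers) throws Throwable;
    }

    // Hypothetical handler that logs each fault and keeps a record of it.
    static class RecordingFaultHandler implements FaultHandler {
        private static final Logger LOG = Logger.getLogger("RecordingFaultHandler");
        private final List<String> savedFaults = new ArrayList<>();

        @Override
        public synchronized void handleFault(Throwable fault, String[] catchers) {
            String record = fault.getClass().getSimpleName() + ": " + fault.getMessage();
            LOG.warning("Fault received: " + record);
            savedFaults.add(record);
            // Returning without rethrowing means the fault is consumed here.
        }

        // Returns a copy so callers cannot modify the internal list.
        synchronized List<String> savedFaults() {
            return new ArrayList<>(savedFaults);
        }
    }

    public static void main(String[] args) {
        RecordingFaultHandler handler = new RecordingFaultHandler();
        handler.handleFault(new IllegalStateException("bad event"), new String[0]);
        System.out.println(handler.savedFaults());
    }
}
```

The method is synchronized on the assumption that faults may arrive from more than one thread, for example when the handler is associated with a multithreaded channel.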
Consider associating a fault handler with a stage that does not have its own mechanism for responding to faults, including Oracle CQL processors and multithreaded channels. Other stages, such as custom adapters that have their own exception-handling model, do not benefit from a fault handler.
Queries can continue as your fault handling code evaluates the fault to determine what action should be taken, including rethrowing the fault to a stage that is upstream from the Oracle CQL processor.
For example, the upstream stage of the Oracle CQL processor could be the JMS subscriber adapter, which can roll back the JMS transaction (if the session is transacted) to allow the event to be redelivered. It can also commit the transaction if the event has been redelivered already and found that the problem is not solvable.
Note that when you use a custom fault handler, the query state is reset after a fault as if the query had been stopped and restarted. In contrast, the default behavior stops the query and drops all subsequent events.
You create a fault handler class by implementing the com.bea.wlevs.ede.api.FaultHandler interface. After you have written the class, you associate it with the stage for which it handles faults by registering it as an OSGi service. For more information, see Register a Fault Handler.
Your implementation of the handleFault method receives exceptions for the EPN stage with which the handler is associated. The exception itself is either an instance of com.bea.wlevs.ede.api.EventProcessingException or, if there has been a JVM error, an instance of java.lang.Error.
The method also receives a string array that contains the names of upstream stages, or catchers, to which the exception goes when your code rethrows it. If there is more than one catcher in the array, your rethrown exception goes to all of them. There are two cases when the catchers array is empty: when the exception occurs while executing a temporal query and when the exception is thrown to a channel's fault handler. In these cases, the fault handler executes in the context of a background thread, and there is no linkage to upstream stages.
An exception that is rethrown from a fault handler travels through upstream EPN stages until it is either caught or reaches a stage that cannot catch it (such as a processor or multithreaded channel that does not have an associated fault handler). Note that if you rethrow an exception, any channels in the catcher's list must have an associated fault handler to catch the exception.
The EventProcessingException instance could also be one of the exception types that extend that class, including CQLExecutionException, ArithmeticExecutionException, and others. See the Java API Reference for Oracle Stream Analytics. The EventProcessingException instance provides methods with which your code can retrieve insert, delete, and update events that were involved in generating the fault.
Your implementation of the method should do one of the following:
Consume the fault in the way that a Java try and catch statement might. If your implementation does not rethrow the fault, then event processing continues with subsequent events. However, query processing continues with its state reset as if the query had been restarted. The processing state is lost and processing begins fresh with events that follow those that provoked the fault.
Rethrow the fault so that it is received by upstream stages (or their fault handlers). As when the fault is consumed, queries continue processing events, although the query state is reset with subsequent events. The upstream stage receiving the fault always has the option of explicitly stopping the offending query by using the CQL processor's MBean interface.
Note:
When you update an Oracle CQL query with an MBean, do not send events during the update procedure. If you send events during the update, the order of the events in the output stream is not guaranteed for some queries. One example is changing an Oracle CQL query from unordered to ordered when the query runs with parallel execution.
In the following example the code provides a high-level illustration of handling a fault.
package com.example.faulthandler;

import com.bea.wlevs.ede.api.FaultHandler;

public class SimpleFaultHandler implements FaultHandler {

    private String suppress;

    // Called by the server to pass in fault information.
    @Override
    public void handleFault(Throwable fault, String[] catchers) throws Throwable {
        // Log the fault.
        return;
    }
}
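The choice between consuming and rethrowing a fault can be sketched as follows. This is one possible policy, not a recommended one. It assumes that a fault should be rethrown whenever the catchers array names at least one upstream stage, and consumed when the array is empty because there is no upstream linkage. The FaultHandler interface below is a local stand-in with the same handleFault signature, declared so the sketch compiles without the product libraries. The SelectiveFaultHandler name and its counter are hypothetical.

```java
public class SelectiveFaultHandlerSketch {

    // Local stand-in that mirrors the handleFault signature; an assumption
    // made for illustration. A real handler implements
    // com.bea.wlevs.ede.api.FaultHandler.
    interface FaultHandler {
        void handleFault(Throwable fault, String[] catchers) throws Throwable;
    }

    // Hypothetical policy: rethrow when upstream catchers exist, otherwise consume.
    static class SelectiveFaultHandler implements FaultHandler {
        private int consumed;

        @Override
        public void handleFault(Throwable fault, String[] catchers) throws Throwable {
            if (catchers == null || catchers.length == 0) {
                // No upstream stage is linked to this fault, so consume it.
                consumed++;
                return;
            }
            // Rethrow so that the upstream stages named in catchers receive the fault.
            throw fault;
        }

        int consumedCount() {
            return consumed;
        }
    }

    public static void main(String[] args) throws Throwable {
        SelectiveFaultHandler handler = new SelectiveFaultHandler();
        handler.handleFault(new RuntimeException("no upstream"), new String[0]);
        System.out.println("consumed=" + handler.consumedCount());
        try {
            handler.handleFault(new RuntimeException("has upstream"),
                    new String[] {"hypotheticalUpstreamStage"});
        } catch (RuntimeException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

In either branch the query state is reset, as described above. The policy only decides whether upstream stages are told about the fault.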
After you have written a fault handling class, you can associate it with an EPN stage by registering it as an OSGi service. The simplest way to do this is to register the handler declaratively in the EPN assembly file.
Note:
Due to inherent OSGi behavior, runtime fault handler registration from your configuration happens asynchronously, meaning that a small amount of warm-up time might be required before the handler can receive faults. To be sure your handler is ready for the first events that enter the network, add a wait period before the application begins to receive events.
In the following example, the EPN assembly file excerpt shows a service element stanza that registers the SimpleFaultHandler class as the fault handler for the Oracle CQL processor with an id of exampleProcessor.
<osgi:service interface="com.bea.wlevs.ede.api.FaultHandler">
    <osgi:service-properties>
        <entry key="application.identity" value="myapp"/>
        <entry key="stage.identity" value="exampleProcessor"/>
    </osgi:service-properties>
    <bean class="com.example.faulthandler.SimpleFaultHandler"/>
</osgi:service>

<!-- A processor with a user-defined function. -->
<wlevs:processor id="exampleProcessor" >
    ...
</wlevs:processor>
For more on the schema for registering OSGi services, see http://static.springsource.org/osgi/docs/1.1.x/reference/html/appendix-schema.html. For more on OSGi, see http://en.wikipedia.org/wiki/OSGi.