Aggregation Pipelines
Oracle Database API for MongoDB supports MongoDB aggregation pipelines (command aggregate). Most MongoDB aggregation stages, operators, and expressions are supported, but there are some restrictions, which are noted in Feature Support.
Each aggregation pipeline is translated into a single SQL statement and executed in Oracle Database. In contrast to MongoDB, Oracle Database does not limit the volume of data that can be sorted, joined, or grouped. You can use aggregation pipelines for reporting or analytic workloads that span billions of documents across multiple collections. Pipelines can also benefit from Oracle Database features, such as parallel query evaluation, to reduce processing time.
Example:
db.employees.insertMany([
{"name" : "SMITH", "job" : "CLERK", "sal" : 800},
{"name" : "ALLEN", "job" : "SALESMAN", "sal" : 1600},
{"name" : "WARD", "job" : "SALESMAN", "sal" : 1250}
]);
db.employees.aggregate([
{"$group": {"_id": "$job", "average_salary" : {"$avg":"$sal"}}},
{"$sort": {"_id":1}}
]);
[
{ _id: 'CLERK', average_salary: 800 },
{ _id: 'SALESMAN', average_salary: 1425 }
]
This example creates the employees collection, inserts three documents, and groups the documents by the job attribute. For each distinct job, it returns the average salary and sorts the results by job. The aggregation pipeline that uses the $group and $sort stages runs as a single SQL statement. To view the generated SQL and query plan, issue an explain command.
db.employees.aggregate([
{ "$group": { "_id": "$job", "average_salary": { "$avg": "$sal" } } },
{ "$sort": { "_id": 1 } }
] ).explain();
{
queryPlanner: {
generatedSql: 'with \n' +
'"Q1" ("ID", "DATA") as (select "RESID", "DATA" from "employees"), \n' +
'"Q2" ("KEY0", "ACC0") as (\n' +
' select\n' +
` json_query("DATA", '$."job"' error on error json null on empty)\n` +
' as "KEY0", \n' +
` json_query("DATA", '$."sal"' error on error null on empty) as ACC0\n` +
' from "Q1" q\n' +
'),\n' +
...
'select "DATA" from "Q6"',
winningPlan: ' Plan Hash Value : 1694181348 \n' +
'\n' +
'----------------------------------------------------------------------------\n' +
'| Id | Operation | Name | Rows | Bytes | Cost | Time |\n' +
'----------------------------------------------------------------------------\n' +
'| 0 | SELECT STATEMENT | | 3 | 48954 | 5 | 00:00:01 |\n' +
'| 1 | SORT ORDER BY | | 3 | 48954 | 5 | 00:00:01 |\n' +
'| 2 | VIEW | | 3 | 48954 | 4 | 00:00:01 |\n' +
'| 3 | SORT GROUP BY | | 3 | 61461 | 4 | 00:00:01 |\n' +
'| 4 | TABLE ACCESS FULL | employees | 3 | 61461 | 3 | 00:00:01 |\n' +
'----------------------------------------------------------------------------\n'
},
ok: 1
}
This example uses the explain command to display the generated SQL and query plan for an aggregation pipeline. The internally generated SQL for aggregation pipelines is often more complex and verbose than hand-written SQL. You can use hints to influence how Oracle Database evaluates the aggregation pipeline. Continuing the example, apply the $native hint to request parallel query execution.
db.employees.aggregate(
[
{ "$group": { "_id": "$job", "average_salary": { "$avg": "$sal" } } },
{ "$sort": { "_id": 1 } }
],
{"hint" : {$native:"PARALLEL"}}
).explain();
{
queryPlanner: {
generatedSql: 'with \n' +
'"Q1" ("ID", "DATA") as (select /*+ PARALLEL */ "RESID", "DATA" from "employees"), \n' +
...
'select /*+ PARALLEL */ "DATA" from "Q6"',
winningPlan: ' Plan Hash Value : 3014829552 \n' +
'\n' +
'-------------------------------------------------------------------------------\n' +
'| Id | Operation | Name | Rows | Bytes | Cost | Time |\n' +
'-------------------------------------------------------------------------------\n' +
'| 0 | SELECT STATEMENT | | 3 | 48954 | 4 | 00:00:01 |\n' +
'| 1 | SORT ORDER BY | | 3 | 48954 | 4 | 00:00:01 |\n' +
'| 2 | VIEW | | 3 | 48954 | 3 | 00:00:01 |\n' +
'| 3 | SORT GROUP BY | | 3 | 61461 | 3 | 00:00:01 |\n' +
'| 4 | PX COORDINATOR | | | | | |\n' +
'| 5 | PX SEND QC (RANDOM) | :TQ10000 | 3 | 61461 | 2 | 00:00:01 |\n' +
'| 6 | PX BLOCK ITERATOR | | 3 | 61461 | 2 | 00:00:01 |\n' +
'| 7 | TABLE ACCESS FULL | employees | 3 | 61461 | 2 | 00:00:01 |\n' +
'------------------------------------------------------------------------------\n'
...
The generated SQL now includes the PARALLEL hint, and the explain plan confirms that the statement runs in parallel. In Oracle Autonomous Database, set the consumer group to HIGH or MEDIUM to enable parallel execution. See Performance.
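The check described above (is the hint in the generated SQL, and does the plan contain parallel operations?) can be scripted. The following is a minimal sketch, not part of the API: summarizeExplain is a hypothetical helper, and it assumes the explain result has the queryPlanner.generatedSql and queryPlanner.winningPlan string fields shown in the output above. The sample object is abbreviated by hand from that output.

```javascript
// Hypothetical helper: summarizes an explain() result shaped like the output above.
function summarizeExplain(explainResult) {
  const planner = explainResult.queryPlanner || {};
  const sql = planner.generatedSql || "";
  const plan = planner.winningPlan || "";
  // Collect the text of every optimizer hint comment of the form /*+ ... */.
  const hints = [...sql.matchAll(/\/\*\+\s*(.*?)\s*\*\//g)].map((m) => m[1]);
  return {
    hints: [...new Set(hints)],                       // distinct hint texts
    parallelPlan: /\bPX COORDINATOR\b/.test(plan)     // plan has a parallel coordinator step
  };
}

// Abbreviated sample, hand-copied from the explain output above.
const sample = {
  queryPlanner: {
    generatedSql:
      'with "Q1" ("ID", "DATA") as (select /*+ PARALLEL */ "RESID", "DATA" from "employees") ' +
      'select /*+ PARALLEL */ "DATA" from "Q6"',
    winningPlan: "| 4 | PX COORDINATOR | | | | | |\n"
  },
  ok: 1
};

console.log(summarizeExplain(sample));
// prints { hints: [ 'PARALLEL' ], parallelPlan: true }
```

In mongosh, the same function could be applied to the value returned by the explain() call instead of the sample object.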
In addition to the stages defined by MongoDB, Oracle Database API for MongoDB supports two stages that MongoDB does not provide: $sql and $external.
$sql Aggregation Pipeline Stage
You can use a $sql stage to execute Oracle SQL and PL/SQL code.
Example:
db.employees.insertMany([
{"ename" : "SMITH", "job" : "CLERK", "sal" : 800},
{"ename" : "ALLEN", "job" : "SALESMAN", "sal" : 1600},
{"ename" : "WARD", "job" : "SALESMAN", "sal" : 1250}
]);
db.aggregate([ {$sql :
`select e.data.job, avg(e.data.sal) average
from employees e
group by e.data.job`
} ]);
[
{ JOB: 'CLERK', AVERAGE: 800 },
{ JOB: 'SALESMAN', AVERAGE: 1425 }
]
This example uses the $sql stage to execute a select query to compute the average of the employee salaries for each job. Unlike other types of commands, the $sql stage can be used to create and access data that is not in JSON collections.
Example:
// create a relational table named "departments"
db.aggregate([{ $sql:`create table departments(deptno number, name varchar2(4000))`}]);
// insert two rows
db.aggregate([{ $sql:`insert into departments values (10, 'ACCOUNTING'), (20, 'RESEARCH')`}]);
// select the rows directly
db.aggregate([{ $sql:`select * from departments`}]);
[
{ DEPTNO: 10, NAME: 'ACCOUNTING' },
{ DEPTNO: 20, NAME: 'RESEARCH' }
]
This example creates a table, inserts two rows, and selects the data.
$sql Syntax
The $sql stage uses the following syntax:
{
$sql : {
statement : <SQL statement>,
binds : <variables>,
dialect : <dialect>,
format : <format>,
resetSession : <boolean>,
input : <name>
}
}
You can also use the following abbreviated form:
{
$sql : <SQL statement>
}
The abbreviated form is equivalent to {$sql : {statement :<SQL statement>}}.
| Field | Description |
|---|---|
| statement | Specifies the SQL statement to execute. (required) |
| binds | Supplies the values for placeholder expressions (bind variables) in the SQL statement. (optional) See section $sql Binds below. |
| dialect | Specifies the SQL dialect. The only supported value is "oracle". If you provide any other value, an error is raised. (optional) |
| format | Specifies the result format. The format must be "oracle". (optional) |
| resetSession | Oracle Database API for MongoDB pools and reuses database connections. The SQL statement may change the session state of a pooled connection (for example, by issuing an "alter session" statement). By default, session state changes may persist to subsequent commands if the connection is reused. Set resetSession to true to ensure that the session cannot be reused by a later command. If the command is part of a transaction, the session is reset after the transaction ends. (optional) |
| input | Specifies the name that the statement can reference in the FROM clause to access documents from the preceding stage. This parameter applies only when another aggregation stage precedes $sql in the pipeline or when a starting collection is specified. The default value is "INPUT". (optional) |
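The relationship between the abbreviated and full forms can be sketched in a few lines of JavaScript. This is an illustrative client-side helper only: normalizeSqlStage is a hypothetical name, and the dialect check simply mirrors the field table above rather than reproducing the server's validation.

```javascript
// Hypothetical helper: expands the abbreviated $sql form into the full form.
function normalizeSqlStage(stage) {
  const spec = stage.$sql;
  if (typeof spec === "string") {
    // {$sql: "<SQL>"} is equivalent to {$sql: {statement: "<SQL>"}}
    return { $sql: { statement: spec } };
  }
  if (spec === null || typeof spec !== "object" || typeof spec.statement !== "string") {
    throw new TypeError("$sql requires a statement");
  }
  // Per the field table, "oracle" is the only supported dialect.
  if (spec.dialect !== undefined && spec.dialect !== "oracle") {
    throw new RangeError(`unsupported dialect: ${spec.dialect}`);
  }
  return { $sql: { ...spec } };
}

console.log(normalizeSqlStage({ $sql: "select * from departments" }));
// prints { '$sql': { statement: 'select * from departments' } }
```

Normalizing up front means later code only has to deal with the object form.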
If $sql is the only stage in the pipeline and there is no starting collection, there are no restrictions on the type of SQL that can be executed. That is, DDL, DML, and PL/SQL are supported.
For example:
db.aggregate([
{
$sql: {
statement: "update employees set salary = salary * 0.1"
}
}
]);
However, if the pipeline runs against a collection or contains multiple stages, the following restrictions apply:
- The SQL must be a SELECT statement.
- The SELECT statement must project a single JSON column.
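As a rough illustration of the first restriction, a client could pre-check a pipeline before sending it. The sketch below is an assumption-laden convenience, not how the API validates pipelines: findSqlStageProblems is a hypothetical helper, it only looks at the leading keyword of the statement, and it cannot verify that the SELECT projects a single JSON column.

```javascript
// Hypothetical pre-check: flags $sql stages that are not SELECT statements
// when the pipeline has a starting collection or more than one stage.
function findSqlStageProblems(pipeline, hasStartingCollection) {
  // A lone $sql stage with no starting collection may run any SQL.
  const unrestricted = pipeline.length === 1 && !hasStartingCollection;
  if (unrestricted) return [];
  const problems = [];
  pipeline.forEach((stage, position) => {
    if (!Object.prototype.hasOwnProperty.call(stage, "$sql")) return;
    const spec = stage.$sql;
    const statement = typeof spec === "string" ? spec : spec.statement;
    // Crude keyword test; a real parser would be needed for anything stricter.
    if (!/^\s*select\b/i.test(statement)) {
      problems.push(`stage ${position}: only SELECT is allowed here`);
    }
  });
  return problems;
}

const update = [{ $sql: { statement: "update employees set salary = salary * 0.1" } }];
console.log(findSqlStageProblems(update, false)); // prints []
console.log(findSqlStageProblems(update, true));
// prints [ 'stage 0: only SELECT is allowed here' ]
```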
The $sql stage can read the documents from the previous stage, or from the starting collection, by selecting from the row source named by the input field. The incoming documents are exposed in a column named DATA. For example:
db.employees.aggregate([
{$match: {job : "SALESMAN"}},
{
$sql: {
statement: `select json_mergepatch(t.data, json{'updated':systimestamp}) from "MATCHED" t`,
input: "MATCHED"
}
},
{$out : "sales"}
])
This example combines $sql with the $match and $out stages. The SQL function json_mergepatch appends the system timestamp to each document. The $sql stage consumes the documents from the preceding stage by selecting from the “MATCHED” row source.
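To make the effect of json_mergepatch on each document concrete, here is a plain JavaScript sketch of JSON Merge Patch semantics. It assumes that json_mergepatch follows RFC 7396; mergePatch is a hypothetical stand-in written for illustration, and a fixed string replaces systimestamp so that the output is reproducible.

```javascript
// Illustrative implementation of RFC 7396 JSON Merge Patch semantics.
function mergePatch(target, patch) {
  // A non-object patch (scalar, array, or null) replaces the target outright.
  if (patch === null || typeof patch !== "object" || Array.isArray(patch)) {
    return patch;
  }
  const isObject = target !== null && typeof target === "object" && !Array.isArray(target);
  const result = isObject ? { ...target } : {};
  for (const [key, value] of Object.entries(patch)) {
    if (value === null) {
      delete result[key];                              // null removes the member
    } else {
      result[key] = mergePatch(result[key], value);    // objects merge recursively
    }
  }
  return result;
}

const doc = { name: "ALLEN", job: "SALESMAN", sal: 1600 };
console.log(mergePatch(doc, { updated: "2024-01-01T00:00:00Z" }));
// prints { name: 'ALLEN', job: 'SALESMAN', sal: 1600, updated: '2024-01-01T00:00:00Z' }
```

Because the patch only contains the new updated member, every existing member of the document is preserved.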
$sql Result Format
Like other stages, $sql returns zero or more JSON objects. The structure of the objects depends on whether the SQL statement is a SELECT statement.
When the SQL statement is a select statement, each row in the query result maps to a JSON object in the stage result. The mapping has two cases:
- Case 1: The query returns a single column that contains JSON data. The column must be of JSON type or a BLOB, CLOB, or VARCHAR2 column with an IS JSON constraint. The JSON documents are returned directly from the $sql stage. For example:
scott> db.aggregate([{$sql:"select json{*} data from dept"}]);
[
{ DEPTNO: 10, DNAME: 'ACCOUNTING', LOC: 'NEW YORK' },
{ DEPTNO: 20, DNAME: 'RESEARCH', LOC: 'DALLAS' },
{ DEPTNO: 30, DNAME: 'SALES', LOC: 'CHICAGO' },
{ DEPTNO: 40, DNAME: 'OPERATIONS', LOC: 'BOSTON' }
]
- Case 2: If the query produces more than one column, a JSON object is constructed that contains the column values. The column aliases are used as the object key names with the corresponding column value. For example:
scott> db.aggregate([{$sql:"select deptno, dname, loc from dept"}]);
[
{ DEPTNO: 10, DNAME: 'ACCOUNTING', LOC: 'NEW YORK' },
{ DEPTNO: 20, DNAME: 'RESEARCH', LOC: 'DALLAS' },
{ DEPTNO: 30, DNAME: 'SALES', LOC: 'CHICAGO' },
{ DEPTNO: 40, DNAME: 'OPERATIONS', LOC: 'BOSTON' }
]
In case 2, the SQL column values are mapped to a new BSON document. The BSON/SQL type mappings for case 2 are as follows:
| SQL Type | BSON Type |
|---|---|
| BLOB | The value will be consumed directly as a JSON value if it has an IS JSON constraint. Otherwise it will be mapped to a BSON raw. |
| CLOB | The value will be consumed directly as a JSON value if it has an IS JSON constraint. Otherwise it will be mapped to a BSON string. |
| VARCHAR2 | The value will be consumed directly as a JSON value if it has an IS JSON constraint. Otherwise it will be mapped to a BSON string. |
| JSON | direct |
| TIMESTAMP | date (assume UTC) |
| DATE | date (assume UTC) |
| TIMESTAMPTZ | date |
| NUMBER | When scale is 0, INT32 or INT64 depending on precision. Otherwise, double. |
| RAW | Binary |
| Other types | ERROR |
If the SQL statement is not a select statement, the result is a JSON object with a single entry named result. The value of result indicates how many rows were affected by the statement. For example:
scott> db.aggregate([{$sql:`
create table employee (name varchar2(4000), job varchar2(4000))
`}]);
[ { result: 0 } ]
In this example, the create table statement returns a result value of 0 because no rows were modified.
Example:
db.aggregate([{$sql:
{
statement: "insert into employee values (:name, :job)",
binds: {"name": "Bob", "job": "Programmer"}
}
}]);
[ { result: 1 } ]
In this example, the insert statement returns a result value of 1 because one row was inserted. For batch statements, result is an array that contains the row count for each execution. For example:
scott> db.aggregate([{$sql:
{
statement: "insert into employee values (:name, :job)",
binds: [
{"name": "John", "job": "Programmer"},
{"name": "Jane", "job": "Manager"}
]
}
}]);
[ { result: [ 1, 1 ] } ]
In this example, result contains two values (1 and 1) because each execution inserted a single row. Some statements affect more than one row. For example:
db.aggregate([{$sql:`
delete from employee e
where e.job = 'Programmer'
`}]);
[ { result: 2 } ]
This example deletes two rows from the table and the returned value has result set to 2.
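Because result is a number for a single execution and an array for a batch, code that consumes these documents has to handle both shapes. A minimal sketch follows; rowsAffected is a hypothetical helper, and the inputs are the result documents shown in the examples above.

```javascript
// Hypothetical helper: totals the affected-row counts in $sql output documents.
function rowsAffected(stageOutput) {
  return stageOutput.reduce((total, doc) => {
    // Batch executions report an array of counts; single executions a number.
    const counts = Array.isArray(doc.result) ? doc.result : [doc.result];
    return total + counts.reduce((sum, n) => sum + n, 0);
  }, 0);
}

console.log(rowsAffected([{ result: 0 }]));       // create table: prints 0
console.log(rowsAffected([{ result: [1, 1] }]));  // batch insert: prints 2
console.log(rowsAffected([{ result: 2 }]));       // delete: prints 2
```

In mongosh, the argument would typically come from calling toArray() on the cursor that db.aggregate returns.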
$sql Binds
Use the binds parameter to specify the values of the bind variables (placeholder expressions) in the SQL statement. The interpretation of binds depends on the form of the value.
Binds Case 1: The value is an array of objects with the following attributes
| Field | Description |
|---|---|
| index | The index of the positional bind value. This value is mutually exclusive with the name parameter. All bind values must use either name or index, not a mix of both. Named binds are only supported when $sql is the only stage in the pipeline. |
| name | The name of the named bind value. |
| value | The bind value. |
| dataType | Specifies the SQL bind type. For allowed values and defaults, see section Supported Bind Types below. |
For example:
db.aggregate([{
$sql:{
statement : `
insert into employee(empno, ename)
values(:1,:2)
`,
binds : [
{index:1, value:"E123"},
{index:2, value:"JOHN DOE"}
]
}
}]);
In this example, the value "E123" is bound to the variable :1 and the value "JOHN DOE" is bound to the variable :2.
Binds Case 2: The value is an array of primitive values
In this case, each value in the array is bound positionally. The first value in the array binds to the first bind variable, the second value binds to the second bind variable, and so on. For example:
db.aggregate([{
$sql:{
statement : `
insert into employee(empno, ename)
values(:1,:2)
`,
binds : [ "E123", "JOHN DOE" ]
}
}]);
In this example, the value "E123" is bound to :1 and the value "JOHN DOE" is bound to :2. This form is equivalent to the example in case 1, except that you cannot specify the dataType attribute. The default data type (in this case VARCHAR2) is used.
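The equivalence between case 2 and case 1 can be written down directly. In this sketch, toIndexedBinds is a hypothetical helper that expands an array of primitive values into the case 1 form with 1-based index values, as in the examples above.

```javascript
// Hypothetical helper: expands positional values (case 2) into the
// {index, value} objects of case 1. Indexes start at 1, matching :1, :2, ...
function toIndexedBinds(values) {
  return values.map((value, i) => ({ index: i + 1, value }));
}

console.log(toIndexedBinds(["E123", "JOHN DOE"]));
// prints [ { index: 1, value: 'E123' }, { index: 2, value: 'JOHN DOE' } ]
```

Starting from the expanded form, a dataType attribute can then be added to individual entries where the default bind type is not wanted.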
Binds Case 3: The value is an object
When binds is an object instead of an array, the object keys and values provide the values for named bind variables. For example:
db.aggregate([{
$sql:{
statement : `
insert into emp(empno, ename)
values(:empno,:ename)
`,
binds : {"empno":"E123", "ename":"JOHN DOE"}
}
}]);
In this example, the value "E123" is bound to the variable :empno and the value "JOHN DOE" is bound to the variable :ename.
Binds Case 4: The value is an array whose elements conform to case 1, case 2, or case 3
In this case, Oracle Database executes the SQL statement once for each element in the array, and each element defines the bind values for that execution. Batching the binds this way is more efficient than running $sql multiple times. For example:
db.aggregate([{
$sql:{
statement : `
insert into emp(empno, ename)
values(:1,:2)
`,
binds : [
["E123", "JOHN DOE"],
["E456", "JANE DOE"]
]
}
}]);
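The four forms of binds can be told apart by shape alone. The classifier below is a sketch of one way to do that on the client; classifyBinds and its labels are hypothetical, and the rule used to recognize a case 1 entry (an object with a value key plus index or name) is an assumption made for illustration, not the API's documented disambiguation logic.

```javascript
const isPlainObject = (v) =>
  v !== null && typeof v === "object" && !Array.isArray(v) && !(v instanceof Date);
// Assumed shape of a case 1 entry: has "value" and either "index" or "name".
const isDescriptor = (v) => isPlainObject(v) && "value" in v && ("index" in v || "name" in v);
const isScalar = (v) => !Array.isArray(v) && !isPlainObject(v);

// Hypothetical classifier for the four binds cases described above.
function classifyBinds(binds) {
  if (isPlainObject(binds)) return "named";                         // case 3
  if (!Array.isArray(binds)) throw new TypeError("binds must be an array or an object");
  if (binds.length > 0 && binds.every(isDescriptor)) return "descriptors"; // case 1
  if (binds.every(isScalar)) return "positional";                   // case 2
  if (binds.every((e) => Array.isArray(e) || isPlainObject(e))) return "batch"; // case 4
  throw new TypeError("binds mixes incompatible element shapes");
}

console.log(classifyBinds([{ index: 1, value: "E123" }, { index: 2, value: "JOHN DOE" }])); // descriptors
console.log(classifyBinds(["E123", "JOHN DOE"]));                        // positional
console.log(classifyBinds({ empno: "E123", ename: "JOHN DOE" }));        // named
console.log(classifyBinds([["E123", "JOHN DOE"], ["E456", "JANE DOE"]])); // batch
```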
Supported Bind Types
The following table lists the supported values for the dataType attribute for a given BSON value.
| BSON Value | Supported Bind Types | Default Bind Type |
|---|---|---|
| string | JSON, VARCHAR2 | VARCHAR2 |
| double | JSON, BINARY_DOUBLE | BINARY_DOUBLE |
| int32 | JSON, NUMBER | NUMBER |
| int64 | JSON, NUMBER | NUMBER |
| decimal128 | JSON, NUMBER | NUMBER |
| true/false | JSON, VARCHAR2, BOOLEAN | BOOLEAN |
| objectid | JSON, RAW | RAW |
| binary | JSON, RAW | RAW |
| datetime | JSON, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
| object | JSON, VARCHAR2 | JSON |
| array | JSON, VARCHAR2 | JSON |
| null | JSON, VARCHAR2, BINARY_DOUBLE, NUMBER, RAW, TIMESTAMP WITH TIME ZONE | VARCHAR2 |
BSON null maps to SQL null for all bind types except JSON. When the bind type is JSON, null maps to JSON null. To set a JSON column to SQL null, use the VARCHAR2 bind type.
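The table above can be encoded as a lookup so that a dataType value is checked before a statement is sent. This is a sketch under stated assumptions: BIND_TYPES and resolveBindType are hypothetical names, the keys are informal BSON type labels (with bool standing for the true/false row), and the data is transcribed from the table rather than taken from any API.

```javascript
// Supported and default bind types per BSON type, transcribed from the table above.
const BIND_TYPES = {
  string:     { supported: ["JSON", "VARCHAR2"], fallback: "VARCHAR2" },
  double:     { supported: ["JSON", "BINARY_DOUBLE"], fallback: "BINARY_DOUBLE" },
  int32:      { supported: ["JSON", "NUMBER"], fallback: "NUMBER" },
  int64:      { supported: ["JSON", "NUMBER"], fallback: "NUMBER" },
  decimal128: { supported: ["JSON", "NUMBER"], fallback: "NUMBER" },
  bool:       { supported: ["JSON", "VARCHAR2", "BOOLEAN"], fallback: "BOOLEAN" },
  objectid:   { supported: ["JSON", "RAW"], fallback: "RAW" },
  binary:     { supported: ["JSON", "RAW"], fallback: "RAW" },
  datetime:   { supported: ["JSON", "TIMESTAMP WITH TIME ZONE"], fallback: "TIMESTAMP WITH TIME ZONE" },
  object:     { supported: ["JSON", "VARCHAR2"], fallback: "JSON" },
  array:      { supported: ["JSON", "VARCHAR2"], fallback: "JSON" },
  null:       { supported: ["JSON", "VARCHAR2", "BINARY_DOUBLE", "NUMBER", "RAW", "TIMESTAMP WITH TIME ZONE"], fallback: "VARCHAR2" }
};

// Hypothetical helper: returns the bind type that would apply, or throws
// if the requested dataType is not listed for that BSON type.
function resolveBindType(bsonType, dataType) {
  const entry = BIND_TYPES[bsonType];
  if (!entry) throw new RangeError(`unknown BSON type: ${bsonType}`);
  if (dataType === undefined) return entry.fallback;
  if (!entry.supported.includes(dataType)) {
    throw new RangeError(`${dataType} is not a supported bind type for ${bsonType}`);
  }
  return dataType;
}

console.log(resolveBindType("object"));             // prints JSON
console.log(resolveBindType("object", "VARCHAR2")); // prints VARCHAR2
```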
Example using the dataType field:
result = db.aggregate([{
$sql: {
statement: `select json { 'example' : :foo} from dual`,
binds: [{"name": "foo", value: {"hello":"world"}, dataType: "VARCHAR2"}]
}
}]).toArray();
print(EJSON.stringify(result));
[{"example":"{\"hello\":\"world\"}"}]
In this example, the object {"hello":"world"} is bound to :foo as VARCHAR2, so it appears in the result as a string value. If you bind the value as JSON, it is returned as a nested object.
result = db.aggregate([{
$sql: {
statement: `select json { 'example' : :foo} from dual`,
binds: [{"name": "foo", value: {"hello":"world"}, dataType: "JSON"}]
}
}]).toArray();
print(EJSON.stringify(result));
[{"example":{"hello":"world"}}]
$external Aggregation Pipeline Stage
Use the $external stage to access JSON documents stored in files outside the database.
Example:
db.aggregate([
{$external:"https://raw.githubusercontent.com/oracle-samples/db-sample-schemas/main/order_entry/PurchaseOrders.dmp"},
{$limit:1},
{$project:{LineItems:0}}
])
[
{
PONumber: 1,
Reference: 'MSULLIVA-20141102',
Requestor: 'Martha Sullivan',
User: 'MSULLIVA',
CostCenter: 'A50',
ShippingInstructions: {
name: 'Martha Sullivan',
Address: {
street: '200 Sporting Green',
city: 'South San Francisco',
state: 'CA',
zipCode: 99236,
country: 'United States of America'
},
Phone: [ { type: 'Office', number: '979-555-6598' } ]
},
'Special Instructions': 'Surface Mail'
}
]
This example reads JSON objects from a line-delimited text file (PurchaseOrders.dmp) stored in the Oracle GitHub sample repository. It combines $external with $limit to return only the first object in the file and $project to remove the LineItems attribute from the object.
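For readers unfamiliar with line-delimited JSON, the following sketch mimics in plain JavaScript what the pipeline above does conceptually: parse one object per line, keep the first object, and drop one attribute. The helper names are hypothetical, and the two sample lines are invented stand-ins, not the contents of PurchaseOrders.dmp.

```javascript
// Parses line-delimited JSON: one JSON object per non-empty line.
function readJsonLines(text) {
  return text
    .split(/\r?\n/)
    .filter((line) => line.trim() !== "")
    .map((line) => JSON.parse(line));
}

// Rough analogue of {$limit: n} followed by {$project: {<key>: 0}}.
function limitAndOmit(docs, limit, omittedKey) {
  return docs
    .slice(0, limit)
    .map(({ [omittedKey]: _omitted, ...rest }) => rest);
}

// Invented sample data for illustration only.
const sampleFile = [
  '{"PONumber":1,"Requestor":"Martha Sullivan","LineItems":[{"ItemNumber":1}]}',
  "",
  '{"PONumber":2,"Requestor":"Example User","LineItems":[{"ItemNumber":1}]}'
].join("\n");

console.log(limitAndOmit(readJsonLines(sampleFile), 1, "LineItems"));
// prints [ { PONumber: 1, Requestor: 'Martha Sullivan' } ]
```

The real stage does this work inside the database through an external table, so the file is never loaded into the client.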
$external Syntax
{
$external : {
location : <URL or file name>,
directory : <database directory name>,
credential : <credential name>,
path : <SQL/JSON path expression>
}
}
You can also use the following abbreviated syntax:
{
$external : <pre-authenticated URI>
}
This abbreviated form is equivalent to {$external : {location: <pre-authenticated URI>}} and applies only to resources with public access.
The $external stage generates a SQL statement that uses an inline external table definition based on the ORACLE_BIGDATA Access Driver. The $external fields map to the JSON parameters of the ORACLE_BIGDATA driver.
| Field | Description |
|---|---|
| location | Identifies the location of the JSON file or files to access. (required) See: Location Clause |
| directory | Identifies the directory where the file is stored. (optional) See: Location Clause |
| credential | Identifies the credential object used to access data files in an object store. (optional) See access parameter com.oracle.bigdata.credential.name |
| path | Specifies the path to the nested objects that the stage returns. (optional) See com.oracle.bigdata.json.path Access Parameter |
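A small builder can keep $external stages consistent with the syntax and field table above. This is a sketch only: externalStage is a hypothetical helper, its checks are a client-side convenience based on the four documented fields, and the location value in the usage line is a placeholder file name.

```javascript
// Hypothetical builder: accepts the abbreviated (string) or full (object) form
// and returns a full-form $external stage with only the documented fields.
function externalStage(spec) {
  const full = typeof spec === "string" ? { location: spec } : spec;
  if (!full || typeof full.location !== "string" || full.location === "") {
    throw new TypeError("$external requires a location");
  }
  const allowed = ["location", "directory", "credential", "path"];
  const unknown = Object.keys(full).filter((key) => !allowed.includes(key));
  if (unknown.length > 0) {
    throw new RangeError(`unknown $external field(s): ${unknown.join(", ")}`);
  }
  const fields = {};
  for (const key of allowed) {
    if (full[key] !== undefined) fields[key] = full[key]; // skip unset optional fields
  }
  return { $external: fields };
}

console.log(externalStage({ location: "movie.json", directory: "DEMODIR" }));
// prints { '$external': { location: 'movie.json', directory: 'DEMODIR' } }
```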
The following example shows how to use $external to access protected files in object storage with Autonomous Database.
db.aggregate([{ $sql:
`BEGIN
DBMS_CLOUD.CREATE_CREDENTIAL(
credential_name => 'MYCRED',
username => 'myuser@oracle.com',
password => 'XXXXXXXXX'
);
END;`
}]);
db.aggregate([
{
$external: {
location:"https://objectstorage.ca-toronto-1.oraclecloud.com/n/yz6dzkrqow85/b/private-demo/o/movies.json",
credential:"MYCRED"
}
},
{$limit:1}
])
[
{
studio: null,
title: "'Gator Bait II: Cajun Justice",
summary: "' Gator Bait II: Cajun Justice is a 1988 sequel to the 1974 film 'Gator Bait , written, produced and directed by Beverly Sebastian and Ferd Sebastian. Largely ignored upon release, the film received a second life on cable television and home video.",
sku: 'COO3790',
list_price: 3.99,
year: 1988,
}
]
The example first creates a credential for object storage by using DBMS_CLOUD.CREATE_CREDENTIAL and then uses the credential to access a JSON file in object storage. For more information, see How to Create a Credential for Object Stores.
The next example shows how $external can access a JSON file within a directory.
db.aggregate([{$sql:"CREATE DIRECTORY DEMODIR AS '/tmp/demo'"}]);
[ { result: 0 } ]
db.aggregate([
{
$external: {
location:"movie.json",
directory:"DEMODIR"
}
},
{
$limit:1
}
]);
[
{
studio: null,
title: "'Gator Bait II: Cajun Justice",
summary: "' Gator Bait II: Cajun Justice is a 1988 sequel to the 1974 film 'Gator Bait , written, produced and directed by Beverly Sebastian and Ferd Sebastian. Largely ignored upon release, the film received a second life on cable television and home video.",
sku: 'COO3790',
list_price: 3.99,
year: 1988
}
]
This example creates a directory named DEMODIR that points to /tmp/demo and then accesses the file movie.json in that directory.