Cache Messages with Singleton Pipes

Singleton Pipe is an addition to the DBMS_PIPE package that allows you to cache and retrieve a custom message and share the message across multiple database sessions with concurrent reads.

About Caching Messages with Singleton Pipes

The DBMS_PIPE package has extended functionality on Autonomous AI Database to support Singleton Pipes.

A Singleton Pipe in DBMS_PIPE:

About Standard Pipes and Singleton Pipes

The DBMS_PIPE Package allows two or more database sessions to communicate using in-memory messages. Pipe functionality has several applications such as external service interface, debugging, independent transactions, and alerts.

Description of database-pipe-messages-singleton-pipes.eps follows

Description of the illustration database-pipe-messages-singleton-pipes.png

A Singleton Pipe can be any one of the supported DBMS_PIPE types:

Singleton Pipes provide the ability to cache a single message in the memory of the Autonomous AI Database instance.

The following shows the general workflow for using singleton pipes.

Description of singleton-pipe-workflow.eps follows

Description of the illustration singleton-pipe-workflow.png

Singleton Pipe Overview and Features

Singleton Pipe Operations

Operation DBMS_PIPE Function or Procedure
Create an Explicit Singleton Pipe CREATE_PIPE Function
Cache a message in Singleton Pipe PACK_MESSAGE Procedures, SEND_MESSAGE Function
Read a cached message from Singleton Pipe RECEIVE_MESSAGE Function, UNPACK_MESSAGE Procedures
Delete a message in Singleton Pipe PURGE Procedure
Remove an Explicit Singleton Pipe REMOVE_PIPE Function

Automatic Refresh of Cached Message with a Cache Function

The DBMS_PIPE package allows you to automatically populate a Singleton Pipe message using a user-defined cache function.

By default, after a message is invalidated with either Singleton Pipe explicit or implicit invalidation, a subsequent DBMS_PIPE.RECEIVE_MESSAGE results in no message being received. To add a new message to the pipe, the message must be explicitly cached by calling DBMS_PIPE.SEND_MESSAGE. To avoid this case, where no message is available when you read from a Singleton Pipe, you can define a cache function. With a cache function defined, the cache function is automatically invoked when you receive a message in following scenarios:

To use a cache function define the cache function and include the cache_func parameter with DBMS_PIPE.RECEIVE_MESSAGE. A user-defined cache function provides the following:

Using a cache function simplifies working with Singleton Pipes. You do not need to handle failure cases for receiving a message from an empty pipe. In addition, a cache function ensures there is no cache-miss when you read messages from a Singleton Pipe, providing maximum use of the cached message.

Description of automatic-cache-refresh-cache-function.eps follows

Description of the illustration automatic-cache-refresh-cache-function.png

When you define a cache function, the function name must be fully qualified with the owner schema:

Define a cache function with the following signature:

CREATE OR REPLACE FUNCTION cache_function_name(
       pipename  IN VARCHAR2
) RETURN INTEGER;

The typical operations within a cache function are:

To use a cache function, the current session user that invokes DBMS_PIPE.RECEIVE_MESSAGE must have required privileges to execute the cache function.

See RECEIVE_MESSAGE Function more information on defining a cache function.

Create an Explicit Singleton Pipe

Describes the steps to create a Singleton Pipe with a specified pipe name (an Explicit Singleton Pipe).

First, for this example create the receive_message helper function to repeatedly call DBMS_PIPE.RECEIVE_MESSAGE. This allows you to test singleton pipe functionality.

CREATE OR REPLACE FUNCTION msg_types AS
       TYPE t_rcv_row IS RECORD (c1 VARCHAR2(32767), c2 NUMBER);
       TYPE t_rcv_tab IS TABLE OF t_rcv_row;
END;


CREATE OR REPLACE FUNCTION receive_message(
      pipename    IN VARCHAR2,
      rcv_count   IN NUMBER DEFAULT 1,
      cache_func  IN VARCHAR2 DEFAULT NULL)
   RETURN msg_types.t_rcv_tab pipelined
    AS
       l_msg    VARCHAR2(32767);
       l_status NUMBER;
 BEGIN
      FOR i IN
1..rcv_count LOOP
           l_status := DBMS_PIPE.RECEIVE_MESSAGE(
            pipename   => pipename,
            cache_func => cache_func,
            timeout    => 1);
         IF l_status != 0 THEN
              raise_application_error(-20000,
             'Message not received for attempt: ' || to_char(i) || ' status: ' ||
            l_status);
         END IF;

         DBMS_PIPE.UNPACK_MESSAGE(l_msg);
             pipe row(msg_types.t_rcv_row(l_msg));
     END LOOP;
 RETURN;
 END;
  1. Create an explicit singleton pipe named PIPE_TEST with shelflife parameter set to 3600 (seconds).
    DECLARE
      l_status INTEGER;
    BEGIN
      l_status := DBMS_PIPE.CREATE_PIPE(
                  pipename => 'MY_PIPE1',
                  private => TRUE,
                  singleton => TRUE,
                  shelflife => 3600);
    END;
    /

    See CREATE_PIPE Function for more information.

  2. Verify the singleton pipe is created.

    SELECT name, singleton, type
         FROM v$db_pipes WHERE name= '&pipename' ORDER BY 1;
    
    NAME                 SINGLETON  TYPE
    
    -------------------- ---------- -------
    PIPE_TEST            YES        PRIVATE
  3. Pack and send a message on the singleton pipe.

    EXEC DBMS_PIPE.PACK_MESSAGE('This is a real message that you can get multiple times');
    
    SELECT DBMS_PIPE.SEND_MESSAGE(pipename => '&pipename') status FROM DUAL;
    
    STATUS
    
    ----------
    0

    See PACK_MESSAGE Procedures and SEND_MESSAGE Function for more information.

  4. Receive a message from a singleton pipe.

    SELECT * FROM receive_message(
        pipename => '&pipename',
        rcv_count => 2);
    
    MESSAGE
    
    --------------------------------------------------------------------------------
    This is a real message that you can get multiple times
    This is a real message that you can get multiple times

    The receive_message function is a helper function that calls DBMS_PIPE.RECEIVE_MESSAGE.

  5. Purge the message and remove the pipe.

    EXEC DBMS_PIPE.PURGE('&pipename');
    SELECT DBMS_PIPE.REMOVE_PIPE('&pipename') status FROM DUAL;

Create an Explicit Singleton Pipe with a Cache Function

Describes the steps to create a Singleton Pipe with a specified pipe name, an Explicit Singleton Pipe, and provide a cache function. A cache function allows you to automatically populate the message in a singleton pipe.

  1. Create a cache function, test_cache_message for a singleton pipe.

    CREATE OR REPLACE FUNCTION test_cache_message(
         pipename IN VARCHAR2) return NUMBER
    
    AS
       l_status NUMBER;
       l_data VARCHAR2(4000);
    BEGIN
       l_status := DBMS_PIPE.CREATE_PIPE(
              pipename => pipename,
              private => TRUE,
              singleton => true,
              shelflife => 600);
       IF l_status != 0 THEN RETURN l_status;
       END IF;
    
       DBMS_PIPE.PACK_MESSAGE('This is a placeholder cache message for an empty pipe');
       l_status := DBMS_PIPE.SEND_MESSAGE(pipename => pipename);
       RETURN l_status;
     END;
    /

    Note: The current session user invoking DBMS_PIPE.RECEIVE_MESSAGE must have required privilege to execute the cache function.

  2. Receive with a cache function and confirm the message populates in pipe. The pipe must exist as a private pipe created in the cache function.

    SELECT * FROM receive_message(
         pipename => '&pipename',
         rcv_count => 1,
         cache_func => 'TEST_CACHE_MESSAGE');
    
    MESSAGE
    
    ---------------
    This is a placeholder cache message for an empty pipe

    The receive_message function is a helper function that calls DBMS_PIPE.RECEIVE_MESSAGE. See Create an Explicit Singleton Pipe for the receive_message definition.

    See CREATE_PIPE Function for more information.

  3. Receive without the cache function to confirm the message persists in the pipe.

    SELECT * FROM receive_message(
         pipename => '&pipename',
         rcv_count => 2);
    
    MESSAGE
    
    ---------------
    This is a placeholder cache message for an empty pipe
    This is a placeholder cache message for an empty pipe

    The receive_message function is a helper function that calls DBMS_PIPE.RECEIVE_MESSAGE. See Create an Explicit Singleton Pipe for the receive_message definition.

    See CREATE_PIPE Function for more information.