Oracle9i Streams
Release 2 (9.2)

Part Number A96571-02

19
Streams Messaging Example

This chapter illustrates a messaging environment that can be constructed using Streams.

This chapter contains these topics:

Overview of Messaging Example

This example illustrates using a single SYS.AnyData queue at a database called oedb.net to create a Streams messaging environment in which events containing message payloads of different types are stored in the same queue. Specifically, this example illustrates the following messaging features of Streams:

Figure 19-1 provides an overview of this environment.

Figure 19-1 Example Streams Messaging Environment


Prerequisites

The following are prerequisites that must be completed before you begin the example in this section.

Set Up Users and Create a Streams Queue

Complete the following steps to set up users and create a Streams queue for a Streams messaging environment.

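  1. Show Output and Spool Results
  2. Set Up Users
  3. Create the Streams Queue
  4. Grant the oe User Privileges on the Queue
  5. Create an Agent for Explicit Enqueue
  6. Associate the oe User with the explicit_enq Agent
  7. Check the Spool Results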

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_setup_message.out

/*
Step 2 Set Up Users

Connect to oedb.net as SYS user.

*/
 
CONNECT SYS/CHANGE_ON_INSTALL@oedb.net AS SYSDBA

/*

This example uses the oe sample schema. For this example to work properly, the oe user must have privileges to execute the subprograms in the DBMS_AQ package. The oe user is granted all privileges on the Streams queue in Step 4, after the queue is created in Step 3, but the oe user also needs EXECUTE privilege on the DBMS_AQ package to enqueue events into and dequeue events from the queue.

Also, most of the configuration and administration actions illustrated in this example are performed by the Streams administrator. In this step, create the Streams administrator named strmadmin and grant this user the necessary privileges. These privileges enable the user to execute subprograms in packages related to Streams, create rule sets, create rules, and monitor the Streams environment by querying data dictionary views. You may choose a different name for this user.


Note:
  • To ensure security, use a password other than strmadminpw for the Streams administrator.
  • The SELECT_CATALOG_ROLE is not required for the Streams administrator. It is granted in this example so that the Streams administrator can monitor the environment easily.
  • If you plan to use the Streams tool in Oracle Enterprise Manager, then grant the Streams administrator SELECT ANY DICTIONARY privilege, in addition to the privileges shown in this step.
  • The ACCEPT command must appear on a single line in the script.

*/

GRANT EXECUTE ON DBMS_AQ TO oe;

GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE 
  TO strmadmin IDENTIFIED BY strmadminpw;

ACCEPT streams_tbs PROMPT 'Enter the tablespace for the Streams administrator: '

ALTER USER strmadmin DEFAULT TABLESPACE &streams_tbs
                     QUOTA UNLIMITED ON &streams_tbs;

GRANT EXECUTE ON DBMS_APPLY_ADM   TO strmadmin;
GRANT EXECUTE ON DBMS_AQ          TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM       TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

/*
Step 3 Create the Streams Queue

Connect as the Streams administrator.

*/

CONNECT strmadmin/strmadminpw@oedb.net

/*

Run the SET_UP_QUEUE procedure to create a queue named oe_queue at oedb.net. This queue will function as the Streams queue by holding events used in the messaging environment.

Running the SET_UP_QUEUE procedure performs the following actions:

*/

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE( 
    queue_table => 'oe_queue_table', 
    queue_name  => 'oe_queue');
END;
/

/*
Step 4 Grant the oe User Privileges on the Queue
*/

BEGIN
  SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
    privilege  => 'ALL',
    queue_name => 'strmadmin.oe_queue',
    grantee    => 'oe');
END;
/

/*
Step 5 Create an Agent for Explicit Enqueue

Create an agent that will be used to perform explicit enqueue operations on the oe_queue queue.

*/

BEGIN
  SYS.DBMS_AQADM.CREATE_AQ_AGENT(
    agent_name  => 'explicit_enq');
END;
/

/*
Step 6 Associate the oe User with the explicit_enq Agent

For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue queue is a secure queue because it was created using SET_UP_QUEUE. This step enables the oe user to perform enqueue operations on this queue.

*/

BEGIN
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'explicit_enq',
    db_username => 'oe');
END;
/

/* 
Step 7 Check the Spool Results

After this script completes, check the streams_setup_message.out spool file to ensure that all actions completed successfully.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
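
The setup script above can be spot-checked with a few dictionary queries. The following is a minimal sketch rather than part of the original example: it assumes you connect as the Streams administrator created in Step 2 (which holds SELECT_CATALOG_ROLE, so the DBA_ views are readable) and that the object names were left at the values used in the script.

```sql
CONNECT strmadmin/strmadminpw@oedb.net

-- The queue created in Step 3 should be listed with enqueue and
-- dequeue enabled.
SELECT name, queue_table, enqueue_enabled, dequeue_enabled
  FROM user_queues
 WHERE name = 'OE_QUEUE';

-- The agent created in Step 5 should be mapped to the oe database user
-- by Step 6. UPPER() is used because the stored case of the agent name
-- is not assumed here.
SELECT agent_name, db_username
  FROM dba_aq_agent_privs
 WHERE UPPER(agent_name) = 'EXPLICIT_ENQ';
```

If either query returns no rows, review the spool file for the step that failed before continuing.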

Create the Enqueue Procedures

Complete the following steps to create one PL/SQL procedure that enqueues non-LCR events into the Streams queue and one PL/SQL procedure that enqueues row LCR events into the Streams queue.

  1. Show Output and Spool Results
  2. Create a Type to Represent Orders
  3. Create a Type to Represent Customers
  4. Create the Procedure to Enqueue Non-LCR Events
  5. Create a Procedure to Construct and Enqueue Row LCR Events
  6. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_enqprocs_message.out

/*
Step 2 Create a Type to Represent Orders

Connect as oe.

*/

CONNECT oe/oe@oedb.net

/*

Create a type to represent orders based on the columns in the oe.orders table. The type attributes include the columns in the oe.orders table, along with one extra attribute named action. The value of the action attribute for instances of this type is used to determine the correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type is used for events that are enqueued into the Streams queue.

*/

CREATE OR REPLACE TYPE order_event_typ AS OBJECT (
  order_id       NUMBER(12),
  order_date     TIMESTAMP(6) WITH LOCAL TIME ZONE,
  order_mode     VARCHAR2(8),
  customer_id    NUMBER(6),
  order_status   NUMBER(2),
  order_total    NUMBER(8,2),
  sales_rep_id   NUMBER(6),
  promotion_id   NUMBER(6),
  action         VARCHAR2(7));
/

/*
Step 3 Create a Type to Represent Customers

Create a type to represent customers based on the columns in the oe.customers table. The type attributes include the columns in the oe.customers table, along with one extra attribute named action. The value of the action attribute for instances of this type is used to determine the correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type is used for events that are enqueued into the Streams queue.

*/

CREATE OR REPLACE TYPE customer_event_typ AS OBJECT (
  customer_id         NUMBER(6),
  cust_first_name     VARCHAR2(20),
  cust_last_name      VARCHAR2(20),
  cust_address        CUST_ADDRESS_TYP,
  phone_numbers       PHONE_LIST_TYP,
  nls_language        VARCHAR2(3),
  nls_territory       VARCHAR2(30),
  credit_limit        NUMBER(9,2),
  cust_email          VARCHAR2(30),
  account_mgr_id      NUMBER(6),
  cust_geo_location   MDSYS.SDO_GEOMETRY,
  action              VARCHAR2(7));
/

/*
Step 4 Create the Procedure to Enqueue Non-LCR Events

Create a PL/SQL procedure called enq_proc to enqueue events into the Streams queue.


Note:

A single enqueued message can be dequeued by an apply process and by an explicit dequeue, but this example does not illustrate this capability.


*/

CREATE OR REPLACE PROCEDURE oe.enq_proc (event IN SYS.AnyData) IS
  enqopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_eventid  RAW(16);
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL);
  DBMS_AQ.ENQUEUE(
    queue_name          => 'strmadmin.oe_queue',
    enqueue_options     => enqopt,
    message_properties  => mprop,
    payload             => event,
    msgid               => enq_eventid);
END;
/

/*
Step 5 Create a Procedure to Construct and Enqueue Row LCR Events

Create a procedure called enq_row_lcr that constructs a row LCR and then enqueues the row LCR into the queue.

See Also:

Oracle9i Supplied PL/SQL Packages and Types Reference for more information about LCR constructors

*/

CREATE OR REPLACE PROCEDURE oe.enq_row_lcr(
                 source_dbname  VARCHAR2,
                 cmd_type       VARCHAR2,
                 obj_owner      VARCHAR2,
                 obj_name       VARCHAR2,
                 old_vals       SYS.LCR$_ROW_LIST,
                 new_vals       SYS.LCR$_ROW_LIST) AS
  eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid      RAW(16);
  row_lcr        SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); 
  -- Construct the LCR based on information passed to procedure
  row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name  =>  source_dbname,
    command_type          =>  cmd_type,
    object_owner          =>  obj_owner,
    object_name           =>  obj_name,
    old_values            =>  old_vals,
    new_values            =>  new_vals);
  -- Enqueue the created row LCR
  DBMS_AQ.ENQUEUE(
    queue_name         =>  'strmadmin.oe_queue', 
    enqueue_options    =>  eopt,
    message_properties =>  mprop,
    payload            =>  SYS.AnyData.ConvertObject(row_lcr),
    msgid              =>  enq_msgid);
END enq_row_lcr;
/

/*
Step 6 Check the Spool Results

After this script completes, check the streams_enqprocs_message.out spool file to ensure that all actions completed successfully.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
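
Because SQL*Plus reports only "created with compilation errors" when a type or procedure fails to compile, it can help to confirm the status of the objects created by the script above. This is a minimal sketch, assuming the object names used in Steps 2 through 5 were not changed; it is not part of the original example.

```sql
CONNECT oe/oe@oedb.net

-- Each of the four objects should be listed with STATUS = 'VALID'.
SELECT object_name, object_type, status
  FROM user_objects
 WHERE object_name IN ('ORDER_EVENT_TYP', 'CUSTOMER_EVENT_TYP',
                       'ENQ_PROC', 'ENQ_ROW_LCR')
 ORDER BY object_type, object_name;

-- If a procedure is INVALID, display its compilation errors.
SHOW ERRORS PROCEDURE enq_proc
SHOW ERRORS PROCEDURE enq_row_lcr
```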

Configure an Apply Process

Complete the following steps to configure an apply process to apply the user-enqueued events in the Streams queue.

  1. Show Output and Spool Results
  2. Create a Function to Determine the Value of the action Attribute
  3. Create a Message Handler
  4. Grant strmadmin EXECUTE Privilege on the Procedures
  5. Create the Evaluation Context for the Rule Set
  6. Create a Rule Set for the Apply Process
  7. Create a Rule that Evaluates to TRUE if the Event Action Is apply
  8. Create a Rule that Evaluates to TRUE for the Row LCR Events
  9. Add the Rules to the Rule Set
  10. Create an Apply Process
  11. Grant EXECUTE Privilege on the Rule Set To oe User
  12. Start the Apply Process
  13. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_apply_message.out

/*
Step 2 Create a Function to Determine the Value of the action Attribute

Connect as oe.

*/

CONNECT oe/oe@oedb.net

/*

Create a function called get_oe_action to determine the value of the action attribute in the events in the queue. This function is used in rules later in this example to determine the value of the action attribute for an event. Then, the clients of the rules engine perform the appropriate action for the event (either dequeue by apply process or explicit dequeue). In this example, the clients of the rules engine are the apply process and the oe.explicit_dq PL/SQL procedure.

*/

CREATE OR REPLACE FUNCTION oe.get_oe_action (event IN SYS.Anydata) 
RETURN VARCHAR2
IS 
  ord           oe.order_event_typ;
  cust          oe.customer_event_typ;
  num           NUMBER;
  type_name     VARCHAR2(61);
BEGIN
  type_name := event.GETTYPENAME; 
  IF type_name = 'OE.ORDER_EVENT_TYP' THEN
    num := event.GETOBJECT(ord);
    RETURN ord.action;  
  ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
    num := event.GETOBJECT(cust);
    RETURN cust.action; 
  ELSE
    RETURN NULL;
  END IF;
END;
/

/*
Step 3 Create a Message Handler

Create a message handler called mes_handler that will be used as a message handler by the apply process. This procedure takes the payload in a user-enqueued event of type oe.order_event_typ or oe.customer_event_typ and inserts it as a row in the oe.orders table and oe.customers table, respectively.

*/

CREATE OR REPLACE PROCEDURE oe.mes_handler (event SYS.AnyData) 
IS
  ord           oe.order_event_typ;
  cust          oe.customer_event_typ;
  num           NUMBER;
  type_name     VARCHAR2(61);
BEGIN
  type_name := event.GETTYPENAME;
  IF type_name = 'OE.ORDER_EVENT_TYP' THEN
    num := event.GETOBJECT(ord);
    INSERT INTO oe.orders VALUES (ord.order_id, ord.order_date, 
      ord.order_mode, ord.customer_id, ord.order_status, ord.order_total, 
      ord.sales_rep_id, ord.promotion_id); 
  ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
    num := event.GETOBJECT(cust);
    INSERT INTO oe.customers VALUES (cust.customer_id, cust.cust_first_name, 
      cust.cust_last_name, cust.cust_address, cust.phone_numbers, 
      cust.nls_language, cust.nls_territory, cust.credit_limit, cust.cust_email, 
      cust.account_mgr_id, cust.cust_geo_location); 
  END IF;
END;
/

/*
Step 4 Grant strmadmin EXECUTE Privilege on the Procedures
*/

GRANT EXECUTE ON get_oe_action TO strmadmin;

GRANT EXECUTE ON mes_handler TO strmadmin;

/*
Step 5 Create the Evaluation Context for the Rule Set

Connect as the Streams administrator.

*/

CONNECT strmadmin/strmadminpw@oedb.net

/*

Create the evaluation context for the rule set. The table alias is tab in this example, but you can use a different table alias name if you wish.

*/

DECLARE
  table_alias     SYS.RE$TABLE_ALIAS_LIST;
BEGIN
  table_alias := SYS.RE$TABLE_ALIAS_LIST(SYS.RE$TABLE_ALIAS(
                                          'tab', 
                                          'strmadmin.oe_queue_table'));
  DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT(
    evaluation_context_name  =>  'oe_eval_context', 
    table_aliases            =>  table_alias);
END;
/

/*
Step 6 Create a Rule Set for the Apply Process

Create the rule set for the apply process.

*/

BEGIN
  DBMS_RULE_ADM.CREATE_RULE_SET(
    rule_set_name       =>  'apply_oe_rs',
    evaluation_context  =>  'strmadmin.oe_eval_context');
END;
/

/*
Step 7 Create a Rule that Evaluates to TRUE if the Event Action Is apply

Create a rule that evaluates to TRUE if the action value of an event is apply. Notice that tab.user_data is passed to the oe.get_oe_action function. The tab.user_data column holds the event payload in a queue table. The table alias for the queue table was specified as tab in Step 5.

*/

BEGIN
  DBMS_RULE_ADM.CREATE_RULE(
    rule_name   => 'strmadmin.apply_action',
    condition   => ' oe.get_oe_action(tab.user_data) = ''APPLY'' ');
END;
/

/*
Step 8 Create a Rule that Evaluates to TRUE for the Row LCR Events

Create a rule that evaluates to TRUE if the event in the queue is a row LCR that changes either the oe.orders table or the oe.customers table. This rule will enable the apply process to apply user-enqueued changes to the tables directly. For convenience, this rule uses the Oracle-supplied evaluation context SYS.STREAMS$_EVALUATION_CONTEXT because the rule is used to evaluate LCRs. When this rule is added to the rule set, this evaluation context is used for the rule during evaluation instead of the rule set's evaluation context.

*/

BEGIN
  DBMS_RULE_ADM.CREATE_RULE(
    rule_name           =>  'apply_lcrs',
    condition           =>  ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' || 
                            ' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' || 
                            ':dml.GET_OBJECT_NAME() = ''CUSTOMERS'') ',
    evaluation_context  =>  'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/

/*
Step 9 Add the Rules to the Rule Set

Add the rules created in Step 7 and Step 8 to the rule set created in Step 6.

*/

BEGIN
  DBMS_RULE_ADM.ADD_RULE(
    rule_name          =>  'apply_action',
    rule_set_name      =>  'apply_oe_rs');
  DBMS_RULE_ADM.ADD_RULE(
    rule_name          =>  'apply_lcrs',
    rule_set_name      =>  'apply_oe_rs');
END;
/

/*
Step 10 Create an Apply Process

Create an apply process that is associated with the oe_queue, that uses the apply_oe_rs rule set, and that uses the mes_handler procedure as a message handler.

*/

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name       =>  'strmadmin.oe_queue',
    apply_name       =>  'apply_oe',
    rule_set_name    =>  'strmadmin.apply_oe_rs',
    message_handler  =>  'oe.mes_handler',
    apply_user       =>  'oe',
    apply_captured   =>  false);
END;
/

/*
Step 11 Grant EXECUTE Privilege on the Rule Set To oe User

Grant the oe user EXECUTE privilege on the strmadmin.apply_oe_rs rule set. Because oe was specified as the apply user when the apply process was created in Step 10, oe needs EXECUTE privilege on the rule set used by the apply process.

*/

BEGIN 
  DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET,
    object_name  => 'strmadmin.apply_oe_rs',
    grantee      => 'oe', 
    grant_option => FALSE);
END;
/

/*
Step 12 Start the Apply Process

Set the disable_on_error parameter to n so that the apply process is not disabled if it encounters an error, and start the apply process at oedb.net.

*/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_oe', 
    parameter   => 'disable_on_error', 
    value       => 'n');
END;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  =>  'apply_oe');
END;
/

/*
Step 13 Check the Spool Results

After this script completes, check the streams_apply_message.out spool file to ensure that all actions completed successfully.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
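
The apply process configuration built by the script above can be reviewed in the data dictionary. The queries below are a minimal sketch under the assumption that the names apply_oe and apply_oe_rs were kept; they are offered as a convenience check, not as part of the original example.

```sql
CONNECT strmadmin/strmadminpw@oedb.net

-- Confirm the queue, rule set, message handler, apply user, and status
-- of the apply process created in Step 10 and started in Step 12.
SELECT apply_name, queue_name, rule_set_name,
       message_handler, apply_user, status
  FROM dba_apply
 WHERE apply_name = 'APPLY_OE';

-- Confirm that both rules added in Step 9 belong to the rule set.
SELECT rule_set_name, rule_owner, rule_name
  FROM dba_rule_set_rules
 WHERE rule_set_owner = 'STRMADMIN'
   AND rule_set_name  = 'APPLY_OE_RS';
```

The first query should report a STATUS of ENABLED once START_APPLY has run, and the second should return the APPLY_ACTION and APPLY_LCRS rules.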

Configure Explicit Dequeue

Complete the following steps to configure explicit dequeue of messages based on message contents.

  1. Show Output and Spool Results
  2. Create an Agent for Explicit Dequeue
  3. Associate the oe User with the explicit_dq Agent
  4. Add a Subscriber to the oe_queue Queue
  5. Create a Procedure to Dequeue Events Explicitly
  6. Check Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_explicit_dq.out

/*
Step 2 Create an Agent for Explicit Dequeue

Connect as the Streams administrator.

*/

CONNECT strmadmin/strmadminpw@oedb.net

/*

Create an agent that will be used to perform explicit dequeue operations on the oe_queue queue.

*/

BEGIN
  SYS.DBMS_AQADM.CREATE_AQ_AGENT(
    agent_name  => 'explicit_dq');
END;
/

/*
Step 3 Associate the oe User with the explicit_dq Agent

For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue queue is a secure queue because it was created using SET_UP_QUEUE. The oe user will be able to perform dequeue operations on this queue when the agent is used to create a subscriber to the queue in the next step.

*/

BEGIN
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'explicit_dq',
    db_username => 'oe');
END;
/

/*  
Step 4 Add a Subscriber to the oe_queue Queue

Add a subscriber to the oe_queue queue. This subscriber will perform explicit dequeues of events. A subscriber rule is used to dequeue any events where the action value is not apply. If the action value is apply for an event, then the event is ignored by the subscriber. Such events are dequeued and processed by the apply process.

*/

DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber :=  SYS.AQ$_AGENT('explicit_dq', NULL, NULL);  
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name  =>  'strmadmin.oe_queue',
    subscriber  =>  subscriber,
    rule        =>  'oe.get_oe_action(tab.user_data) != ''APPLY''');
END;
/

/*
Step 5 Create a Procedure to Dequeue Events Explicitly

Connect as oe.

*/

CONNECT oe/oe@oedb.net

/*

Create a PL/SQL procedure called explicit_dq to dequeue events explicitly using the subscriber created in Step 4.


Note:
  • This procedure commits after the dequeue of the events. The commit informs the queue that the dequeued messages have been consumed successfully by this subscriber.
  • This procedure can process multiple transactions and uses two exception handlers. The first exception handler, next_trans, moves to the next transaction, while the second exception handler, no_messages, exits the loop when there are no more messages.

*/

CREATE OR REPLACE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS
  deqopt       DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid        RAW(16);
  payload      SYS.AnyData;
  new_messages BOOLEAN := TRUE;
  ord          oe.order_event_typ;
  cust         oe.customer_event_typ;
  tc           pls_integer;
  next_trans   EXCEPTION;
  no_messages  EXCEPTION; 
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_messages) LOOP  
    BEGIN
    DBMS_AQ.DEQUEUE(
      queue_name          =>  'strmadmin.oe_queue',
      dequeue_options     =>  deqopt,
      message_properties  =>  mprop,
      payload             =>  payload,
      msgid               =>  msgid);
    COMMIT;
    deqopt.navigation := DBMS_AQ.NEXT;
    DBMS_OUTPUT.PUT_LINE('Event Dequeued');
    DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload.GetTypeName);
    IF (payload.GetTypeName = 'OE.ORDER_EVENT_TYP') THEN
      tc := payload.GetObject(ord);           
      DBMS_OUTPUT.PUT_LINE('order_id     - ' || ord.order_id);
      DBMS_OUTPUT.PUT_LINE('order_date   - ' || ord.order_date);
      DBMS_OUTPUT.PUT_LINE('order_mode   - ' || ord.order_mode);
      DBMS_OUTPUT.PUT_LINE('customer_id  - ' || ord.customer_id);
      DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status);
      DBMS_OUTPUT.PUT_LINE('order_total  - ' || ord.order_total);
      DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id);
      DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id);
    END IF;                       
    IF (payload.GetTypeName = 'OE.CUSTOMER_EVENT_TYP') THEN
      tc := payload.GetObject(cust);     
      DBMS_OUTPUT.PUT_LINE('customer_id     - ' || cust.customer_id);
      DBMS_OUTPUT.PUT_LINE('cust_first_name - ' || cust.cust_first_name);
      DBMS_OUTPUT.PUT_LINE('cust_last_name  - ' || cust.cust_last_name);
      DBMS_OUTPUT.PUT_LINE('street_address  - ' || 
                              cust.cust_address.street_address);
      DBMS_OUTPUT.PUT_LINE('postal_code     - ' || 
                              cust.cust_address.postal_code);
      DBMS_OUTPUT.PUT_LINE('city            - ' || cust.cust_address.city);
      DBMS_OUTPUT.PUT_LINE('state_province  - ' || 
                              cust.cust_address.state_province);
      DBMS_OUTPUT.PUT_LINE('country_id      - ' || 
                              cust.cust_address.country_id);
      DBMS_OUTPUT.PUT_LINE('phone_number1   - ' || cust.phone_numbers(1));
      DBMS_OUTPUT.PUT_LINE('phone_number2   - ' || cust.phone_numbers(2));
      DBMS_OUTPUT.PUT_LINE('phone_number3   - ' || cust.phone_numbers(3));
      DBMS_OUTPUT.PUT_LINE('nls_language    - ' || cust.nls_language);
      DBMS_OUTPUT.PUT_LINE('nls_territory   - ' || cust.nls_territory);
      DBMS_OUTPUT.PUT_LINE('credit_limit    - ' || cust.credit_limit);
      DBMS_OUTPUT.PUT_LINE('cust_email      - ' || cust.cust_email);
      DBMS_OUTPUT.PUT_LINE('account_mgr_id  - ' || cust.account_mgr_id);
    END IF;
    EXCEPTION
      WHEN next_trans THEN
        deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
      WHEN no_messages THEN
        new_messages  := FALSE;
        DBMS_OUTPUT.PUT_LINE('No more events');
    END;
  END LOOP; 
END;
/

/*
Step 6 Check Spool Results

After this script completes, check the streams_explicit_dq.out spool file to ensure that all actions completed successfully.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
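
To confirm that the subscriber and its rule were registered by the script above, you can query the subscriber views that Advanced Queuing maintains for a multiconsumer queue table. This is a hedged sketch: it assumes the view follows the AQ$queue_table_name_R naming pattern and exposes the QUEUE, NAME, and RULE columns, so verify both against the Advanced Queuing documentation for your release before relying on it.

```sql
CONNECT strmadmin/strmadminpw@oedb.net

-- Assumed view name: AQ$<queue_table>_R, which lists subscribers and rules.
-- The explicit_dq subscriber should appear with the rule from Step 4.
SELECT queue, name, rule
  FROM strmadmin.aq$oe_queue_table_r;

-- The explicit_dq agent should be mapped to the oe database user by Step 3.
SELECT agent_name, db_username
  FROM dba_aq_agent_privs
 WHERE UPPER(agent_name) = 'EXPLICIT_DQ';
```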

Enqueue Events

Complete the following steps to enqueue non-LCR events and row LCR events into the queue.

  1. Show Output and Spool Results
  2. Enqueue Non-LCR Events to be Dequeued by the Apply Process
  3. Enqueue Non-LCR Events to be Dequeued Explicitly
  4. Enqueue Row LCR Events to be Dequeued by the Apply Process
  5. Check Spool Results

Note:
  • It is possible to dequeue user-enqueued LCRs explicitly, but this example does not illustrate this capability.
  • If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.

/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_enq_deq.out

/*
Step 2 Enqueue Non-LCR Events to be Dequeued by the Apply Process

Connect as oe.

*/

CONNECT oe/oe@oedb.net

/*

Enqueue events with apply for the action value. Based on the apply process rules, the apply process will dequeue and process these events with the oe.mes_handler message handler procedure created in "Create a Message Handler". The COMMIT after the enqueues makes these two enqueues part of the same transaction. An enqueued message is not visible until the session that enqueued it commits the enqueue.

*/

BEGIN
  oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ(
    2500,'05-MAY-01','online',117,3,44699,161,NULL,'APPLY')));
END;
/

BEGIN
  oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ(
    990,'Hester','Prynne',oe.cust_address_typ('555 Beacon Street','02109',
    'Boston','MA','US'),oe.phone_list_typ('+1 617 123 4104', '+1 617 083 4381', 
    '+1 617 742 5813'),'i','AMERICA',5000,'a@scarlet_letter.com',145,
    NULL,'APPLY')));
END;
/ 
 
COMMIT;

/*
Step 3 Enqueue Non-LCR Events to be Dequeued Explicitly

Enqueue events with dequeue for the action value. The oe.explicit_dq procedure created in "Create a Procedure to Dequeue Events Explicitly" will dequeue these events because the action is not apply. Based on the apply process rules, the apply process will ignore these events. The COMMIT after the enqueues makes these two enqueues part of the same transaction.

*/

BEGIN
  oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ(
    2501,'22-JAN-00','direct',117,3,22788,161,NULL,'DEQUEUE')));
END;
/

BEGIN
  oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ(
    991,'Nick','Carraway',oe.cust_address_typ('10th Street',
    11101,'Long Island','NY','US'),oe.phone_list_typ('+1 718 786 2287', 
    '+1 718 511 9114', '+1 718 888 4832'),'i','AMERICA',3000,
    'nick@great_gatsby.com',149,NULL,'DEQUEUE')));
END;
/

COMMIT;

/*
Step 4 Enqueue Row LCR Events to be Dequeued by the Apply Process

Enqueue row LCR events. The apply process will apply these events directly. Enqueued LCRs should commit at transaction boundaries. In this step, a COMMIT statement is run after each enqueue, making each enqueue a separate transaction. However, you can perform multiple LCR enqueues before a commit if there is more than one LCR in a transaction.

Create a row LCR that inserts a row into the oe.orders table.

*/

DECLARE
  newunit1  SYS.LCR$_ROW_UNIT;
  newunit2  SYS.LCR$_ROW_UNIT;
  newunit3  SYS.LCR$_ROW_UNIT;
  newunit4  SYS.LCR$_ROW_UNIT;
  newunit5  SYS.LCR$_ROW_UNIT;
  newunit6  SYS.LCR$_ROW_UNIT;
  newunit7  SYS.LCR$_ROW_UNIT;
  newunit8  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  newunit1 := SYS.LCR$_ROW_UNIT(
    'ORDER_ID', 
    SYS.AnyData.ConvertNumber(2502),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit2 := SYS.LCR$_ROW_UNIT(
    'ORDER_DATE', 
    SYS.AnyData.ConvertTimestampLTZ('04-NOV-00'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit3 := SYS.LCR$_ROW_UNIT(
    'ORDER_MODE', 
    SYS.AnyData.ConvertVarchar2('online'),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit4 := SYS.LCR$_ROW_UNIT(
    'CUSTOMER_ID', 
    SYS.AnyData.ConvertNumber(145),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit5 := SYS.LCR$_ROW_UNIT(
    'ORDER_STATUS', 
    SYS.AnyData.ConvertNumber(3),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit6 := SYS.LCR$_ROW_UNIT(
    'ORDER_TOTAL', 
    SYS.AnyData.ConvertNumber(35199),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit7 := SYS.LCR$_ROW_UNIT(
    'SALES_REP_ID', 
    SYS.AnyData.ConvertNumber(160),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newunit8 := SYS.LCR$_ROW_UNIT(
    'PROMOTION_ID', 
    SYS.AnyData.ConvertNumber(1),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4,
                               newunit5,newunit6,newunit7,newunit8);
  oe.enq_row_lcr(
    source_dbname  =>  'OEDB.NET',
    cmd_type       =>  'INSERT',
    obj_owner      =>  'OE',
    obj_name       =>  'ORDERS',
    old_vals       =>  NULL,
    new_vals       =>  newvals);
END;
/
COMMIT;

/*

Create a row LCR that updates the row inserted into the oe.orders table previously.

*/

DECLARE
  oldunit1  SYS.LCR$_ROW_UNIT;
  oldunit2  SYS.LCR$_ROW_UNIT;
  oldvals   SYS.LCR$_ROW_LIST;
  newunit1  SYS.LCR$_ROW_UNIT;
  newvals   SYS.LCR$_ROW_LIST;
BEGIN
  oldunit1 := SYS.LCR$_ROW_UNIT(
    'ORDER_ID', 
    SYS.AnyData.ConvertNumber(2502),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldunit2 := SYS.LCR$_ROW_UNIT(
    'ORDER_TOTAL', 
    SYS.AnyData.ConvertNumber(35199),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
  newunit1 := SYS.LCR$_ROW_UNIT(
    'ORDER_TOTAL', 
    SYS.AnyData.ConvertNumber(5235),
    DBMS_LCR.NOT_A_LOB,
    NULL,
    NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1);
  oe.enq_row_lcr(
    source_dbname  =>  'OEDB.NET',
    cmd_type       =>  'UPDATE',
    obj_owner      =>  'OE',
    obj_name       =>  'ORDERS',
    old_vals       =>  oldvals,
    new_vals       =>  newvals);
END;
/
COMMIT;

/*
Step 5 Check Spool Results

Check the streams_enq_deq.out spool file to ensure that all actions completed successfully after this script completes.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

Dequeue Events Explicitly and Query for Applied Events

Complete the following steps to dequeue the events explicitly and query the events that were applied by the apply process. These events were enqueued in "Enqueue Events".

Step 1 Run the Procedure to Dequeue Events Explicitly

Run the procedure you created in "Create a Procedure to Dequeue Events Explicitly" and specify the consumer of the events you want to dequeue. In this case, the consumer is the subscriber you added in "Add a Subscriber to the oe_queue Queue". In this example, events that are not dequeued explicitly by this procedure are dequeued by the apply process.

CONNECT oe/oe@oedb.net

SET SERVEROUTPUT ON SIZE 100000

EXEC oe.explicit_dq('explicit_dq');

You should see the non-LCR events that were enqueued in "Enqueue Non-LCR Events to be Dequeued Explicitly".

Step 2 Query for Applied Events

Query the oe.orders and oe.customers tables to see the rows corresponding to the events applied by the apply process:

SELECT * FROM oe.orders WHERE order_id = 2500;

SELECT cust_first_name, cust_last_name, cust_email 
  FROM oe.customers WHERE customer_id = 990;

SELECT * FROM oe.orders WHERE order_id = 2502;

You should see the non-LCR event that was enqueued in "Enqueue Non-LCR Events to be Dequeued by the Apply Process" and the row LCR events that were enqueued in "Enqueue Row LCR Events to be Dequeued by the Apply Process".

Enqueue and Dequeue Events Using JMS

This example enqueues non-LCR events and row LCR events into the queue using JMS. Then, this example dequeues these events from the queue using JMS.

Complete the following steps:

  1. Run the catxlcr.sql Script
  2. Create the Types for User Events
  3. Set the CLASSPATH
  4. Create Java Classes that Map to the Oracle Object Types
  5. Create a Java Code for Enqueuing Messages
  6. Create a Java Code for Dequeuing Messages
  7. Compile the Scripts
  8. Run the Enqueue Program
  9. Run the Dequeue Program

Step 1 Run the catxlcr.sql Script

For this example to complete successfully, the LCR schema must be loaded into the SYS schema using the catxlcr.sql script, which is located in the rdbms/admin/ directory under your Oracle home. Run this script now if it has not been run already.

For example, if your Oracle home directory is /usr/oracle, then enter the following to run the script:

CONNECT SYS/CHANGE_ON_INSTALL AS SYSDBA

@/usr/oracle/rdbms/admin/catxlcr.sql

Step 2 Create the Types for User Events

CONNECT oe/oe

CREATE TYPE address AS OBJECT (street VARCHAR (30), num NUMBER)
/ 

CREATE TYPE person AS OBJECT (name VARCHAR (30), home ADDRESS)
/

Step 3 Set the CLASSPATH

The following jar and zip files should be in the CLASSPATH based on the release of JDK you are using.

Also, make sure LD_LIBRARY_PATH (Solaris) or PATH (Windows NT) has $ORACLE_HOME/lib set.

-- For JDK1.3.x 
$ORACLE_HOME/jdbc/lib/classes12.zip 
$ORACLE_HOME/rdbms/jlib/aqapi13.jar
$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ORACLE_HOME/rdbms/jlib/xdb.jar
$ORACLE_HOME/xdk/lib/xmlparserv2.jar
$ORACLE_HOME/jlib/jndi.jar

-- For JDK1.2.x 
$ORACLE_HOME/jdbc/lib/classes12.zip 
$ORACLE_HOME/rdbms/jlib/aqapi12.jar
$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ORACLE_HOME/rdbms/jlib/xdb.jar
$ORACLE_HOME/xdk/lib/xmlparserv2.jar
$ORACLE_HOME/jlib/jndi.jar

-- For JDK1.1.x 
$ORACLE_HOME/jdbc/lib/classes111.zip 
$ORACLE_HOME/rdbms/jlib/aqapi11.jar
$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ORACLE_HOME/rdbms/jlib/xdb.jar
$ORACLE_HOME/xdk/lib/xmlparserv2.jar
$ORACLE_HOME/jlib/jndi.jar
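
Because a missing archive typically surfaces only as a class-loading error at run time, it can help to check the CLASSPATH before running the programs. The following is a minimal sketch, not part of the original example, that reports which of the JDK 1.3.x file names listed above do not appear in a classpath string. The class name ClasspathCheck and its method names are hypothetical, the code compares file names only (it does not verify that the files exist), and it is written for a current JDK rather than for the JDK releases listed above.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: reports listed archives that are absent from a classpath string.
class ClasspathCheck {

    // File names copied from the JDK 1.3.x list in this step.
    static final List<String> REQUIRED_JDK13 = Arrays.asList(
        "classes12.zip", "aqapi13.jar", "jmscommon.jar",
        "xdb.jar", "xmlparserv2.jar", "jndi.jar");

    // Returns the required file names that no classpath entry ends with.
    // Only the last path component of each entry is compared.
    static List<String> missing(String classpath, List<String> required) {
        List<String> present = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            present.add(new File(entry).getName());
        }
        List<String> result = new ArrayList<>();
        for (String name : required) {
            if (!present.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // java.class.path reflects the CLASSPATH that the JVM was started with.
        String cp = System.getProperty("java.class.path", "");
        List<String> absent = missing(cp, REQUIRED_JDK13);
        if (absent.isEmpty()) {
            System.out.println("All listed files are on the CLASSPATH");
        } else {
            System.out.println("Missing from CLASSPATH: " + absent);
        }
    }
}
```

For a different JDK release, substitute the corresponding file names from the lists above (for example, aqapi12.jar in place of aqapi13.jar).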

Step 4 Create Java Classes that Map to the Oracle Object Types

First, create a file input.typ with the following lines:

SQL PERSON AS JPerson
SQL ADDRESS AS JAddress

Then, run JPublisher.

jpub -input=input.typ -user=OE/OE

Completing these actions generates two Java classes named JPerson and JAddress for the person and address types, respectively.

Step 5 Create a Java Code for Enqueuing Messages

This program uses the Oracle JMS API to publish messages into a Streams topic.

This program does the following:

Step 6 Create a Java Code for Dequeuing Messages

This program uses the Oracle JMS API to receive messages from a Streams topic.

This program does the following:

Step 7 Compile the Scripts

javac StreamsEnq.java StreamsDeq.java JPerson.java JAddress.java

Step 8 Run the Enqueue Program

java StreamsEnq  ORACLE_SID HOST PORT

For example, if your Oracle SID is orcl82, your host is hq_server, and your port is 1521, then enter the following:

java StreamsEnq orcl82 hq_server 1521

Step 9 Run the Dequeue Program

java StreamsDeq  ORACLE_SID HOST PORT

For example, if your Oracle SID is orcl82, your host is hq_server, and your port is 1521, then enter the following:

java StreamsDeq orcl82 hq_server 1521
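
Both programs take the same three positional arguments. The source of StreamsEnq and StreamsDeq is not reproduced here, so the following is only a hypothetical sketch of how such a program might validate ORACLE_SID, HOST, and PORT before connecting. The class name StreamsArgs and its methods are assumptions made for illustration. The URL it builds follows the standard Oracle JDBC thin driver form jdbc:oracle:thin:@host:port:sid, and whether the actual programs build such a URL is also an assumption.

```java
// Hypothetical helper: validates the ORACLE_SID HOST PORT arguments.
class StreamsArgs {
    final String sid;
    final String host;
    final int port;

    StreamsArgs(String sid, String host, int port) {
        this.sid = sid;
        this.host = host;
        this.port = port;
    }

    // Expects exactly three arguments in the order ORACLE_SID HOST PORT.
    static StreamsArgs parse(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException(
                "usage: java <program> ORACLE_SID HOST PORT");
        }
        int port;
        try {
            port = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("PORT must be a number: " + args[2]);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("PORT out of range: " + port);
        }
        return new StreamsArgs(args[0], args[1], port);
    }

    // Oracle JDBC thin driver URL in the SID form.
    String thinUrl() {
        return "jdbc:oracle:thin:@" + host + ":" + port + ":" + sid;
    }

    public static void main(String[] args) {
        try {
            System.out.println(parse(args).thinUrl());
        } catch (IllegalArgumentException e) {
            // Report the problem instead of failing with a stack trace.
            System.err.println(e.getMessage());
        }
    }
}
```

Checking the arguments up front makes a swapped HOST and PORT, or a mistyped port number, fail with a clear message rather than a connection error.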