Skip Headers

Oracle9i Streams
Release 2 (9.2)

Part Number A96571-02
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page Go to next page
View PDF

13
Managing Staging and Propagation

This chapter provides instructions for managing Streams queues, propagations, and messaging environments.

This chapter contains these topics:

Each task described in this section should be completed by a Streams administrator that has been granted the appropriate privileges, unless specified otherwise.

See Also:

Managing Streams Queues

A Streams queue stages events whose payloads are of SYS.AnyData type. Therefore, a Streams queue can stage an event with payload of nearly any type, if the payload is wrapped in a SYS.AnyData wrapper. Each Streams capture process and apply process is associated with one Streams queue, and each Streams propagation is associated with one Streams source queue and one Streams destination queue.

This section provides instructions for completing the following tasks related to Streams queues:

Creating a Streams Queue

You use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create a Streams queue. This procedure enables you to specify the following for the Streams queue it creates:

This procedure creates a queue that is both a secure queue and a transactional queue and starts the newly created queue.

For example, to create a Streams queue named strm01_queue with a queue table named strm01_queue_table and grant the hr user the privileges necessary to enqueue events into and dequeue events from the queue, run the following procedure:

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strm01_queue_table',
    queue_name   => 'strm01_queue',
    queue_user   => 'hr');
END;
/

You can also use procedures in the DBMS_AQADM package to create a SYS.AnyData queue.

See Also:

Enabling a User to Perform Operations on a Secure Queue

For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. If you use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create the secure queue, then the queue owner and the user specified by the queue_user parameter are configured as secure users of the queue automatically. If you want to enable other users to perform operations on the queue, then you can configure these users in one of the following ways:

The following example illustrates associating a user with an agent manually. Suppose you want to enable the oe user to perform queue operations on the strm01_queue created in "Creating a Streams Queue". The following steps configure the oe user as a secure queue user of strm01_queue:

  1. Connect as an administrative user who can create agents and alter users.
  2. Create an agent:
    EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'strm01_queue_agent');
    
    
  3. If the user must be able to dequeue events from queue, then make the agent a subscriber of the secure queue:
    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('strm01_queue_agent', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name          =>  'strmadmin.strm01_queue',
        subscriber          =>  subscriber,
        rule                =>  NULL,
        transformation      =>  NULL);
    END;
    /
    
    
  4. Associate the user with the agent:
    BEGIN
      DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'strm01_queue_agent',
        db_username => 'oe');
    END;
    /
    
    
  5. Grant the user EXECUTE privilege on the DBMS_AQ package, if the user is not already granted this privilege.
    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    

When these steps are complete, the oe user is a secure user of the strm01_queue queue and can perform operations on the queue. You still must grant the user specific privileges to perform queue operations, such as enqueue and dequeue privileges.

See Also:

Disabling a User from Performing Operations on a Secure Queue

You may want to disable a user from performing queue operations on a secure queue for the following reasons:

To disable a secure queue user, you can revoke ENQUEUE and DEQUEUE privilege on the queue from the user, or you can run the DISABLE_DB_ACCESS procedure in the DBMS_AQADM package. For example, suppose you want to disable the oe user from performing queue operations on the strm01_queue created in "Creating a Streams Queue".


Attention:

If an agent is used for multiple secure queues, then running DISABLE_DB_ACCESS for the agent prevents the user from performing operations on all of these queues.


  1. Run the following procedure to disable the oe user from performing queue operations on the secure queue strm01_queue:
    BEGIN
      DBMS_AQADM.DISABLE_DB_ACCESS(
        agent_name  => 'strm01_queue_agent',
        db_username => 'oe');
    END;
    /
    
    
  2. If the agent is no longer needed, you can drop the agent:
    BEGIN
      DBMS_AQADM.DROP_AQ_AGENT(
        agent_name  => 'strm01_queue_agent');
    END;
    /
    
    
  3. Revoke privileges on the queue from the user, if the user no longer needs these privileges.
    BEGIN
      DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE (
       privilege   => 'ALL',
       queue_name  => 'strmadmin.strm01_queue',
       grantee     => 'oe');
    END;
    /
    
    See Also:

Dropping a Streams Queue

To drop an existing Streams queue, perform the same actions that you would to drop a typed queue. A Streams queue may be dropped in the following ways:

When you drop a Streams queue, all of the error transactions that were moved to the exception queue from the Streams queue are deleted automatically.

See Also:

Oracle9i Supplied PL/SQL Packages and Types Reference for more information about dropping queues

Managing Streams Propagations and Propagation Jobs

A propagation propagates events from a Streams source queue to a Streams destination queue. This section provides instructions for completing the following tasks:

In addition, you can use the features of Oracle Advanced Queuing (AQ) to manage Streams propagations.

See Also:

Oracle9i Application Developer's Guide - Advanced Queuing for more information about managing propagations with the features of AQ

Creating a Propagation

You can use any of the following procedures to create a propagation:

Each of the procedures in the DBMS_STREAMS_ADM package creates a propagation with the specified name if it does not already exist, creates a rule set for the propagation if the propagation does not have a rule set, and may add table, schema, or global rules to the rule set. The CREATE_PROPAGATION procedure creates a propagation, but does not create a rule set or rules for the propagation. All propagations are started automatically upon creation.

The following tasks must be completed before you create a propagation:

Example of Creating a Propagation Using DBMS_STREAMS_ADM

The following is an example that runs the ADD_TABLE_RULES procedure in the DBMS_STREAMS_ADM package to create a propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.departments',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    include_tagged_lcr      => false,
    source_database         => 'dbs1.net' );
END;
/

Running this procedure performs the following actions:

Example of Creating a Propagation Using DBMS_PROPAGATION_ADM

The following is an example that runs the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to create a propagation:

BEGIN
  DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
    propagation_name   => 'strm02_propagation',
    source_queue       => 'strmadmin.strm03_queue',
    destination_queue  => 'strmadmin.strm04_queue',
    destination_dblink => 'dbs2.net',
    rule_set_name      => 'strmadmin.strm01_rule_set');
END;
/

Running this procedure performs the following actions:

Enabling a Propagation Job

By default, propagation jobs are enabled upon creation. If you disable a propagation job and want to enable it, then use the ENABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, to enable a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link, run the following procedure:

BEGIN
  DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.strm01_queue',
    destination => 'dbs2.net');
END;
/

Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Scheduling a Propagation Job

You can schedule a propagation job using the SCHEDULE_PROPAGATION procedure in the DBMS_AQADM package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.

For example, the following procedure schedules a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link:

BEGIN
  DBMS_AQADM.SCHEDULE_PROPAGATION(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net'); 
END;
/

Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Altering the Schedule of a Propagation Job

You can alter the schedule of an existing propagation job using the ALTER_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, suppose you want to alter the schedule of a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link. The following procedure sets the propagation job to propagate events every 15 minutes (900 seconds), with each propagation lasting 300 seconds, and a 25 second wait before new events in a completely propagated queue are propagated.

BEGIN
  DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net',
   duration    => 300,
   next_time   => 'SYSDATE + 900/86400',
   latency     => 25); 
END;
/

Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Unscheduling a Propagation Job

You can unschedule a propagation job using the UNSCHEDULE_PROPAGATION procedure in the DBMS_AQADM package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.

For example, the following procedure unschedules a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link:

BEGIN
  DBMS_AQADM.UNSCHEDULE_PROPAGATION(
   queue_name  => 'strmadmin.strm01_queue',
   destination => 'dbs2.net'); 
END;
/

Note:

Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.


See Also:

Specifying the Rule Set for a Propagation

You specify the rule set that you want to associate with a propagation using the rule_set_name parameter in the ALTER_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. For example, the following procedure sets the rule set for a propagation named strm01_propagation to strm02_rule_set.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name  => 'strm01_propagation',
    rule_set_name     => 'strmadmin.strm02_rule_set');
END;
/
See Also:

Adding Rules to the Rule Set for a Propagation

You add rules to the rule set of a propagation, you can run one of the following procedures:

The following is an example that runs the ADD_TABLE_RULES procedure in the DBMS_STREAMS_ADM package to add rules to the rule set of a propagation named strm01_propagation:

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'hr.locations',
    streams_name            => 'strm01_propagation',
    source_queue_name       => 'strmadmin.strm01_queue',
    destination_queue_name  => 'strmadmin.strm02_queue@dbs2.net',
    include_dml             => true,
    include_ddl             => true,
    source_database         => 'dbs1.net' );
END;
/

Running this procedure performs the following actions:

Removing a Rule from the Rule Set for a Propagation

You specify that you want to remove a rule from the rule set for an existing propagation by running the REMOVE_RULE procedure in the DBMS_STREAMS_ADM package. For example, the following procedure removes a rule named DEPARTMENTS3 from the rule set of a propagation named strm01_propagation.

BEGIN
  DBMS_STREAMS_ADM.REMOVE_RULE(
    rule_name        => 'DEPARTMENTS3',
    streams_type     => 'propagation',
    streams_name     => 'strm01_propagation',
    drop_unused_rule => true);
END;
/

In this example, the drop_unused_rule parameter in the REMOVE_RULE procedure is set to true, which is the default setting. Therefore, if the rule being removed is not in any other rule set, then it will be dropped from the database. If the drop_unused_rule parameter is set to false, then the rule is removed from the rule set, but it is not dropped from the database.

In addition, if you want to remove all of the rules in the rule set for the propagation, then specify NULL for the rule_name parameter when you run the REMOVE_RULE procedure.


Note:

If you drop all of the rules in the rule set for a propagation, then the propagation propagations no events in the source queue to the destination queue.


Removing the Rule Set for a Propagation

You specify that you want to remove the rule set from a propagation by setting the rule_set_name parameter to NULL in the ALTER_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. For example, the following procedure removes the rule set from a propagation named strm01_propagation.

BEGIN
  DBMS_PROPAGATION_ADM.ALTER_PROPAGATION(
    propagation_name => 'strm01_propagation',
    rule_set_name    => NULL);
END;
/

Note:

If you remove a rule set for a propagation, then the propagation propagates all events in the source queue to the destination queue.


Disabling a Propagation Job

To stop a propagation job, use the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.

For example, to stop a propagation job that propagates events from the strmadmin.strm01_queue source queue using the dbs2.net database link, run the following procedure:

BEGIN
  DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE(
    queue_name  => 'strmadmin.strm01_queue',
    destination => 'dbs2.net');
END;
/

Note:
  • Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the dbs2.net database link.
  • The DISABLE_PROPAGATION_SCHEDULE disables the propagation job immediately. It does not wait for the current duration to end.

See Also:

Oracle9i Application Developer's Guide - Advanced Queuing for more information about using the DISABLE_PROPAGATION_SCHEDULE procedure

Dropping a Propagation

You run the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop an existing propagation. For example, the following procedure drops a propagation named strm01_propagation:

BEGIN
  DBMS_PROPAGATION_ADM.DROP_PROPAGATION(
    propagation_name => 'strm01_propagation');
END;
/


Note:

When you drop a propagation, the propagation job used by the propagation is dropped automatically, if no other propagations are using the propagation job.


Managing a Streams Messaging Environment

Streams enables messaging with queues of type SYS.AnyData. These queues stage user messages whose payloads are of SYS.AnyData type, and a SYS.AnyData payload can be a wrapper for payloads of different datatypes.

This section provides instructions for completing the following tasks:

Wrapping User Message Payloads in a SYS.AnyData Wrapper

You can wrap almost any type of payload in a SYS.AnyData payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, a SYS.AnyData queue.

Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It

The following steps illustrate how to wrap payloads of various types in a SYS.AnyData payload.

  1. Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net database.
  2. Grant EXECUTE privilege on the DBMS_AQ package to the oe user so that this user can run the ENQUEUE and DEQUEUE procedures in that package:
    GRANT EXECUTE ON DBMS_AQ TO oe;
    
    
  3. Connect as the Streams administrator, as in the following example:
    CONNECT strmadmin/strmadminpw@dbs1.net
    
    
  4. Create a SYS.AnyData queue if one does not already exist.
    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe_q_table_any',
        queue_name   => 'oe_q_any',
        queue_user   => 'oe');
    END;
    /
    
    

    The oe user is configured automatically as a secure queue user of the oe_q_any queue and is given ENQUEUE and DEQUEUE privileges on the queue.

  5. Create an agent:
    EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'local_agent');
    
    
  6. Add a subscriber to the oe_q_any queue. This subscriber will perform explicit dequeues of events.
    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name  =>  'strmadmin.oe_q_any',
        subscriber  =>  subscriber);
    END;
    /
    
    
  7. Associate the oe user with the local_agent agent:
    BEGIN
      DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'local_agent',
        db_username => 'oe');
    END;
    /
    
    
  8. Connect as the oe user.
    CONNECT oe/oe@dbs1.net
    
    
  9. Create a procedure that takes as an input parameter an object of SYS.AnyData type and enqueues a message containing the payload into an existing SYS.AnyData queue.
    CREATE OR REPLACE PROCEDURE oe.enq_proc (payload SYS.AnyData) 
    IS
      enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid  RAW(16);
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); 
      DBMS_AQ.ENQUEUE(
        queue_name          =>  'strmadmin.oe_q_any',
        enqueue_options     =>  enqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  enq_msgid);
    END;
    /
    
    
  10. Run the procedure you created in Step 9 by specifying the appropriate Convertdata_type function. The following commands enqueue messages of various types.

    VARCHAR2 type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertVarchar2('Chemicals - SW'));
    COMMIT;
    
    

    NUMBER type:

    EXEC oe.enq_proc(SYS.AnyData.ConvertNumber('16'));
    COMMIT;
    
    

    User-defined type:

    BEGIN
      oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
        '1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
    END;
    /
    COMMIT;
    
    See Also:

    "Viewing the Contents of User-Enqueued Events in a Queue" for information about viewing the contents of these enqueued messages

Example of Dequeuing a Payload That Is Wrapped in a SYS.AnyData Payload

The following steps illustrate how to dequeue a payload wrapped in a SYS.AnyData payload. This example assumes that you have completed the steps in "Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It".

To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name, where queue_table_name is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any queue, run the following query:

CONNECT strmadmin/strmadminpw@dbs1.net

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;

  1. Connect as the oe user:
    CONNECT oe/oe@dbs1.net
    
    
  2. Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ and prints the contents of the messages.
    CREATE OR REPLACE PROCEDURE oe.get_cust_address (
    consumer IN VARCHAR2) AS
      address         OE.CUST_ADDRESS_TYP;
      deq_address     SYS.AnyData; 
      msgid           RAW(16); 
      deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T; 
      mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
      new_addresses   BOOLEAN := TRUE;
      next_trans      EXCEPTION;
      no_messages     EXCEPTION; 
      pragma exception_init (next_trans, -25235);
      pragma exception_init (no_messages, -25228);
      num_var         pls_integer;
    BEGIN
         deqopt.consumer_name := consumer;
         deqopt.wait := 1;
         WHILE (new_addresses) LOOP
         BEGIN
          DBMS_AQ.DEQUEUE( 
             queue_name          =>  'strmadmin.oe_q_any',
             dequeue_options     =>  deqopt,
             message_properties  =>  mprop,
             payload             =>  deq_address,
             msgid               =>  msgid);
    
              deqopt.navigation := DBMS_AQ.NEXT;
    
             DBMS_OUTPUT.PUT_LINE('****'); 
             IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
                 DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||  
                                       deq_address.GetTypeName()); 
                 num_var := deq_address.GetObject(address);    
                 DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
                 DBMS_OUTPUT.PUT_LINE(address.street_address);
                 DBMS_OUTPUT.PUT_LINE(address.postal_code);
                 DBMS_OUTPUT.PUT_LINE(address.city);
                 DBMS_OUTPUT.PUT_LINE(address.state_province);
                 DBMS_OUTPUT.PUT_LINE(address.country_id);
             ELSE
                DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||    
                                      deq_address.GetTypeName()); 
             END IF;  
           COMMIT;   
        EXCEPTION
          WHEN next_trans THEN
          deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
          WHEN no_messages THEN
            new_addresses := FALSE;
            DBMS_OUTPUT.PUT_LINE('No more messages');
         END;
      END LOOP; 
    END;
    /
    
    
  3. Run the procedure you created in Step 1 and specify the consumer of the messages you want to dequeue, as in the following example:
    SET SERVEROUTPUT ON SIZE 100000
    EXEC oe.get_cust_address('LOCAL_AGENT');
    

Propagating Messages Between a SYS.AnyData Queue and a Typed Queue

SYS.AnyData queues can interoperate with typed queues in a Streams environment. A typed queue is a queue that can stage messages of a particular type only. To propagate a message from a SYS.AnyData queue to a typed queue, the message must be transformed to match the type of the typed queue. The following sections provide examples of propagating non-LCR user messages and LCRs between a SYS.AnyData queue and a typed queue.


Note:

The examples in this section assume that you have completed the examples in "Wrapping User Message Payloads in a SYS.AnyData Wrapper".


See Also:

"Message Propagation and SYS.AnyData Queues" for more information about propagation between SYS.AnyData and typed queues

Example of Propagating Non-LCR User Messages to a Typed Queue

The following steps set up propagation from a SYS.AnyData queue named oe_q_any to a typed queue of type oe.cust_address_typ named oe_q_address. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_address is at the dbs2.net database. Both queues are owned by strmadmin.

  1. Connect as an administrative user who can grant privileges at dbs1.net.
  2. Grant the following privilege to strmadmin, if it was not already granted.
    GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
    
    
  3. Grant strmadmin EXECUTE privilege on oe.cust_address_typ at dbs1.net and dbs2.net.
    CONNECT oe/oe@dbs1.net
    
    GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
    
    CONNECT oe/oe@dbs2.net
    
    GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
    
    
  4. Create a typed queue at dbs2.net, if one does not already exist.
    CONNECT strmadmin/strmadminpw@dbs2.net
    
    BEGIN 
      DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table         => 'strmadmin.oe_q_table_address', 
        queue_payload_type  => 'oe.cust_address_typ', 
        multiple_consumers  => true);
      DBMS_AQADM.CREATE_QUEUE(
        queue_name   => 'strmadmin.oe_q_address', 
        queue_table  => 'strmadmin.oe_q_table_address');
      DBMS_AQADM.START_QUEUE(
        queue_name   => 'strmadmin.oe_q_address');
    END;
    /
    
    
  5. Create a database link between dbs1.net and dbs2.net if one does not already exist.
    CONNECT strmadmin/strmadminpw@dbs1.net
    
    CREATE DATABASE LINK dbs2.net CONNECT TO strmadmin IDENTIFIED BY strmadminpw
      USING 'DBS2.NET'; 
    
    
  6. Create a function called any_to_cust_address_typ in the strmadmin schema at dbs1.net that takes a SYS.AnyData payload containing a oe.cust_address_typ object and returns the oe.cust_address_typ object.
    CREATE OR REPLACE FUNCTION strmadmin.any_to_cust_address_typ(
      in_any IN SYS.AnyData) 
    RETURN OE.CUST_ADDRESS_TYP
    AS
      address       OE.CUST_ADDRESS_TYP;
      num_var       NUMBER;
      type_name     VARCHAR2(100);
    BEGIN
      -- Get the type of object
      type_name := in_any.GetTypeName();
      -- Check if the object type is OE.CUST_ADDRESS_TYP
      IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN
        -- Put the address in the message into the address variable
        num_var := in_any.GetObject(address);
        RETURN address;
      ELSE
        raise_application_error(-20101, 'Conversion failed - ' || type_name);   
      END IF;
    END;
    /
    
    
  7. Create a transformation at dbs1.net using the DBMS_TRANSFORM package.
    BEGIN
      DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
       schema         => 'strmadmin', 
       name           => 'anytoaddress', 
       from_schema    => 'SYS', 
       from_type      => 'ANYDATA', 
       to_schema      => 'oe', 
       to_type        => 'cust_address_typ', 
       transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)'); 
    END;
    /
    
    
  8. Create a subscriber for the typed queue if one does not already exist. The subscriber must contain a rule that ensures that only messages of the appropriate type are propagated to the destination queue.
    DECLARE 
      subscriber  SYS.AQ$_AGENT; 
    BEGIN 
      subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE', 
                                   'STRMADMIN.OE_Q_ADDRESS@DBS2.NET', 
                                   0); 
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.oe_q_any', 
        subscriber     => subscriber,
        rule           => 
                    'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''',
        transformation => 'strmadmin.anytoaddress'); 
    END; 
    /
    
    
  9. Schedule propagation between the SYS.AnyData queue at dbs1.net and the typed queue at dbs2.net.
    BEGIN 
      DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name   => 'strmadmin.oe_q_any', 
        destination  => 'dbs2.net'); 
    END;
    /
    
    
  10. Enqueue a message of oe.cust_address_typ type wrapped in a SYS.AnyData wrapper:
    CONNECT oe/oe@dbs1.net
    
    BEGIN
      oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ(
        '1668 Chong Tao','111181','Beijing',NULL, 'CN')));
    END;
    /
    COMMIT;
    
    
  11. After allowing some time for propagation, query the queue table at dbs2.net to view the propagated message:
    CONNECT strmadmin/strmadminpw@dbs2.net
    
    SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ADDRESS;
    
    
    See Also:

    Oracle9i Application Developer's Guide - Advanced Queuing for more information about transformations during propagation

Example of Propagating LCRs to a Typed Queue

To propagate LCRs from a SYS.AnyData queue to a typed queue, you complete the same steps as you do for non-LCR events, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS package to transform LCRs in SYS.AnyData queues to messages in typed queues:

You can propagate user-enqueued LCRs to an appropriate typed queue, but propagation of captured LCRs to a typed queue is not supported.

The following example sets up propagation of row LCRs from a SYS.AnyData queue named oe_q_any to a typed queue of type SYS.LCR$_ROW_RECORD named oe_q_lcr. The source queue oe_q_any is at the dbs1.net database, and the destination queue oe_q_lcr is at the dbs3.net database.

  1. Connect as an administrative user who can grant privileges at dbs1.net.
  2. Grant the following privilege to strmadmin, if it was not already granted.
    GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
    
    
  3. Create a queue of the LCR type if one does not already exist.
    CONNECT strmadmin/strmadminpw@dbs3.net
    
    BEGIN 
      DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table         => 'strmadmin.oe_q_table_lcr', 
        queue_payload_type  => 'SYS.LCR$_ROW_RECORD', 
        multiple_consumers  => true);
      DBMS_AQADM.CREATE_QUEUE(
        queue_name   => 'strmadmin.oe_q_lcr', 
        queue_table  => 'strmadmin.oe_q_table_lcr');
      DBMS_AQADM.START_QUEUE(
        queue_name   => 'strmadmin.oe_q_lcr');
    END;
    /
    
    
  4. Create a database link between dbs1.net and dbs3.net if one does not already exist.
    CONNECT strmadmin/strmadminpw@dbs1.net
    
    CREATE DATABASE LINK dbs3.net CONNECT TO strmadmin IDENTIFIED BY strmadminpw
      USING 'DBS3.NET'; 
    
    
  5. Create a transformation at dbs1.net using the DBMS_TRANSFORM package.
    BEGIN
      DBMS_TRANSFORM.CREATE_TRANSFORMATION( 
        schema         => 'strmadmin', 
        name           => 'anytolcr', 
        from_schema    => 'SYS', 
        from_type      => 'ANYDATA', 
        to_schema      => 'SYS', 
        to_type        => 'LCR$_ROW_RECORD', 
        transformation =>  
              'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)'); 
    END;
    /
    
    
  6. Create a subscriber at the typed queue if one does not already exist. The subscriber specifies the CONVERT_ANYDATA_TO_LCR_ROW function for the transformation parameter.
    DECLARE 
      subscriber  SYS.AQ$_AGENT; 
    BEGIN 
      subscriber := SYS.AQ$_AGENT (
        'ROW_LCR_AGENT_REMOTE', 
        'STRMADMIN.OE_Q_LCR@DBS3.NET', 
        0); 
      DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.oe_q_any',
        subscriber     => subscriber,
        rule           => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''',
        transformation => 'strmadmin.anytolcr'); 
    END; 
    /
    
    
  7. Schedule propagation between the SYS.AnyData queue at dbs1.net and the LCR queue at dbs3.net.
    BEGIN 
      DBMS_AQADM.SCHEDULE_PROPAGATION(
        queue_name   => 'strmadmin.oe_q_any', 
        destination  => 'dbs3.net'); 
    END;
    /
    
    
  8. Create a procedure to construct and enqueue a row LCR into the strmadmin.oe_q_any queue:
    CONNECT oe/oe@dbs1.net
    
    CREATE OR REPLACE PROCEDURE oe.enq_row_lcr_proc(
                     source_dbname  VARCHAR2,
                     cmd_type       VARCHAR2,
                     obj_owner      VARCHAR2,
                     obj_name       VARCHAR2,
                     old_vals       SYS.LCR$_ROW_LIST,
                     new_vals       SYS.LCR$_ROW_LIST) AS
      eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid      RAW(16);
      row_lcr        SYS.LCR$_ROW_RECORD;
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); 
      -- Construct the LCR based on information passed to procedure
      row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
        source_database_name  =>  source_dbname,
        command_type          =>  cmd_type,
        object_owner          =>  obj_owner,
        object_name           =>  obj_name,
        old_values            =>  old_vals,
        new_values            =>  new_vals);
      -- Enqueue the created row LCR
      DBMS_AQ.ENQUEUE(
        queue_name         =>  'strmadmin.oe_q_any', 
        enqueue_options    =>  eopt,
        message_properties =>  mprop,
        payload            =>  SYS.AnyData.ConvertObject(row_lcr),
        msgid              =>  enq_msgid);
    END enq_row_lcr_proc;
    /
    
    
  9. Create a row LCR that inserts a row into the oe.inventories table and enqueue the row LCR into the strmadmin.oe_q_any queue.
    DECLARE
      newunit1  SYS.LCR$_ROW_UNIT;
      newunit2  SYS.LCR$_ROW_UNIT;
      newunit3  SYS.LCR$_ROW_UNIT;
      newvals   SYS.LCR$_ROW_LIST;
    BEGIN
      newunit1 := SYS.LCR$_ROW_UNIT(
        'PRODUCT_ID', 
        SYS.AnyData.ConvertNumber(3503),
        DBMS_LCR.NOT_A_LOB,
        NULL,
        NULL);
      newunit2 := SYS.LCR$_ROW_UNIT(
        'WAREHOUSE_ID', 
        SYS.AnyData.ConvertNumber(1),
        DBMS_LCR.NOT_A_LOB,
        NULL,
        NULL);
      newunit3 := SYS.LCR$_ROW_UNIT(
        'QUANTITY_ON_HAND', 
        SYS.AnyData.ConvertNumber(157),
        DBMS_LCR.NOT_A_LOB,
        NULL,
        NULL);
      newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3);
    oe.enq_row_lcr_proc(
      source_dbname  =>  'DBS1.NET',
      cmd_type       =>  'INSERT',
      obj_owner      =>  'OE',
      obj_name       =>  'INVENTORIES',
      old_vals       =>  NULL,
      new_vals       =>  newvals);
    END;
    /
    COMMIT;
    
    
  10. After allowing some time for propagation, query the queue table at dbs3.net to view the propagated message:
    CONNECT strmadmin/strmadminpw@dbs3.net
    
    SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_LCR;
    
    
    See Also:

    The DBMS_STREAMS package in the Oracle9i Supplied PL/SQL Packages and Types Reference for more information about the row LCR and DDL LCR conversion functions