Skip Headers

Oracle9i Streams
Release 2 (9.2)

Part Number A96571-02
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page Go to next page
View PDF

20
Single Database Capture and Apply Example

This chapter illustrates an example of a single database that captures changes to a table, uses a DML handler during apply to re-enqueue the captured changes into a queue, and then applies a subset of the changes to a different table.

This chapter contains these topics:

Overview of the Single Database Capture and Apply Example

The example in this chapter illustrates using Streams to capture and apply data manipulation language (DML) changes at a single database named cpap.net. Specifically, this example captures DML changes to the employees table in the hr schema, placing row logical change records (LCRs) into a queue named streams_queue. Then, an apply process dequeues these row LCRs from the same queue and sends them to a DML handler. The DML handler performs the following actions on the captured row LCRs:

Figure 20-1 provides an overview of the environment.

Figure 20-1 Single Database Capture and Apply Example

Text description of strms036.gif follows
Text description of the illustration strms036.gif


See Also:

Prerequisites

The following prerequisites must be completed before you begin the example in this chapter.

Set Up the Environment

Complete the following steps to create the hr.emp_del table, set up Streams administrator, and create the queue.

  1. Show Output and Spool Results
  2. Create the hr.emp_del Table
  3. Set Up Users at cpap.net
  4. Create the Streams Queue at cpap.net
  5. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to all of the databases in the environment.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_setup_catapp.out

/*
Step 2 Create the hr.emp_del Table

Connect to cpap.net as the hr user.

*/
 
CONNECT hr/hr@cpap.net

/*

Create the hr.emp_del table. The shape of the emp_del table is the same as the employees table, except for one added timestamp column that will record the date when a row is inserted into the emp_del table.


*/

CREATE TABLE emp_del( 
  employee_id    NUMBER(6), 
  first_name     VARCHAR2(20), 
  last_name      VARCHAR2(25), 
  email          VARCHAR2(25), 
  phone_number   VARCHAR2(20), 
  hire_date      DATE, 
  job_id         VARCHAR2(10), 
  salary         NUMBER(8,2), 
  commission_pct NUMBER(2,2), 
  manager_id     NUMBER(6), 
  department_id  NUMBER(4),
  timestamp      DATE);

CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);

ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));

/*
Step 3 Set Up Users at cpap.net

Connect to cpap.net as SYS user.

*/
 
CONNECT SYS/CHANGE_ON_INSTALL@cpap.net AS SYSDBA

/*

Create the Streams administrator named strmadmin and grant this user the necessary privileges. These privileges enable the user to manage queues, execute subprograms in packages related to Streams, create rule sets, create rules, and monitor the Streams environment by querying data dictionary views and queue tables. You may choose a different name for this user.

In this example, the Streams administrator will be the apply user for the apply process and must be able to apply changes to the hr.emp_del table. Therefore, the Streams administrator is granted ALL privileges on this table.


Note:
  • To ensure security, use a password other than strmadminpw for the Streams administrator.
  • The SELECT_CATALOG_ROLE is not required for the Streams administrator. It is granted in this example so that the Streams administrator can monitor the environment easily.
  • If you plan to use the Streams tool in Oracle Enterprise Manager, then grant the Streams administrator SELECT ANY DICTIONARY privilege, in addition to the privileges shown in this step.
  • The ACCEPT command must appear on a single line in the script.

See Also:

"Configuring a Streams Administrator"

*/

GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE 
  TO strmadmin IDENTIFIED BY strmadminpw;

ACCEPT streams_tbs PROMPT 'Enter Streams administrator tablespace on cpap.net: '

ALTER USER strmadmin DEFAULT TABLESPACE &streams_tbs
                     QUOTA UNLIMITED ON &streams_tbs;

GRANT ALL ON hr.emp_del TO strmadmin;

GRANT EXECUTE ON DBMS_APPLY_ADM        TO strmadmin;
GRANT EXECUTE ON DBMS_AQ               TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM            TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK        TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM      TO strmadmin;

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

/*

Step 4 Create the Streams Queue at cpap.net

Connect to cpap.net as the strmadmin user.

*/

CONNECT strmadmin/strmadminpw@cpap.net

/*

Run the SET_UP_QUEUE procedure to create a queue named streams_queue at cpap.net. This queue will function as the Streams queue by holding the captured changes that will be dequeued by an apply process.

Running the SET_UP_QUEUE procedure performs the following actions:

*/

EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

/*
Step 5 Check the Spool Results

Check the streams_setup_catapp.out spool file to ensure that all actions finished successfully after this script is completed.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

Configure Capture and Apply

Complete the following steps to capture changes to the hr.employees table and apply these changes on single database in a customized way using a DML handler.

  1. Show Output and Spool Results
  2. Create an Alternate Tablespace for the LogMiner Tables at cpap.net
  3. Specify Supplemental Logging at cpap.net
  4. Configure the Capture Process at cpap.net
  5. Set the Instantiation SCN for the hr.employees Table
  6. Create an Agent Named emp_agent
  7. Create a Queue Subscriber
  8. Create a Procedure to Enqueue Row LCRs
  9. Create the DML Handler Procedure
  10. Set the DML Handler for the hr.employees Table
  11. Create a Procedure to Dequeue the Re-enqueued Events
  12. Configure the Apply Process at cpap.net
  13. Start the Apply Process at cpap.net
  14. Start the Capture Process at cpap.net
  15. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to all of the databases in the environment.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_config_capapp.out

/*
Step 2 Create an Alternate Tablespace for the LogMiner Tables at cpap.net

By default, the LogMiner tables are in the SYSTEM tablespace, but the SYSTEM tablespace may not have enough space for these tables once a capture process starts to capture changes. Therefore, you must create an alternate tablespace for the LogMiner tables.

See Also:

"Alternate Tablespace for LogMiner Tables"

Connect to cpap.net as SYS user.

*/
 
CONNECT SYS/CHANGE_ON_INSTALL@cpap.net AS SYSDBA

/*

Create an alternate tablespace for the LogMiner tables.


Note:

Each ACCEPT command must appear on a single line in the script.


*/

ACCEPT tspace_name DEFAULT 'logmnrts' PROMPT 'Enter tablespace name (for 
example, logmnrts): '

ACCEPT db_file_directory DEFAULT '' PROMPT 'Enter the complete path to the 
datafile directory (for example, /usr/oracle/dbs): '

ACCEPT db_file_name DEFAULT 'logmnrts.dbf' PROMPT 'Enter the name of the 
datafile (for example, logmnrts.dbf): '

CREATE TABLESPACE &tspace_name DATAFILE '&db_file_directory/&db_file_name' 
  SIZE 25 M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
   
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('&tspace_name');

/*
Step 3 Specify Supplemental Logging at cpap.net

Supplemental logging places additional information in the redo log for changes made to tables. The apply process needs this extra information to perform certain operations, such as unique row identification.

The following statement specifies an unconditional supplemental log group for the primary key column in the hr.employees table.

See Also:
*/

ALTER TABLE hr.employees ADD SUPPLEMENTAL LOG GROUP log_group_employees_pk
  (employee_id) ALWAYS;

/*
Step 4 Configure the Capture Process at cpap.net

Connect to cpap.net as the strmadmin user.

*/
 
CONNECT strmadmin/strmadminpw@cpap.net

/*

Configure the capture process to capture DML changes to the hr.employees table at cpap.net. This step specifies that DML changes to this table are captured by the capture process and enqueued into the specified queue.

*/

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',   
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  false);
END;
/

/*
Step 5 Set the Instantiation SCN for the hr.employees Table

Because this example captures and applies changes in a single database, no instantiation is necessary. However, the apply process at the cpap.net database still must be instructed to apply changes that were made to the hr.employees table after a certain system change number (SCN).

This example uses the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package to obtain the current SCN for the database. This SCN is used to run the SET_TABLE_INSTANTIATION_SCN procedure in the DBMS_APPLY_ADM package.

The SET_TABLE_INSTANTIATION_SCN procedure controls which LCRs for a table are ignored by an apply process and which LCRs for a table are applied by an apply process. If the commit SCN of an LCR for a table from a source database is less than or equal to the instantiation SCN for that table at a destination database, then the apply process at the destination database discards the LCR. Otherwise, the apply process applies the LCR. In this example, the cpap.net database is both the source database and the destination database.

The apply process will apply transactions to the hr.employees table with SCNs that were committed after SCN obtained in this step.


Note:

The hr.employees table must also be prepared for instantiation. This preparation was done automatically when the the capture process was configured with a rule to capture DML changes to the hr.employees table in Step 4.


*/

DECLARE
  iscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name    => 'hr.employees',
    source_database_name  => 'cpap.net',
    instantiation_scn     => iscn);
END;
/

/*

Step 6 Create an Agent Named emp_agent

This example uses an agent named emp_agent for explicit enqueue into and dequeue from the streams_queue. Because the strmadmin user owns the queue table for this queue, the strmadmin user is a secure user of the queue. This step creates the agent named emp_agent and associates this agent with the strmadmin user, which allows the agent to be used for enqueues into and dequeues from the secure queue.

*/

BEGIN
  DBMS_AQADM.CREATE_AQ_AGENT(
     agent_name => 'emp_agent');
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'emp_agent',
    db_username => 'strmadmin');
END;
/

/*
Step 7 Create a Queue Subscriber

Create a subscriber that can be used by an application to dequeue the re-enqueued events. At least one subscriber must be specified before the events can be re-enqueued into the queue.

*/

DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber :=  SYS.AQ$_AGENT('emp_agent', NULL, NULL);  
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name          =>  'strmadmin.streams_queue',
    subscriber                  =>  subscriber,
    rule                =>  NULL,
    transformation      =>  NULL);
END;
/

/*
Step 8 Create a Procedure to Enqueue Row LCRs

This step creates the enq_row_lcr procedure. This procedure will be used in the DML handler procedure created in Step 9 to enqueue row LCRs that contain changes to the hr.employees table.

*/

CREATE OR REPLACE PROCEDURE enq_row_lcr(in_any IN SYS.ANYDATA) IS
  enqopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients   DBMS_AQ.AQ$_RECIPIENT_LIST_T;
  enq_eventid  RAW(16);
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT(
    name     => 'emp_agent',
    address  => NULL,
    protocol => NULL);
  recipients(1) := SYS.AQ$_AGENT(
    name     => 'emp_agent',
    address  => NULL,
    protocol => NULL);
  mprop.RECIPIENT_LIST := recipients;
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.streams_queue',
    enqueue_options    => enqopt,
    message_properties => mprop,
    payload            => in_any,
    msgid              => enq_eventid);
END;
/

/*
Step 9 Create the DML Handler Procedure

This step creates the emp_dml_handler procedure. This procedure will be the DML handler for DML changes to the hr.employees table. It performs the following actions:

*/

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.ANYDATA) IS
  lcr          SYS.LCR$_ROW_RECORD;
  rc           PLS_INTEGER;
  command      VARCHAR2(10);
  old_values   SYS.LCR$_ROW_LIST;
BEGIN
  -- Re-enqueue the row LCR for explicit dequeue by another application
  enq_row_lcr(in_any);
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- Check for DELETE command on the hr.employees table
  IF command = 'DELETE' THEN
    -- Set the command_type in the row LCR to INSERT
    lcr.SET_COMMAND_TYPE('INSERT');
    -- Set the object_name in the row LCR to EMP_DEL
    lcr.SET_OBJECT_NAME('EMP_DEL');
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Set the old values in the row LCR to the new values in the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
    -- Add a SYSDATE value for the timestamp column
    lcr.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));
    --  Apply the row LCR as an INSERT into the EMP_DEL table
    lcr.EXECUTE(true);
  END IF;
END;
/

/*
Step 10 Set the DML Handler for the hr.employees Table

Set the DML handler for the hr.employees table to the procedure created in Step 9. Notice that the DML handler must be set separately for each possible operation on the table: INSERT, UPDATE, and DELETE.


*/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

/*
Step 11 Create a Procedure to Dequeue the Re-enqueued Events

The emp_dq procedure creates in this step can be used to dequeue the events that are re-enqueued by the DML handler created in Step 9. When the emp_dq procedure is executed, it dequeues each row LCR in the queue and displays the type of command in the row LCR, either INSERT, UPDATE, or DELETE. Any information in the row LCRs can be accessed and displayed, not just the command type.

See Also:

"Displaying Detailed Information About Apply Errors" for more information about displaying information in LCRs

*/

CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
  deqopt       DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid        RAW(16);
  payload      SYS.AnyData;
  new_messages BOOLEAN := TRUE;
  row_lcr      SYS.LCR$_ROW_RECORD;
  tc           pls_integer;
  next_trans   EXCEPTION;
  no_messages  EXCEPTION; 
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_messages) LOOP
    BEGIN
      DBMS_AQ.DEQUEUE(
        queue_name          =>  'strmadmin.streams_queue',
        dequeue_options     =>  deqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  msgid);
      COMMIT;
      deqopt.navigation := DBMS_AQ.NEXT;
      IF (payload.GetTypeName = 'SYS.LCR$_ROW_RECORD') THEN
        tc := payload.GetObject(row_lcr);   
        DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
      END IF;                       
      EXCEPTION
        WHEN next_trans THEN
        deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
        WHEN no_messages THEN
          new_messages  := FALSE;
          DBMS_OUTPUT.PUT_LINE('No more events');
     END;
  END LOOP; 
END;
/

/*
Step 12 Configure the Apply Process at cpap.net

Configure an apply process to apply DML changes to the hr.employees table. Although the DML handler for the apply process causes deleted employees to be inserted into the emp_del table, this rule specifies the employees table, because the row LCRs in the queue contain changes to the employees table, not the emp_del table.

*/

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'hr.employees',
    streams_type    => 'apply', 
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     =>  true,
    include_ddl     =>  false,
    source_database => 'cpap.net');
END;
/

/*
Step 13 Start the Apply Process at cpap.net

Set the disable_on_error parameter to n so that the apply process will not be disabled if it encounters an error, and start the apply process at cpap.net.

*/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_emp', 
    parameter   => 'disable_on_error', 
    value       => 'n');
END;
/
 
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  => 'apply_emp');
END;
/

/*
Step 14 Start the Capture Process at cpap.net

Start the capture process at cpap.net.

*/

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name  => 'capture_emp');
END;
/

/*
Step 15 Check the Spool Results

Check the streams_config_catapp.out spool file to ensure that all actions finished successfully after this script is completed.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

Make DML Changes, Query for Results, and Dequeue Events

Complete the following steps to make DML changes to the hr.employees table, query for the resulting inserts into the hr.emp_del table and the re-enqueued events in the streams_queue_table, and dequeue the events that were re-enqueued by the DML handler.

Step 1 Perform an INSERT, UPDATE, and DELETE on hr.employees

Make the following DML changes to the hr.employees table.

CONNECT hr/hr@cpap.net

INSERT INTO hr.employees values(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', 
  NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;
Step 2 Query the hr.emp_del Table and the streams_queue_table

After some time passes to allow for capture and apply of the changes performed in the previous step, run the following queries to see the results:

CONNECT strmadmin/strmadminpw@cpap.net

SELECT * FROM hr.emp_del;

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;

When you run the first query, you should see a record for the employee with an employee_id of 207. This employee was deleted in the previous step. When you run the second query, you should see the re-enqueued events resulting from all of the changes in the previous step, and the MSG_STATE should be READY for these events.

Step 3 Dequeue Events Re-enqueued by the DML Handler

Use the emp_dq procedure to dequeue the events that were re-enqueued by the DML handler.

SET SERVEROUTPUT ON SIZE 100000

EXEC emp_dq('emp_agent');

For each row changed by a DML statement, one line is returned, and each line states the command type of the change (either INSERT, UPDATE, or DELETE). If you repeat the query on the queue table in Step 2 after the events are dequeued, then the dequeued events should have been consumed. That is, the MSG_STATE should be PROCESSED for these events.

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;