Skip Headers

Oracle9i Streams
Release 2 (9.2)

Part Number A96571-02
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page Go to next page
View PDF

16
Other Streams Management Tasks

This chapter provides instructions for managing logical change records (LCRs) and Streams tags, as well as instructions for performing a full database export/import in a Streams environment.

This chapter contains these topics:

Each task described in this chapter should be completed by a Streams administrator that has been granted the appropriate privileges, unless specified otherwise.

See Also:

"Configuring a Streams Administrator"

Managing Logical Change Records (LCRs)

This section describes managing logical change records (LCRs). Make sure you meet the following requirements when you create or modify an LCR:

Constructing and Enqueuing LCRs

Use the following LCR constructors to create LCRs:

The following example creates a queue in an Oracle database and an apply process associated with the queue. Then, it creates a PL/SQL procedure that constructs a row LCR based on information passed to it and enqueues the row LCR into the queue:

  1. Create a Streams queue in an Oracle database. This example assumes that the Streams administrator is strmadmin user.
    CONNECT strmadmin/strmadminpw
    
    BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE(
       queue_table          =>  'strm04_queue_table',
       storage_clause       =>  NULL,
       queue_name           =>  'strm04_queue');
    END;
    /
    
    
  2. Create an apply process at the Oracle database to receive messages in the queue. Make sure the apply_captured parameter is set to false when you create the apply process, because the apply process will be applying user-enqueued events, not events captured by a capture process. Also, make sure the apply_user parameter is set to hr, because changes will be applied in to the hr.regions table, and the apply user must have privileges to make DML changes to this table.
    BEGIN
      DBMS_APPLY_ADM.CREATE_APPLY(
         queue_name      => 'strm04_queue',
         apply_name      => 'strm04_apply',
         apply_captured  => false,
         apply_user      => 'hr');
    END;
    /
    
    
  3. Create a rule set for the apply process and add a rule that applies DML changes to the hr.regions table made at the dbs1.net source database.
    BEGIN 
      DBMS_STREAMS_ADM.ADD_TABLE_RULES(
        table_name               =>  'hr.regions',
        streams_type             =>  'apply',
        streams_name             =>  'strm04_apply',
        queue_name               =>  'strm04_queue',
        include_dml              =>  true,
        include_ddl              =>  false,
        include_tagged_lcr       =>  false,
        source_database          =>  'dbs1.net');
    END;
    /
    
    
  4. Set the disable_on_error parameter for the apply process to n.
    BEGIN
      DBMS_APPLY_ADM.SET_PARAMETER(
        apply_name  => 'strm04_apply', 
        parameter   => 'disable_on_error', 
        value       => 'n');
    END;
    /
    
    
  5. Start the apply process.
    EXEC DBMS_APPLY_ADM.START_APPLY('strm04_apply');
    
    
    
  6. Create a procedure called construct_row_lcr that constructs a row LCR and then enqueues it into the queue created in Step 1.
    CREATE OR REPLACE PROCEDURE construct_row_lcr(
                     source_dbname  VARCHAR2,
                     cmd_type       VARCHAR2,
                     obj_owner      VARCHAR2,
                     obj_name       VARCHAR2,
                     old_vals       SYS.LCR$_ROW_LIST,
                     new_vals       SYS.LCR$_ROW_LIST) AS
      eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid      RAW(16);
      row_lcr        SYS.LCR$_ROW_RECORD;
    BEGIN
    
          mprop.SENDER_ID := SYS.AQ$_AGENT('strmadmin', NULL, NULL); 
    
      -- Construct the LCR based on information passed to procedure
      row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
        source_database_name  =>  source_dbname,
        command_type          =>  cmd_type,
        object_owner          =>  obj_owner,
        object_name           =>  obj_name,
        old_values            =>  old_vals,
        new_values            =>  new_vals);
      -- Enqueue the created row LCR
      DBMS_AQ.ENQUEUE(
        queue_name         =>  'strm04_queue', 
        enqueue_options    =>  eopt,
        message_properties =>  mprop,
        payload            =>  SYS.AnyData.ConvertObject(row_lcr),
        msgid              =>  enq_msgid);
    END construct_row_lcr;
    /
    

    Note:

    The application does not need to specify a transaction identifier or SCN when it creates an LCR because the apply process generates these values and stores them in memory. If a transaction identifier or SCN is specified in the LCR, then the apply process ignores it and assigns a new value.


    See Also:

    Oracle9i Supplied PL/SQL Packages and Types Reference for more information about LCR constructors

  7. Create and enqueue LCRs using the construct_row_lcr procedure created in Step 2.
    1. Create a row LCR that inserts a row into the hr.regions table.
      CONNECT strmadmin/strmadminpw
      
      DECLARE
        newunit1  SYS.LCR$_ROW_UNIT;
        newunit2  SYS.LCR$_ROW_UNIT;
        newvals   SYS.LCR$_ROW_LIST;
      BEGIN
        newunit1 := SYS.LCR$_ROW_UNIT(
          'region_id', 
          SYS.AnyData.ConvertNumber(5),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        newunit2 := SYS.LCR$_ROW_UNIT(
          'region_name', 
          SYS.AnyData.ConvertVarchar2('Moon'),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2);
      construct_row_lcr(
        source_dbname  =>  'dbs1.net',
        cmd_type       =>  'INSERT',
        obj_owner      =>  'hr',
        obj_name       =>  'regions',
        old_vals       =>  NULL,
        new_vals       =>  newvals);
      END;
      /
      COMMIT;
      
      

      You can connect as the hr user and query the hr.regions table to view the applied row change:

      CONNECT hr/hr
      
      SELECT * FROM hr.regions;
      
      

      The row with a region_id of 5 should have Moon for the region_name.

    2. Create a row LCR that updates a row from the hr.regions table.
      CONNECT strmadmin/strmadminpw
      
      DECLARE
        oldunit1  SYS.LCR$_ROW_UNIT;
        oldunit2  SYS.LCR$_ROW_UNIT;
        oldvals   SYS.LCR$_ROW_LIST;
        newunit1  SYS.LCR$_ROW_UNIT;
        newvals   SYS.LCR$_ROW_LIST;
      BEGIN
        oldunit1 := SYS.LCR$_ROW_UNIT(
          'region_id', 
          SYS.AnyData.ConvertNumber(5),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        oldunit2 := SYS.LCR$_ROW_UNIT(
          'region_name', 
          SYS.AnyData.ConvertVarchar2('Moon'),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
        newunit1 := SYS.LCR$_ROW_UNIT(
          'region_name', 
          SYS.AnyData.ConvertVarchar2('Mars'),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        newvals := SYS.LCR$_ROW_LIST(newunit1);
      construct_row_lcr(
        source_dbname  =>  'dbs1.net',
        cmd_type       =>  'UPDATE',
        obj_owner      =>  'hr',
        obj_name       =>  'regions',
        old_vals       =>  oldvals,
        new_vals       =>  newvals);
      END;
      /
      COMMIT;
      
      

      You can connect as the hr user and query the hr.regions table to view the applied row change:

      CONNECT hr/hr
      
      SELECT * FROM hr.regions;
      
      

      The row with a region_id of 5 should have Mars for the region_name.

    3. Create a row LCR that deletes a row from the hr.regions table.
      CONNECT strmadmin/strmadminpw
      
      DECLARE
        oldunit1  SYS.LCR$_ROW_UNIT;
        oldunit2  SYS.LCR$_ROW_UNIT;
        oldvals   SYS.LCR$_ROW_LIST;
      BEGIN
        oldunit1 := SYS.LCR$_ROW_UNIT(
          'region_id', 
          SYS.AnyData.ConvertNumber(5),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        oldunit2 := SYS.LCR$_ROW_UNIT(
          'region_name',
          SYS.AnyData.ConvertVarchar2('Mars'),
          DBMS_LCR.NOT_A_LOB,
          NULL,
          NULL);
        oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
      construct_row_lcr(
        source_dbname  =>  'dbs1.net',
        cmd_type       =>  'DELETE',
        obj_owner      =>  'hr',
        obj_name       =>  'regions',
        old_vals       =>  oldvals,
        new_vals       =>  NULL);
      END;
      /
      COMMIT;
      
      

      You can connect as the hr user and query the hr.regions table to view the applied row change:

      CONNECT hr/hr
      
      SELECT * FROM hr.regions;
      
      

      The row with a region_id of 5 should have been deleted.

The use_old Parameter in Some Row LCR Member Functions

Release 9.2.0.2 introduces a new parameter, use_old, in the following member functions for the SYS.LCR$_ROW_RECORD type:

Currently, the use_old parameter is not documented in the Oracle9i Supplied PL/SQL Packages and Types Reference. The following sections replace the sections for these member functions in the Oracle9i Supplied PL/SQL Packages and Types Reference.

GET_LOB_INFORMATION Member Function

Gets the LOB information for the column.

The return value can be one of the following:

DBMS_LCR.NOT_A_LOB        CONSTANT NUMBER := 1;
DBMS_LCR.NULL_LOB         CONSTANT NUMBER := 2;
DBMS_LCR.INLINE_LOB       CONSTANT NUMBER := 3;
DBMS_LCR.EMPTY_LOB        CONSTANT NUMBER := 4;
DBMS_LCR.LOB_CHUNK        CONSTANT NUMBER := 5;
DBMS_LCR.LAST_LOB_CHUNK   CONSTANT NUMBER := 6;

Returns NULL if the specified column does not exist.

If the command type of the row LCR is UPDATE, then specifying 'Y' for the use_old parameter is a convenient way to get the value of the columns.

Syntax
MEMBER FUNCTION GET_LOB_INFORMATION(
  value_type   IN VARCHAR2,
  column_name  IN VARCHAR2,
  use_old      IN VARCHAR2   DEFAULT 'Y') 
RETURN NUMBER;
Parameters
Table 16-1 GET_LOB_INFORMATION Function Parameters
Parameter Description

value_type

The type of value to return for the column, either old or new

column_name

The name of the column

use_old

If Y and value_type is new, and no new value exists, then returns the corresponding old value. If N and value_type is new, then does not return the old value if no new value exists.

If value_type is old or if the command_type of the row LCR is not UPDATE, then the value of the use_old parameter is ignored.

NULL is not a valid specification for the use_old parameter.

GET_VALUE Member Function

Returns the old or new value for the specified column, depending on the value type specified.

If the command type of the row LCR is UPDATE, then specifying 'Y' for the use_old parameter is a convenient way to get the value of a column.

Syntax
MEMBER FUNCTION GET_VALUE(
   value_type          IN VARCHAR2,
   column_name         IN VARCHAR2,
   use_old             IN VARCHAR2   DEFAULT 'Y') 
RETURN SYS.AnyData;
Parameters
Table 16-2 GET_VALUE Procedure Parameters
Parameter Description

value_type

The type of value to return for the column. Specify old to get the old value for the column. Specify new to get the new value for the column.

column_name

The column name. If the column is present and has a NULL value, returns a SYS.AnyData instance containing a NULL value. If the column value is absent, returns a NULL.

use_old

If Y and value_type is new, and no new value exists, then returns the corresponding old value.

If N and value_type is new, then returns NULL if no new value exists.

If value_type is old or if the command_type of the row LCR is not UPDATE, then the value of the use_old parameter is ignored.

NULL is not a valid specification for the use_old parameter.

GET_VALUES Member Function

Returns a list of old or new values, depending on the value type specified.

If the command type of the row LCR is UPDATE, then specifying 'Y' for the use_old parameter is a convenient way to get the values of all columns.

Syntax
MEMBER FUNCTION GET_VALUES(
   value_type   IN VARCHAR2,
   use_old      IN VARCHAR2   DEFAULT 'Y')
RETURN SYS.LCR$_ROW_LIST;
Parameters
Table 16-3 GET_VALUES Procedure Parameters
Parameter Description

value_type

The type of values to return. Specify old to return a list of old values. Specify new to return a list of new values.

use_old

If Y and value_type is new, then returns a list of all new values in the LCR. If a new value does not exist in the list, then returns the corresponding old value. Therefore, the returned list contains all existing new values and old values for the new values that do not exist.

If N and value_type is new, then returns a list of all new values in the LCR without returning any old values.

If value_type is old or if the command_type of the row LCR is not UPDATE, then the value of the use_old parameter is ignored.

NULL is not a valid specification for the use_old parameter.

Constructing and Processing LCRs Containing LOB Columns

The following are general considerations for row changes involving LOBs in a Streams environment:

The following sections contain information about the requirements you must meet when constructing or processing LOBs and about apply process behavior for LCRs containing LOBs. This section also includes an example that constructs and enqueues LCRs containing LOBs.

Requirements for Constructing and Processing LCRs Containing LOBs

If your environment uses LCRs that contain LOB columns, then you must meet the following requirements when you construct these LCRs or process them with an apply handler or a rule-based transformation:

All validation of these requirements is done by an apply process. If these requirements are not met, then an LCR containing a LOB column cannot be applied by an apply process nor processed by an apply handler. In this case, the LCR is moved to an exception queue with the rest of the LCRs in the same transaction.

See Also:

Apply Process Behavior for LCRs Containing LOBs

An apply process behaves in the following way when it encounters an LCR that contains a LOB:

Example Script for Constructing and Enqueuing LCRs Containing LOBs

The following example illustrates creating a PL/SQL procedure for constructing and enqueuing LCRs containing LOBs. This example assumes that you have prepared your database for Streams by completing the necessary actions in Chapter 11, "Configuring a Streams Environment". Make sure the Streams administrator who runs this script has EXECUTE privilege on the DBMS_AQ and DBMS_APPLY_ADM packages.

  1. Show Output and Spool Results
  2. Connect as the Streams Administrator
  3. Create a Streams Queue
  4. Create and Start an Apply Process
  5. Create a Schema with Tables Containing LOB Columns
  6. Grant the Streams Administrator Necessary Privileges on the Tables
  7. Create a PL/SQL Procedure to Enqueue LCRs Containing LOBs
  8. Create the do_enq_clob Function to Enqueue CLOBs
  9. Enqueue CLOBs Using the do_enq_clob Function
  10. Check the Spool Results

Note:

If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to all of the databases in the environment.


/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results

Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL lob_construct.out

/*
Step 2 Connect as the Streams Administrator
*/

SET ECHO ON
SET FEEDBACK 1
SET NUMWIDTH 10
SET LINESIZE 80
SET TRIMSPOOL ON
SET TAB OFF
SET PAGESIZE 100
SET SERVEROUTPUT ON SIZE 100000

CONNECT strmadmin/strmadminpw

/*
Step 3 Create a Streams Queue
*/

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE( 
    queue_table => 'lobex_queue_table', 
    queue_name  => 'lobex_queue');
END;
/

/*
Step 4 Create and Start an Apply Process
*/

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name      => 'strmadmin.lobex_queue',
    apply_name      => 'apply_lob',
    apply_captured  => false);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_lob', 
    parameter  => 'disable_on_error',
    value      => 'n');
END;
/

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    'apply_lob');
END;
/

/*
Step 5 Create a Schema with Tables Containing LOB Columns
*/

CONNECT sys/change_on_install AS SYSDBA

CREATE USER lob_user IDENTIFIED BY Lob_user_pw;
GRANT CONNECT,RESOURCE TO lob_user;

CONNECT lob_user/lob_user_pw

CREATE TABLE with_clob (a  NUMBER PRIMARY KEY,
                        c1 CLOB,
                        c2 CLOB,
                        c3 CLOB);

CREATE TABLE with_blob (a NUMBER PRIMARY KEY,
                        b BLOB);

/*
Step 6 Grant the Streams Administrator Necessary Privileges on the Tables

Granting these privileges enables the Streams administrator to get the LOB length for offset and to perform DML operations on the tables.

*/

GRANT ALL ON with_clob TO strmadmin;
GRANT ALL ON with_blob TO strmadmin;
COMMIT;

/*
Step 7 Create a PL/SQL Procedure to Enqueue LCRs Containing LOBs
*/

CONNECT strmadmin/strmadminpw

CREATE OR REPLACE PROCEDURE enq_row_lcr(source_dbname  VARCHAR2,
                                            cmd_type       VARCHAR2,
                                            obj_owner      VARCHAR2,
                                            obj_name       VARCHAR2,
                                            old_vals       SYS.LCR$_ROW_LIST,
                                            new_vals       SYS.LCR$_ROW_LIST) AS
  eopt           DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop          DBMS_AQ.MESSAGE_PROPERTIES_T;
  enq_msgid      RAW(16);
  xr_lcr         SYS.LCR$_ROW_RECORD;
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT('strmadmin', NULL, NULL);
  xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
              source_database_name => source_dbname,
              command_type         => cmd_type,
              object_owner         => obj_owner,
              object_name          => obj_name,
              old_values           => old_vals,
              new_values           => new_vals);
  -- Enqueue a row lcr
  DBMS_AQ.ENQUEUE(
        queue_name         => 'lobex_queue', 
        enqueue_options    => eopt,
        message_properties => mprop,
        payload            => SYS.AnyData.ConvertObject(xr_lcr),
        msgid              => enq_msgid);
END enq_row_lcr;
/
SHOW ERRORS

/*
Step 8 Create the do_enq_clob Function to Enqueue CLOBs
*/

-- Description of each variable:
-- src_dbname  : Source database name
-- tab_owner   : Table owner
-- tab_name    : Table name
-- col_name    : Name of the CLOB column
-- new_vals    : SYS.LCR$_ROW_LIST containing primary key and supplementally  
--               logged colums
-- clob_data   : CLOB that contains data to be sent
-- offset      : Offset from which data should be sent, default is 1
-- lsize       : Size of data to be sent, default is 0
-- chunk_size  : Size used for creating LOB chunks, default is 2048

CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname     VARCHAR2,
                                       tab_owner      VARCHAR2,
                                       tab_name       VARCHAR2,
                                       col_name       VARCHAR2,
                                       new_vals       SYS.LCR$_ROW_LIST,
                                       clob_data      CLOB,
                                       offset         NUMBER default 1,
                                       lsize          NUMBER default 0,
                                       chunk_size     NUMBER default 2048) 
RETURN NUMBER IS
  lob_offset NUMBER; -- maintain lob offset
  newunit    SYS.LCR$_ROW_UNIT;
  tnewvals   SYS.LCR$_ROW_LIST;
  lob_flag   NUMBER;
  lob_data   VARCHAR2(32767);
  lob_size   NUMBER;
  unit_pos   NUMBER;
  final_size NUMBER;
  exit_flg   BOOLEAN;
  c_size     NUMBER;
  i          NUMBER;
BEGIN
  lob_size := DBMS_LOB.GETLENGTH(clob_data);
  unit_pos := new_vals.count + 1;
  tnewvals := new_vals;
  c_size   := chunk_size;
  i := 0;
  -- validate parameters
  IF (unit_pos <= 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid new_vals list');
    RETURN 1;
  END IF;

  IF (c_size < 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size');
    RETURN 1;
  END IF;

  IF (lsize < 0 OR lsize > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB size');
    RETURN 1;
  END IF;

  IF (offset < 1 OR offset >= lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob offset');
    RETURN 1;
  ELSE
    lob_offset := offset;
  END IF;

  -- calculate final size
  IF (lsize = 0) THEN
    final_size := lob_size;
  ELSE
    final_size := lob_offset + lsize;
  END IF;

  --  The following output lines are for debugging purposes only.
  -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size);
  -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size);

  IF (final_size < 1 OR final_size > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob size');
    RETURN 1;
  END IF;

  -- expand new_vals list for LOB column
  tnewvals.extend();

  exit_flg := FALSE;

  -- Enqueue all LOB chunks
  LOOP
    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i);
    i := i + 1;
 
    -- check if last LOB chunk
    IF ((lob_offset + c_size) < final_size) THEN
      lob_flag := DBMS_LCR.LOB_CHUNK;
    ELSE
      lob_flag := DBMS_LCR.LAST_LOB_CHUNK;
      exit_flg := TRUE;
      --  The following output line is for debugging purposes only.
      DBMS_OUTPUT.PUT_LINE('Last LOB chunk');
    END IF;

    --  The following output lines are for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset);
    DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size));

    lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset); 

    -- create row unit for clob
    newunit := SYS.LCR$_ROW_UNIT(col_name,
                                 SYS.AnyData.ConvertVarChar2(lob_data), 
                                 lob_flag, 
                                 lob_offset, 
                                 NULL);

    -- insert new LCR$_ROW_UNIT
    tnewvals(unit_pos) := newunit;  

    -- enqueue lcr
    enq_row_lcr(
          source_dbname => src_dbname,
          cmd_type      => 'LOB WRITE',
          obj_owner     => tab_owner,
          obj_name      => tab_name,
          old_vals      => NULL,
          new_vals      => tnewvals);

    -- calculate next chunk size 
    lob_offset := lob_offset + c_size;
    
    IF ((final_size - lob_offset) < c_size) THEN
      c_size := final_size - lob_offset + 1;
    END IF;

    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size));

    IF (c_size < 1) THEN
      exit_flg := TRUE;
    END IF;

    EXIT WHEN exit_flg;

  END LOOP;

  RETURN 0;
END do_enq_clob;
/

SHOW ERRORS

/*
Step 9 Enqueue CLOBs Using the do_enq_clob Function

The DBMS_OUTPUT lines in the following example can be used for debugging purposes if necessary. If they are not needed, then they can be commented out or deleted.

*/

SET SERVEROUTPUT ON SIZE 100000
DECLARE
  c1_data CLOB;
  c2_data CLOB;
  c3_data CLOB;
  newunit1 SYS.LCR$_ROW_UNIT;
  newunit2 SYS.LCR$_ROW_UNIT;
  newunit3 SYS.LCR$_ROW_UNIT;
  newunit4 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
  big_data VARCHAR(22000);
  n        NUMBER;
BEGIN
  -- Create primary key for LCR$_ROW_UNIT
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                Sys.AnyData.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  -- Create empty CLOBs
  newunit2 := sys.lcr$_row_unit('C1',
                                Sys.AnyData.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit3 := SYS.LCR$_ROW_UNIT('C2',
                                Sys.AnyData.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit4 := SYS.LCR$_ROW_UNIT('C3',
                                Sys.AnyData.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4);

  -- Perform an insert
  enq_row_lcr(
    source_dbname => 'MYDB.NET',
    cmd_type      => 'INSERT',
    obj_owner     => 'LOB_USER',
    obj_name      => 'WITH_CLOB',
    old_vals      => NULL,
    new_vals      => newvals);

  -- construct clobs
  big_data := RPAD('Hello World', 1000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c1_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c1_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  big_data := RPAD('1234567890#', 1000, '_');
  big_data := big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c2_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c2_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  big_data := RPAD('ASDFGHJKLQW', 2000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c3_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c3_data, 
    amount  => length(big_data), 
    buffer  => big_data);

  -- pk info
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                SYS.AnyData.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 

  -- write c1 clob
  n := do_enq_clob(
         src_dbname => 'MYDB.NET',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C1',
         new_vals   => newvals,
         clob_data  => c1_data,
         offset     => 1,
         chunk_size => 1024);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c2 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(
         src_dbname => 'MYDB.NET',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C2',
         new_vals   => newvals,
         clob_data  => c2_data,
         offset     => 1,
         chunk_size => 2000);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c3 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(src_dbname=>'MYDB.NET',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C3',
         new_vals   => newvals,
         clob_data  => c3_data,
         offset     => 1,
         chunk_size => 500);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  COMMIT;

END;
/

/*
Step 10 Check the Spool Results

Check the lob_construct.out spool file to ensure that all actions completed successfully after this script completes.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

After you run the script, you can check the lob_user.with_clob table to list the rows applied by the apply process. The DBMS_LOCK.SLEEP statement is used to give the apply process time to apply the enqueued rows.

CONNECT lob_user/lob_user_pw

EXECUTE DBMS_LOCK.SLEEP(10);

SELECT a, c1, c2, c3 FROM with_clob ORDER BY a;

SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;

Managing Streams Tags

You can set or get the value of the tags generated by the current session or by an apply process. The following sections describe how to set and get tag values.

Managing Streams Tags for the Current Session

This section contains instructions for setting and getting the tag for the current session.

Setting the Tag Values Generated by the Current Session

You can set the tag for all redo entries generated by the current session using the SET_TAG procedure in the DBMS_STREAMS package. For example, to set the tag to the hexadecimal value of '1D' in the current session, run the following procedure:

BEGIN
   DBMS_STREAMS.SET_TAG(
      tag  =>  HEXTORAW('1D'));
END;

/

After running this procedure, each redo entry generated by DML or DDL statements in the current session will have a tag value of 1D. Running this procedure affects only the current session.

Getting the Tag Value for the Current Session

You can get the tag for all redo entries generated by the current session using the GET_TAG procedure in the DBMS_STREAMS package. For example, to get the hexadecimal value of the tags generated in the redo entries for the current session, run the following procedure:

SET SERVEROUTPUT ON
DECLARE
   raw_tag RAW(2048);
BEGIN
   raw_tag := DBMS_STREAMS.GET_TAG();
   DBMS_OUTPUT.PUT_LINE('Tag Value = ' || RAWTOHEX(raw_tag));
END;
/

You can also display the tag value for the current session by querying the DUAL view:

SELECT DBMS_STREAMS.GET_TAG FROM DUAL;

Managing Streams Tags for an Apply Process

This section contains instructions for setting and removing the tag for an apply process.

See Also:

Setting the Tag Values Generated by an Apply Process

An apply process generates redo entries when it applies changes to a database or invokes handlers. You can set the default tag for all redo entries generated by an apply process when you create the apply process using the CREATE_APPLY procedure in the DBMS_APPLY_ADM package, or when you alter an existing apply process using the ALTER_APPLY procedure in the DBMS_APPLY_ADM package. In both of these procedures, set the apply_tag parameter to the value you want to specify for the tags generated by the apply process.

For example, to set the value of the tags generated in the redo log by an existing apply process named strm01_apply to the hexadecimal value of '7', run the following procedure:

BEGIN
  DBMS_APPLY_ADM.ALTER_APPLY(
     apply_name  =>  'strm01_apply',
     apply_tag   =>  HEXTORAW('7'));
END;

/

After running this procedure, each redo entry generated by the apply process will have a tag value of 7.

Removing the Apply Tag for an Apply Process

You remove the apply tag for an apply process by setting the remove_apply_tag parameter to true in the ALTER_APPLY procedure in the DBMS_APPLY_ADM package. Removing the apply tag means that each redo entry generated by the apply process has a NULL tag. For example, the following procedure removes the apply tag from an apply process named strm02_apply.

BEGIN
  DBMS_APPLY_ADM.ALTER_APPLY(
    apply_name       => 'strm02_apply',
    remove_apply_tag => true);
END;
/

Performing Database Point-in-Time Recovery on a Destination Database

Point-in-time recovery is the recovery of a database to a specified noncurrent time, SCN, or log sequence number. If point-in-time recovery is required at a destination database in a Streams environment, then you must reapply the captured changes that had already been applied after the point-in-time of the recovery.

For each relevant capture process, you can choose either of the following methods to perform point-in-time recovery at a destination database in a Streams environment

Resetting the start SCN for the capture process is simpler than creating a new capture process. However, if the capture process captures changes that are applied at multiple destination databases, then the changes are resent to all the destination databases, including the ones that did not perform point-in-time recovery. If a change is already applied at a destination database, then it is discarded by the apply process, but you may not want to use the network and computer resources required to resend the changes to multiple destination databases. In this case, you can create and temporarily use a new capture process and a new propagation that propagates changes only to the destination database that was recovered.

The following sections provide instructions for each task:

If there are multiple apply processes at the destination database where you performed point-in-time recovery, then complete one of the tasks in this section for each apply process.

Neither of these methods should be used if any of the following conditions are true regarding the destination database you are recovering:

If any of these conditions are true in your environment, then you cannot use the methods described in this section. Instead, you must manually resynchronize the data at all destination databases.

See Also:

Resetting the Start SCN for the Existing Capture Process to Perform Recovery

If you decide to reset the start SCN for the existing capture process to perform point-in-time recovery, then complete the following steps:

  1. If you are not using directed networks between the source database and destination database, then drop the propagation that propagates changes from the source queue at the source database to the destination queue at the destination database. Use the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop the propagation.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the propagation at each intermediate database in the path to the destination database, including the propagation at the source database.


    Note:

    You must drop the appropriate propagation(s). Disabling them is not sufficient. You will re-create the propagation(s) in Step 6, and dropping them now ensures that only events created after resetting the start SCN for the capture process are propagated.


    See Also:

    "Directed Networks"

  2. Perform the point-in-time recovery at the destination database.
  3. Query for the oldest message number from the source database for the apply process at the destination database. Then, make a note of the results of the query. The oldest message number is the earliest system change number (SCN) that may need to be applied.

    The following statement is an example of the query to perform:

    SELECT APPLY_NAME, OLDEST_MESSAGE_NUMBER FROM DBA_APPLY_PROGRESS;
    
    
  4. Stop the existing capture process using the STOP_CAPTURE procedure in the DBMS_CAPTURE_ADM package.
  5. Reset the start SCN of the existing capture process.

    To reset the start SCN for an existing capture process, run the ALTER_CAPTURE procedure in the DBMS_CAPTURE_ADM package and set the start_scn parameter to the value you recorded from the query in Step 3. For example, to reset the start SCN for a capture process named strm01_capture to the value 829381993, run the following ALTER_CAPTURE procedure:

    BEGIN
      DBMS_CAPTURE_ADM.ALTER_CAPTURE(
        capture_name  =>  'strm01_capture',
        start_scn     =>  829381993);
    END;
    /
    
    
  6. If you are not using directed networks between the source database and destination database, then create a new propagation to propagate changes from the source queue to the destination queue using the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. Specify the rule set used by the original propagation for the rule_set_name parameter when you create the propagation.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then create a new propagation at each intermediate database in the path to the destination database, including the propagation at the source database.

  7. Start the existing capture process using the START_CAPTURE procedure in the DBMS_CAPTURE_ADM package.

Creating a New Capture Process to Perform Recovery

If you decide to create a new capture process to perform point-in-time recovery, then complete the following steps:

  1. If you are not using directed networks between the source database and destination database, then drop the propagation that propagates changes from the source queue at the source database to the destination queue at the destination database. Use the DROP_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to drop the propagation.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the propagation that propagates events between the last intermediate database and the destination database. You do not need to drop the propagations at the other intermediate databases nor at the source database.


    Note:

    You must drop the appropriate propagation. Disabling it is not sufficient.


    See Also:

    "Directed Networks"

  2. Perform the point-in-time recovery at the destination database.
  3. Query for the oldest message number from the source database for the apply process at the destination database. Then, make a note of the results of the query. The oldest message number is the earliest system change number (SCN) that may need to be applied.

    The following statement is an example of the query to perform:

    SELECT APPLY_NAME, OLDEST_MESSAGE_NUMBER FROM DBA_APPLY_PROGRESS;
    
    
  4. Create a queue at the source database to be used by the capture process using the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then create a queue at each intermediate database in the path to the destination database, including the new queue at the source database. Do not create a new queue at the destination database.

  5. If you are not using directed networks between the source database and destination database, then create a new propagation to propagate changes from the source queue created in Step 4 to the destination queue using the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package. Specify the rule set used by the original propagation for the rule_set_name parameter when you create the propagation.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then create a propagation at each intermediate database in the path to the destination database, including the propagation from the source database to the first intermediate database. These propagations propagate changes captured by the capture process you will create in Step 6 between the queues created in Step 4.

  6. Create a new capture process at the source database using the CREATE_CAPTURE procedure in the DBMS_CAPTURE_ADM package. Set the source_queue parameter to the local queue you created in Step 4, the rule_set_name parameter to the rule set used by the original capture process, and the start_scn parameter to the value you recorded from the query in Step 3. If the rule set used by the original capture process captures events that should not be sent to the destination database that was recovered, then you can create and use a smaller, customized rule set that shares some rules with the original rule set.
  7. Start the capture process you created in Step 6 using the START_CAPTURE procedure in the DBMS_CAPTURE_ADM package.
  8. When the oldest message number of the apply process at the recovered database is approaching the capture number of the original capture process at the source database, stop the original capture process using the STOP_CAPTURE procedure in the DBMS_CAPTURE_ADM package.

    At the destination database, you can use the following query to determine the oldest message number from the source database for the apply process:

    SELECT APPLY_NAME, OLDEST_MESSAGE_NUMBER FROM DBA_APPLY_PROGRESS;
    
    

    At the source database, you can use the following query to determine the capture number of the original capture process:

    SELECT CAPTURE_NAME, CAPTURE_MESSAGE_NUMBER FROM V$STREAMS_CAPTURE;
    
    
  9. When the oldest message number of the apply process at the recovered database is beyond the capture number of the original capture process at the source database, drop the new capture process created in Step 6.
  10. If you are not using directed networks between the source database and destination database, then drop the new propagation created in Step 5.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the new propagation at each intermediate database in the path to the destination database, including the new propagation at the source database.

  11. If you are not using directed networks between the source database and destination database, then remove the queue created in Step 4.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then drop the new queue at each intermediate database in the path to the destination database, including the new queue at the source database. Do not drop the queue at the destination database.

  12. If you are not using directed networks between the source database and destination database, then create a propagation that propagates changes from the original source queue at the source database to the destination queue at the destination database. Use the CREATE_PROPAGATION procedure in the DBMS_PROPAGATION_ADM package to create the propagation. Specify the rule set used by the original propagation for the rule_set_name parameter when you create the propagation.

    If you are using directed networks, and there are intermediate databases between the source database and destination database, then re-create the propagation from the last intermediate database to the destination database. You dropped this propagation in Step 1.

  13. Start the capture process you stopped in Step 8.

All of the steps after Step 7 can be deferred to a later time, or they can be done as soon as the condition described in Step 8 is met.

Performing Full Database Export/Import on a Database Using Streams

This section describes how to perform a full database export/import on a database that is running one or more Streams capture processes, propagations, or apply processes. These instructions pertain to a full database export/import where the import database and export database are running on different computers, and the import database replaces the export database. The global name of the import database and the global name of the export database must match.


Note:

If you want to add a database to an existing Streams environment, then do not use the instructions in this section. Instead, see "Configuring a Capture-Based Streams Environment".


See Also:

Complete the following steps to perform a full database export/import on a database that is using Streams:

  1. If the export database contains any destination queues for propagations from other databases, then disable each propagation job that propagates events to the export database. You can disable a propagation job using the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.
  2. Make the necessary changes to your network configuration so that the database links used by the propagation jobs you disabled in Step 1 point to the computer running the import database.

    To complete this step, you may need to re-create the database links used by these propagation jobs or modify your Oracle networking files at the databases that contain the source queues.

  3. Notify all users to stop making data manipulation language (DML) and data definition language (DDL) changes to the export database, and wait until these changes have stopped.
  4. Make a note of the current export database system change number (SCN). You can determine the current SCN using the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package.

    After completing this step, do not stop any capture process running on the export database. Step 10 instructs you to use the V$STREAMS_CAPTURE dynamic performance view to ensure that no DML or DDL changes were made to the database after Step 3. The information about a capture process in this view is reset if the capture process is stopped and restarted.

    For the check in Step 10 to be valid, this information should not be reset for any capture process. To prevent a capture process from stopping automatically, you may need to set the message_limit and time_limit capture process parameters to infinite if these parameters are set to another value for any capture process.

  5. If the export database is not running any apply processes, and is not propagating user-enqueued events, then start the full database export now. Make sure that the FULL export parameter is set to y so that the required Streams metadata is exported.

    If the export database is running one or more apply processes or is propagating user-enqueued events, then do not start the export and proceed to the next step.

  6. If the export database is running one or more capture processes, then wait until the applied SCN of each capture process has reached or exceeded the SCN determined in Step 4.

    You can view the applied SCN for each capture process by querying the APPLIED_SCN column in the DBA_CAPTURE data dictionary view.

  7. If the export database has any propagation jobs that are propagating user-enqueued events, then disable these propagation jobs using the DISABLE_PROPAGATION_SCHEDULE procedure in the DBMS_AQADM package.
  8. If the export database is running one or more apply processes, or is propagating user-enqueued events, then start the full database export now. Make sure that the FULL export parameter is set to y so that the required Streams metadata is exported. If you already started the export in Step 5, then proceed to Step 9.
  9. When the export is complete, transfer the export dump file to the computer running the import database.
  10. If the export database is running one or more capture processes, then ensure that all DML and DDL changes on the export database were stopped before the SCN determined in Step 4 by completing the following steps:
    1. Get the current SCN using the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package. This SCN will be called the new SCN.
    2. Wait until the capture message number of each capture process has reached or exceeded the new SCN determined in Step a. You can view the capture message number for each capture process by querying the CAPTURE_MESSAGE_NUMBER column in the V$STREAMS_CAPTURE dynamic performance view.
    3. Verify that the enqueue message number of each capture process is less than or equal to the SCN determined in Step 4. You can view the enqueue message number for each capture process by querying the ENQUEUE_MESSAGE_NUMBER column in the V$STREAMS_CAPTURE dynamic performance view.

      If the enqueue message number of each capture process is less than or equal to the SCN determined in Step 4, then proceed to Step 11.

      However, if the enqueue message number of any capture process is higher than the SCN determined in Step 4, then one or more DML or DDL changes were made after the SCN determined in Step 4, and these changes were captured and enqueued by a capture process. In this case, perform all of the steps in this section again, starting with Step 1.


      Note:

      For this verification to be valid, each capture process must have been running uninterrupted since Step 4.


  11. Perform the full database import. Make sure that the STREAMS_CONFIGURATION and FULL import parameters are both set to y so that the required Streams metadata is imported. The default setting is y for the STREAMS_CONFIGURATION import parameter. Also, make sure no DML or DDL changes are made to the import database during the import.
  12. Let users access the import database, and shut down the export database.
  13. Enable any propagation jobs you disabled in Steps 1 and 7.

If you reset the value of a message_limit or time_limit capture process parameter in Step 4, then reset these parameters to their original settings.