| Oracle® Streams Extended Examples 11g Release 2 (11.2) E12862-03 | 
 | 
| 
 | Mobi · ePub | 
This chapter illustrates an example that creates a PL/SQL procedure for constructing and enqueuing LCRs that contain LOBs.
This chapter contains this topic:
Grant the Oracle Streams Administrator EXECUTE Privilege on DBMS_STREAMS_MESSAGING
Grant the Oracle Streams Administrator Necessary Privileges on the Tables
Note:
If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line after this note to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to all of the databases in the environment./************************* BEGINNING OF SCRIPT ******************************
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL lob_construct.out /*
Explicit EXECUTE privilege on the package is required because a procedure in the package is called in within a PL/SQL procedure in Step 8.
*/ CONNECT / AS SYSDBA; GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO strmadmin; /*
*/
SET ECHO ON SET FEEDBACK 1 SET NUMWIDTH 10 SET LINESIZE 80 SET TRIMSPOOL ON SET TAB OFF SET PAGESIZE 100 SET SERVEROUTPUT ON SIZE 100000 CONNECT strmadmin /*
*/
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE( 
    queue_table => 'lobex_queue_table', 
    queue_name  => 'lobex_queue');
END;
/
/*
*/
BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name      => 'strmadmin.lobex_queue',
    apply_name      => 'apply_lob',
    apply_captured  => FALSE);
END;
/
BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_lob', 
    parameter  => 'disable_on_error',
    value      => 'N');
END;
/
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    'apply_lob');
END;
/
/*
*/
CONNECT system
CREATE TABLESPACE lob_user_tbs DATAFILE 'lob_user_tbs.dbf' 
  SIZE 5M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
ACCEPT password PROMPT 'Enter password for user: ' HIDE
CREATE USER lob_user
IDENTIFIED BY &password
  DEFAULT TABLESPACE lob_user_tbs
  QUOTA UNLIMITED ON lob_user_tbs;
GRANT ALTER SESSION, CREATE CLUSTER, CREATE DATABASE LINK, CREATE SEQUENCE,
  CREATE SESSION, CREATE SYNONYM, CREATE TABLE, CREATE VIEW, CREATE INDEXTYPE, 
  CREATE OPERATOR, CREATE PROCEDURE, CREATE TRIGGER, CREATE TYPE
TO lob_user;
CONNECT lob_user/lob_user_pw
CREATE TABLE with_clob (a  NUMBER PRIMARY KEY,
                        c1 CLOB,
                        c2 CLOB,
                        c3 CLOB);
CREATE TABLE with_blob (a NUMBER PRIMARY KEY,
                        b BLOB);
/*
Granting these privileges enables the Oracle Streams administrator to get the LOB length for offset and to perform DML operations on the tables.
*/ GRANT ALL ON with_clob TO strmadmin; GRANT ALL ON with_blob TO strmadmin; COMMIT; /*
*/
CONNECT strmadmin
CREATE OR REPLACE PROCEDURE enq_row_lcr(source_dbname  VARCHAR2,
                                            cmd_type       VARCHAR2,
                                            obj_owner      VARCHAR2,
                                            obj_name       VARCHAR2,
                                            old_vals       SYS.LCR$_ROW_LIST,
                                            new_vals       SYS.LCR$_ROW_LIST) AS
  xr_lcr         SYS.LCR$_ROW_RECORD;
BEGIN
  xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
              source_database_name => source_dbname,
              command_type         => cmd_type,
              object_owner         => obj_owner,
              object_name          => obj_name,
              old_values           => old_vals,
              new_values           => new_vals);
  -- Enqueue a row lcr
  DBMS_STREAMS_MESSAGING.ENQUEUE(
        queue_name         => 'lobex_queue', 
        payload            => ANYDATA.ConvertObject(xr_lcr));
END enq_row_lcr;
/
SHOW ERRORS
/*
*/
-- Description of each variable:
-- src_dbname  : Source database name
-- tab_owner   : Table owner
-- tab_name    : Table name
-- col_name    : Name of the CLOB column
-- new_vals    : SYS.LCR$_ROW_LIST containing primary key and supplementally  
--               logged colums
-- clob_data   : CLOB that contains data to be sent
-- offset      : Offset from which data should be sent, default is 1
-- lsize       : Size of data to be sent, default is 0
-- chunk_size  : Size used for creating LOB chunks, default is 2048
CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname     VARCHAR2,
                                       tab_owner      VARCHAR2,
                                       tab_name       VARCHAR2,
                                       col_name       VARCHAR2,
                                       new_vals       SYS.LCR$_ROW_LIST,
                                       clob_data      CLOB,
                                       offset         NUMBER default 1,
                                       lsize          NUMBER default 0,
                                       chunk_size     NUMBER default 2048) 
RETURN NUMBER IS
  lob_offset NUMBER; -- maintain lob offset
  newunit    SYS.LCR$_ROW_UNIT;
  tnewvals   SYS.LCR$_ROW_LIST;
  lob_flag   NUMBER;
  lob_data   VARCHAR2(32767);
  lob_size   NUMBER;
  unit_pos   NUMBER;
  final_size NUMBER;
  exit_flg   BOOLEAN;
  c_size     NUMBER;
  i          NUMBER;
BEGIN
  lob_size := DBMS_LOB.GETLENGTH(clob_data);
  unit_pos := new_vals.count + 1;
  tnewvals := new_vals;
  c_size   := chunk_size;
  i := 0;
  -- validate parameters
  IF (unit_pos <= 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid new_vals list');
    RETURN 1;
  END IF;
  IF (c_size < 1) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size');
    RETURN 1;
  END IF;
  IF (lsize < 0 OR lsize > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid LOB size');
    RETURN 1;
  END IF;
  IF (offset < 1 OR offset >= lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob offset');
    RETURN 1;
  ELSE
    lob_offset := offset;
  END IF;
  -- calculate final size
  IF (lsize = 0) THEN
    final_size := lob_size;
  ELSE
    final_size := lob_offset + lsize;
  END IF;
  --  The following output lines are for debugging purposes only.
  -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size);
  -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size);
  IF (final_size < 1 OR final_size > lob_size) THEN
    DBMS_OUTPUT.PUT_LINE('Invalid lob size');
    RETURN 1;
  END IF;
  -- expand new_vals list for LOB column
  tnewvals.extend();
  exit_flg := FALSE;
  -- Enqueue all LOB chunks
  LOOP
    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i);
    i := i + 1;
 
    -- check if last LOB chunk
    IF ((lob_offset + c_size) < final_size) THEN
      lob_flag := DBMS_LCR.LOB_CHUNK;
    ELSE
      lob_flag := DBMS_LCR.LAST_LOB_CHUNK;
      exit_flg := TRUE;
      --  The following output line is for debugging purposes only.
      DBMS_OUTPUT.PUT_LINE('Last LOB chunk');
    END IF;
    --  The following output lines are for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset);
    DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size));
    lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset); 
    -- create row unit for clob
    newunit := SYS.LCR$_ROW_UNIT(col_name,
                                 ANYDATA.ConvertVarChar2(lob_data), 
                                 lob_flag, 
                                 lob_offset, 
                                 NULL);
    -- insert new LCR$_ROW_UNIT
    tnewvals(unit_pos) := newunit;  
    -- enqueue lcr
    enq_row_lcr(
          source_dbname => src_dbname,
          cmd_type      => 'LOB WRITE',
          obj_owner     => tab_owner,
          obj_name      => tab_name,
          old_vals      => NULL,
          new_vals      => tnewvals);
    -- calculate next chunk size 
    lob_offset := lob_offset + c_size;
    
    IF ((final_size - lob_offset) < c_size) THEN
      c_size := final_size - lob_offset + 1;
    END IF;
    --  The following output line is for debugging purposes only.
    DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size));
    IF (c_size < 1) THEN
      exit_flg := TRUE;
    END IF;
    EXIT WHEN exit_flg;
  END LOOP;
  RETURN 0;
END do_enq_clob;
/
SHOW ERRORS
/*
The DBMS_OUTPUT lines in the following example can be used for debugging purposes if necessary. If they are not needed, then they can be commented out or deleted.
*/
SET SERVEROUTPUT ON SIZE 100000
DECLARE
  c1_data CLOB;
  c2_data CLOB;
  c3_data CLOB;
  newunit1 SYS.LCR$_ROW_UNIT;
  newunit2 SYS.LCR$_ROW_UNIT;
  newunit3 SYS.LCR$_ROW_UNIT;
  newunit4 SYS.LCR$_ROW_UNIT;
  newvals  SYS.LCR$_ROW_LIST;
  big_data VARCHAR(22000);
  n        NUMBER;
BEGIN
  -- Create primary key for LCR$_ROW_UNIT
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                ANYDATA.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  -- Create empty CLOBs
  newunit2 := sys.lcr$_row_unit('C1',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit3 := SYS.LCR$_ROW_UNIT('C2',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newunit4 := SYS.LCR$_ROW_UNIT('C3',
                                ANYDATA.ConvertVarChar2(NULL),
                                DBMS_LCR.EMPTY_LOB, 
                                NULL, 
                                NULL);
  newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4);
  -- Perform an insert
  enq_row_lcr(
    source_dbname => 'MYDB.EXAMPLE.COM',
    cmd_type      => 'INSERT',
    obj_owner     => 'LOB_USER',
    obj_name      => 'WITH_CLOB',
    old_vals      => NULL,
    new_vals      => newvals);
  -- construct clobs
  big_data := RPAD('Hello World', 1000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c1_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c1_data, 
    amount  => length(big_data), 
    buffer  => big_data);
  big_data := RPAD('1234567890#', 1000, '_');
  big_data := big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c2_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c2_data, 
    amount  => length(big_data), 
    buffer  => big_data);
  big_data := RPAD('ASDFGHJKLQW', 2000, '_');
  big_data := big_data || '#';
  big_data := big_data || big_data || big_data || big_data || big_data;
  DBMS_LOB.CREATETEMPORARY(
    lob_loc => c3_data, 
    cache   => TRUE);
  DBMS_LOB.WRITEAPPEND(
    lob_loc => c3_data, 
    amount  => length(big_data), 
    buffer  => big_data);
  -- pk info
  newunit1 := SYS.LCR$_ROW_UNIT('A',
                                ANYDATA.ConvertNumber(3), 
                                NULL, 
                                NULL, 
                                NULL);
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  -- write c1 clob
  n := do_enq_clob(
         src_dbname => 'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C1',
         new_vals   => newvals,
         clob_data  => c1_data,
         offset     => 1,
         chunk_size => 1024);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c2 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(
         src_dbname => 'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C2',
         new_vals   => newvals,
         clob_data  => c2_data,
         offset     => 1,
         chunk_size => 2000);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  -- write c3 clob
  newvals  := SYS.LCR$_ROW_LIST(newunit1); 
  n := do_enq_clob(src_dbname=>'MYDB.EXAMPLE.COM',
         tab_owner  => 'LOB_USER',
         tab_name   => 'WITH_CLOB',
         col_name   => 'C3',
         new_vals   => newvals,
         clob_data  => c3_data,
         offset     => 1,
         chunk_size => 500);
  DBMS_OUTPUT.PUT_LINE('n=' || n);
 
  COMMIT;
END;
/
/*
Check the lob_construct.out spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
After you run the script, you can check the lob_user.with_clob table to list the rows applied by the apply process. The DBMS_LOCK.SLEEP statement is used to give the apply process time to apply the enqueued rows.
CONNECT lob_user/lob_user_pw EXECUTE DBMS_LOCK.SLEEP(10); SELECT a, c1, c2, c3 FROM with_clob ORDER BY a; SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;