Why tables are not created at destination database when using DMBS_STREAMS_ADM.MAINATIAN_TABLES procedure?

249 Views Asked by At

I am implementing an Oracle Streams environment that replicates DML changes on some tables.

I used the procedure DBMS_STREAMS_ADM.MAINTAIN_TABLES with the instantiation as DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK, all the blocks of the procedure executed successfully, however the tables were not created at the destination database.

As stated in the docs:

If this parameter is set to DBMS_STREAMS_ADM.INSTANTIATION_TABLE or DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK, then the tables being instantiated must exist at the source database, and the tablespaces that contain the tables must exist at the destination database.

Details:

  1. A database link from destination database to source database exists.

  2. The tables exist in the source database and also the tablespace exist in the destination database.

  3. The user SCOTT does not exist in the destination database.

  4. Source database is Oracle Database 11g Enterprise Edition Release 11.2.0.3.0 - 64bit Production

  5. Destination database is Oracle Database 11g Express Edition Release 11.2.0.2.0 - Production

Main Code:

DECLARE
  tables DBMS_UTILITY.UNCL_ARRAY;
  i      number := 1;
  j      number := 1;
  iscn  NUMBER;

BEGIN

  FOR c IN (select table_name from config_tables) LOOP
    tables(i) := c.table_name;
    i := i + 1;
  END LOOP;

  DBMS_STREAMS_ADM.MAINTAIN_TABLES(table_names                  => tables,
                                   source_directory_object      => NULL,
                                   destination_directory_object => NULL,
                                   source_database              => 'db1_global_name',
                                   destination_database         => 'db2_global_name',
                                   perform_actions              => TRUE,
                                   capture_name       => 'DML_Capture_Src',
                                   capture_queue_name => 'strmadmin.dml_streams_queue',
                                   propagation_name   => 'DML_Propagation',
                                   apply_name         => 'DML_Apply_Dest',
                                   apply_queue_name   => 'strmadmin.dml_streams_queue',

                                   bi_directional => FALSE,
                                   include_ddl    => FALSE,
                                   instantiation  => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK);
END;
/

The code generated 14 blocks, the 7th block (which uses datapump)

-- 
-- Datapump TABLE MODE IMPORT (NETWORK)
-- 
DECLARE
  h1                NUMBER := NULL; -- data pump job handle
  name_expr_list    VARCHAR2(32767); -- for metadata_filter
  object_name       dbms_utility.uncl_array; -- object names
  cnt               NUMBER;
  object_owner      VARCHAR2(30); -- owner
  job_state         VARCHAR2(30); -- job state
  status            ku$_Status; -- data pump status
  job_not_exist     exception;
  pragma            exception_init(job_not_exist, -31626);
  local_compat            v$parameter.value%TYPE;
  remote_compat            v$parameter.value%TYPE;
  min_compat            v$parameter.value%TYPE;
  ind               NUMBER; 
  le                ku$_LogEntry;  -- For WIP and error messages
  js                ku$_JobStatus; -- The job status from get_status 
  jd                ku$_JobDesc; 
-- The job description from get_status
BEGIN

  object_name(1) := 'DEPT';
  object_name(2) := 'SALGRADE';
  object_owner := 'SCOTT';
  FOR idx IN 1..2 LOOP
    SELECT COUNT(1) INTO cnt FROM all_tables@"db2_global_name"
      WHERE owner = object_owner AND table_name = object_name(idx);
    -- table does not exist locally, need instantiation
    IF cnt = 0 THEN
      IF name_expr_list IS NULL THEN
        name_expr_list := '(';
      ELSE
        name_expr_list := name_expr_list ||',';
      END IF;
      name_expr_list := name_expr_list||''''||object_name(idx)||'''';
    END IF;
  END LOOP;
  IF name_expr_list IS NOT NULL THEN
    name_expr_list := name_expr_list || ')';
  ELSE
    COMMIT;
    RETURN;
  END IF;

  select value into local_compat from v$parameter@"db1_global_name"
  where name = 'compatible';
  select value into remote_compat from v$parameter@"db2_global_name"
  where name = 'compatible';
  IF TO_NUMBER(REPLACE(local_compat, '.', '0')) > TO_NUMBER(REPLACE(remote_compat, '.', '0')) THEN
    min_compat := remote_compat;
  ELSE
    min_compat := local_compat;
  END IF;
  h1 := dbms_datapump.open(operation=>'IMPORT',job_mode=>'TABLE',
    remote_link=>'db1_global_name',
    job_name=>NULL, version=> min_compat);



  dbms_datapump.metadata_filter(
    handle=>h1,
    name=>'NAME_EXPR',
    value=>'IN'||name_expr_list);
  dbms_datapump.metadata_filter(
    handle=>h1,
    name=>'SCHEMA_EXPR',
    value=>'IN'||'(''SCOTT'')');

  dbms_datapump.start_job(h1);

  job_state := 'UNDEFINED';
  BEGIN
    WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
      status := dbms_datapump.get_status(
        handle => h1,
        mask => dbms_datapump.ku$_status_job_error +
                dbms_datapump.ku$_status_job_status +
                dbms_datapump.ku$_status_wip,
        timeout => -1);
      job_state := status.job_status.state;
      dbms_lock.sleep(10);
    END LOOP;
  EXCEPTION WHEN job_not_exist THEN
    dbms_output.put_line('job finished');
  END;
  COMMIT;
EXCEPTION WHEN OTHERS THEN
  dbms_output.put_line('Exception, sql code = ' || SQLCODE || ', error message: ' || SQLERRM);
  dbms_output.put_line( dbms_utility.format_error_stack);
  dbms_output.put_line( dbms_utility.format_error_backtrace);

  IF h1 IS NOT NULL THEN 
    BEGIN 
      dbms_datapump.get_status( 
        handle    => h1, 
        mask      => dbms_datapump.ku$_status_job_error + 
                     dbms_datapump.ku$_status_job_status +
                     dbms_datapump.ku$_status_wip, 
        timeout   => -1, 
        job_state => job_state,
        status     => status );

      dbms_output.put_line('Data pump job status: ' || job_state);
      le := status.wip; 
      IF le IS NULL THEN 
        dbms_output.put_line('WIP info is NULL'); 
      ELSE 
        dbms_output.put_line('WIP info:'); 
        ind := le.FIRST; 
        WHILE ind IS NOT NULL LOOP 
          dbms_output.put_line(le(ind).LogText); 
          ind := le.NEXT(ind); 
        END LOOP; 
      END IF; 

      le := status.error; 
      IF le IS NULL THEN 
        dbms_output.put_line('Error info is NULL'); 
      ELSE 
        dbms_output.put_line('Error info:'); 
        ind := le.FIRST; 
        WHILE ind IS NOT NULL LOOP 
          dbms_output.put_line(le(ind).LogText); 
          ind := le.NEXT(ind); 
        END LOOP; 
      END IF; 
    EXCEPTION  
      WHEN job_not_exist THEN 
        dbms_output.put_line('Data pump job finished');
      WHEN OTHERS THEN RAISE; 
    END; 
  END IF; 
  ROLLBACK;
  RAISE;
END;
0

There are 0 best solutions below