I am implementing an Oracle Streams environment that replicates DML changes on some tables.
I used the procedure DBMS_STREAMS_ADM.MAINTAIN_TABLES with the instantiation parameter set to DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK. All the blocks generated by the procedure executed successfully; however, the tables were not created at the destination database.
As stated in the docs:
If this parameter is set to DBMS_STREAMS_ADM.INSTANTIATION_TABLE or DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK, then the tables being instantiated must exist at the source database, and the tablespaces that contain the tables must exist at the destination database.
Details:
A database link from destination database to source database exists.
The tables exist in the source database, and the tablespace exists in the destination database.
The user SCOTT does not exist in the destination database.
Source database is Oracle Database 11g Enterprise Edition Release 11.2.0.3.0 - 64bit Production
Destination database is Oracle Database 11g Express Edition Release 11.2.0.2.0 - Production
Main Code:
-- Reads the list of tables to replicate from CONFIG_TABLES and hands it to
-- DBMS_STREAMS_ADM.MAINTAIN_TABLES, which is asked (perform_actions => TRUE)
-- to configure one-way, DML-only replication from db1_global_name to
-- db2_global_name using network instantiation.
DECLARE
  -- Table names passed to MAINTAIN_TABLES; filled densely from index 1.
  tables DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  -- Load every configured table name in a single fetch.
  -- An empty CONFIG_TABLES leaves the array empty.
  SELECT table_name
    BULK COLLECT INTO tables
    FROM config_tables;

  DBMS_STREAMS_ADM.MAINTAIN_TABLES(
    table_names                  => tables,
    -- No directory objects are supplied for this network instantiation.
    source_directory_object      => NULL,
    destination_directory_object => NULL,
    source_database              => 'db1_global_name',
    destination_database         => 'db2_global_name',
    perform_actions              => TRUE,
    capture_name                 => 'DML_Capture_Src',
    capture_queue_name           => 'strmadmin.dml_streams_queue',
    propagation_name             => 'DML_Propagation',
    apply_name                   => 'DML_Apply_Dest',
    apply_queue_name             => 'strmadmin.dml_streams_queue',
    bi_directional               => FALSE,
    include_ddl                  => FALSE,
    instantiation                => DBMS_STREAMS_ADM.INSTANTIATION_TABLE_NETWORK);
END;
/
The code generated 14 blocks. The 7th block (which uses Data Pump) is shown below:
--
-- Datapump TABLE MODE IMPORT (NETWORK)
--
DECLARE
  -- Instantiates SCOTT.DEPT and SCOTT.SALGRADE at db2_global_name with a
  -- TABLE-mode Data Pump IMPORT that pulls over the database link
  -- db1_global_name. Tables already visible in ALL_TABLES at
  -- db2_global_name are skipped; if none are missing the block commits
  -- and returns without starting a job.
  --
  -- On any failure the error stack, the backtrace and the job's WIP and
  -- error logs are written to DBMS_OUTPUT, the transaction is rolled back
  -- and the exception is re-raised.
  h1             NUMBER := NULL;           -- data pump job handle
  name_expr_list VARCHAR2(32767);          -- for metadata_filter
  object_name    dbms_utility.uncl_array;  -- object names
  cnt            NUMBER;
  object_owner   VARCHAR2(30);             -- owner
  job_state      VARCHAR2(30);             -- job state
  status         ku$_Status;               -- data pump status
  job_not_exist  EXCEPTION;
  PRAGMA EXCEPTION_INIT(job_not_exist, -31626);
  local_compat   v$parameter.value%TYPE;   -- COMPATIBLE at db1_global_name
  remote_compat  v$parameter.value%TYPE;   -- COMPATIBLE at db2_global_name
  min_compat     v$parameter.value%TYPE;   -- lower of the two
  le             ku$_LogEntry;             -- For WIP and error messages

  -- Turns a dotted version such as '11.2.0.3.0' into a fixed-width,
  -- zero-padded key so that plain string comparison orders versions
  -- correctly. Missing trailing components count as 0, so '11.2.0'
  -- and '11.2.0.0.0' produce the same key.
  FUNCTION version_key(p_version IN VARCHAR2) RETURN VARCHAR2 IS
    l_key VARCHAR2(64);
  BEGIN
    FOR pos IN 1 .. 5 LOOP
      l_key := l_key ||
               LPAD(NVL(REGEXP_SUBSTR(p_version, '[^.]+', 1, pos), '0'), 8, '0');
    END LOOP;
    RETURN l_key;
  END version_key;

  -- Writes every line of a Data Pump log-entry collection to DBMS_OUTPUT.
  -- A NULL collection prints nothing.
  PROCEDURE print_log(p_entries IN ku$_LogEntry) IS
    l_idx PLS_INTEGER;
  BEGIN
    IF p_entries IS NULL THEN
      RETURN;
    END IF;
    l_idx := p_entries.FIRST;
    WHILE l_idx IS NOT NULL LOOP
      dbms_output.put_line(p_entries(l_idx).LogText);
      l_idx := p_entries.NEXT(l_idx);
    END LOOP;
  END print_log;
BEGIN
  object_name(1) := 'DEPT';
  object_name(2) := 'SALGRADE';
  object_owner   := 'SCOTT';

  -- Build the NAME_EXPR list from the tables that are absent at
  -- db2_global_name.
  FOR idx IN 1 .. 2 LOOP
    SELECT COUNT(1)
      INTO cnt
      FROM all_tables@"db2_global_name"
     WHERE owner = object_owner
       AND table_name = object_name(idx);

    -- table is not present at db2_global_name, need instantiation
    IF cnt = 0 THEN
      IF name_expr_list IS NULL THEN
        name_expr_list := '(';
      ELSE
        name_expr_list := name_expr_list || ',';
      END IF;
      name_expr_list := name_expr_list || '''' || object_name(idx) || '''';
    END IF;
  END LOOP;

  IF name_expr_list IS NOT NULL THEN
    name_expr_list := name_expr_list || ')';
  ELSE
    -- Nothing to instantiate.
    COMMIT;
    RETURN;
  END IF;

  SELECT value
    INTO local_compat
    FROM v$parameter@"db1_global_name"
   WHERE name = 'compatible';

  SELECT value
    INTO remote_compat
    FROM v$parameter@"db2_global_name"
   WHERE name = 'compatible';

  -- Run the job at the lower of the two COMPATIBLE settings. The versions
  -- are compared component by component: replacing '.' with '0' and
  -- comparing as numbers misorders values with different component counts
  -- (e.g. '11.2.0' would sort below '10.2.0.5.0').
  IF version_key(local_compat) > version_key(remote_compat) THEN
    min_compat := remote_compat;
  ELSE
    min_compat := local_compat;
  END IF;

  h1 := dbms_datapump.open(operation   => 'IMPORT',
                           job_mode    => 'TABLE',
                           remote_link => 'db1_global_name',
                           job_name    => NULL,
                           version     => min_compat);

  dbms_datapump.metadata_filter(
    handle => h1,
    name   => 'NAME_EXPR',
    value  => 'IN' || name_expr_list);

  dbms_datapump.metadata_filter(
    handle => h1,
    name   => 'SCHEMA_EXPR',
    value  => 'IN' || '(''SCOTT'')');

  dbms_datapump.start_job(h1);

  job_state := 'UNDEFINED';
  BEGIN
    WHILE (job_state != 'COMPLETED') AND (job_state != 'STOPPED') LOOP
      status := dbms_datapump.get_status(
                  handle  => h1,
                  mask    => dbms_datapump.ku$_status_job_error +
                             dbms_datapump.ku$_status_job_status +
                             dbms_datapump.ku$_status_wip,
                  timeout => -1);
      job_state := status.job_status.state;

      -- Print any error entries the job returned. Without this, a job that
      -- ends in COMPLETED shows no sign of objects that failed to import.
      -- NOTE(review): a missing owning schema at the destination may be
      -- one such failure - confirm against the printed messages.
      IF BITAND(status.mask, dbms_datapump.ku$_status_job_error) != 0 THEN
        dbms_output.put_line('Data pump job reported errors:');
        print_log(status.error);
      END IF;

      -- Only wait between polls while the job is still running.
      IF (job_state != 'COMPLETED') AND (job_state != 'STOPPED') THEN
        dbms_lock.sleep(10);
      END IF;
    END LOOP;

    -- Release the job handle once the job has ended. If the job is
    -- already gone this raises job_not_exist, handled just below.
    dbms_datapump.detach(h1);
  EXCEPTION
    WHEN job_not_exist THEN
      dbms_output.put_line('job finished');
  END;

  COMMIT;
EXCEPTION
  WHEN OTHERS THEN
    dbms_output.put_line('Exception, sql code = ' || SQLCODE || ', error message: ' || SQLERRM);
    dbms_output.put_line( dbms_utility.format_error_stack);
    dbms_output.put_line( dbms_utility.format_error_backtrace);

    -- If a job was opened, dump whatever status it can still report.
    IF h1 IS NOT NULL THEN
      BEGIN
        dbms_datapump.get_status(
          handle    => h1,
          mask      => dbms_datapump.ku$_status_job_error +
                       dbms_datapump.ku$_status_job_status +
                       dbms_datapump.ku$_status_wip,
          timeout   => -1,
          job_state => job_state,
          status    => status );
        dbms_output.put_line('Data pump job status: ' || job_state);

        le := status.wip;
        IF le IS NULL THEN
          dbms_output.put_line('WIP info is NULL');
        ELSE
          dbms_output.put_line('WIP info:');
          print_log(le);
        END IF;

        le := status.error;
        IF le IS NULL THEN
          dbms_output.put_line('Error info is NULL');
        ELSE
          dbms_output.put_line('Error info:');
          print_log(le);
        END IF;
      EXCEPTION
        WHEN job_not_exist THEN
          dbms_output.put_line('Data pump job finished');
        WHEN OTHERS THEN
          RAISE;
      END;
    END IF;

    ROLLBACK;
    RAISE;
END;