Views expressed here are solely my own. Please make sure that you test the code/queries that appear on my blog before applying them to a production environment.

Tuesday, September 27, 2011

How to configure Oracle Streams for table replication

This post is a sample case that explains how to configure Oracle Streams for table replication in an Oracle 10gR2 database. It uses the asynchronous downstream capture method to replicate a table from one database to another: archive log files generated in the source database are implicitly copied to the target database asynchronously, mined there to capture the changes related to the replicated table, and the captured changes are applied there.

First you need to create a new user to act as the Streams administrator in both the source and target databases. You also create a separate tablespace for this new user's objects.

The commands below should be executed in both the source and the target databases.

To create a new tablespace
If you are using ASM with OMF (Oracle Managed Files) and ASSM (Automatic Segment Space Management), use the statement below.
create tablespace streams_tbs;

If not, use the statement below.
CREATE TABLESPACE streams_tbs DATAFILE '/usr/oracle/dbs/streams_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

Create the Streams admin user
CREATE USER strmadmin
IDENTIFIED BY "strmadminpwd"
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;
GRANT DBA TO strmadmin;

The source database should be in archivelog mode so that it generates files containing the change records made in this database.

You can check whether the source database is in archivelog mode with the following SQL.

--@db1
select log_mode from v$database;
--ARCHIVELOG

Next you should create an ANYDATA queue in the target database to associate with the capture process.
If the specified queue table does not exist, it is created. If it already exists, the existing queue table is used for the new queue. If you do not specify a queue table when you create the queue, streams_queue_table is used by default.
You can use a single procedure, SET_UP_QUEUE in the DBMS_STREAMS_ADM package, to create an ANYDATA queue and the queue table used by the queue. For SET_UP_QUEUE to create a new queue table, the specified queue table must not exist.

--strmadmin@db2
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue',
queue_user => 'd_dba');
END;
/

The "queue_user" parameter is the user who is privileged to use this queue; in our case it is the "d_dba" database user, one of whose tables we will replicate.

You can use the following SQL statements to check the newly created queue.
select * from dba_queue_tables where owner='STRMADMIN'; 
/*
OWNER,QUEUE_TABLE,TYPE,OBJECT_TYPE,SORT_ORDER,RECIPIENTS,MESSAGE_GROUPING,COMPATIBLE,PRIMARY_INSTANCE,SECONDARY_INSTANCE,OWNER_INSTANCE,USER_COMMENT,SECURE
STRMADMIN,STREAMS_QUEUE_TABLE,OBJECT,SYS.ANYDATA,COMMIT_TIME,MULTIPLE,TRANSACTIONAL,10.0.0,0,0,1,,YES
*/
select * from dba_queues where owner='STRMADMIN'; 
/*
OWNER,NAME,QUEUE_TABLE,QID,QUEUE_TYPE,MAX_RETRIES,RETRY_DELAY,ENQUEUE_ENABLED,DEQUEUE_ENABLED,RETENTION,USER_COMMENT,NETWORK_NAME
STRMADMIN,STREAMS_QUEUE,STREAMS_QUEUE_TABLE,133367,NORMAL_QUEUE,5,0, YES , YES ,0,,
STRMADMIN,AQ$_STREAMS_QUEUE_TABLE_E,STREAMS_QUEUE_TABLE,133366,EXCEPTION_QUEUE,0,0, NO , NO ,0,exception queue,
*/
select * from dba_rulesets where owner='STRMADMIN'; 
/*
OWNER,RULESET_NAME,RULESET_STORAGE_TABLE,BASE_TABLE,RULESET_COMMENT
STRMADMIN,STREAMS_QUEUE_N,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
STRMADMIN,STREAMS_QUEUE_R,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
*/

We should add the tns alias of the target database to the "tnsnames.ora" file in the source database, so that this name can be used for the implicit archivelog file transfer to the target database.

--@db1srv
vi $TNS_ADMIN/tnsnames.ora

--Add these lines
### STREAMS CONFIG ###
DB2 =
(DESCRIPTION=
(ADDRESS=
(PROTOCOL=TCP)
(HOST=DB2SRV)
(PORT=1521)
)
(CONNECT_DATA=
(SERVICE_NAME=DB2)
)
)
### END STREAMS CONFIG ###

In the target database's ASM, create the folder structure into which the newly generated archivelogs will be copied.
--@db2srv
asmcmd -p
ASMCMD [+DG_DB_ASM_DB2/DB2] > mkdir ARCHIVELOG
ASMCMD [+DG_DB_ASM_DB2/DB2] > cd ARCHIVELOG
ASMCMD [+DG_DB_ASM_DB2/DB2/ARCHIVELOG] > mkdir FROM_DB1
ASMCMD [+DG_DB_ASM_DB2/DB2/ARCHIVELOG] > cd FROM_DB1
ASMCMD [+DG_DB_ASM_DB2/DB2/ARCHIVELOG/FROM_DB1] > ls -l
ASMCMD [+DG_DB_ASM_DB2/DB2/ARCHIVELOG/FROM_DB1] > 

In the source database, configure and enable the log_archive_dest_2 parameter so that it points to the destination database and ships the generated archivelogs there.
alter system set LOG_ARCHIVE_DEST_2='SERVICE=DB2 ASYNC NOREGISTER 
VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) 
TEMPLATE=+DG_DB_ASM_DB2/DB2/ARCHIVELOG/FROM_DB1/%t_%s_%r.dbf 
DB_UNIQUE_NAME=DB2'
scope=both;
alter system set LOG_ARCHIVE_DEST_STATE_2='ENABLE';
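Before forcing a log switch, you can sanity-check the new destination from the source database by querying v$archive_dest (a quick check I find useful; the STATUS and ERROR columns are standard in 10gR2):

```sql
--@db1: verify that destination 2 is enabled and error-free
SELECT dest_id, status, error
FROM   v$archive_dest
WHERE  dest_id = 2;
-- STATUS should become VALID after the first successful transfer;
-- ERROR should be empty
```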

Generate a new archivelog in the source database and check whether it is copied to the destination database's ASM.

--@db1
alter system switch logfile;

--@db2srv
asmcmd -p
ASMCMD [+DG_DB_ASM_DB2/DB2/ARCHIVELOG/FROM_DB1] > ls -l
Type Redund Striped Time Sys Name
N 1_200291_699577217.dbf => +DG_DB_ASM_DB2/DB2/ARCHIVELOG/2011_09_22thread_1_seq_200291.296.762521353

We should add the tns alias of the source database to the "tnsnames.ora" file in the target database; this name will be used by the database link through which the downstream capture process connects back to the source database.

--@db2srv

vi $TNS_ADMIN/tnsnames.ora

--Add these lines
### STREAMS CONFIG ###
DB1 =
(DESCRIPTION=
(ADDRESS=
(PROTOCOL=TCP)
(HOST=DB1SRV)
(PORT=1521)
)
(CONNECT_DATA=
(SERVICE_NAME=DB1)
)
)
### END STREAMS CONFIG ###

--test whether this tns alias is reachable
--@db2srv
tnsping DB1
TNS Ping Utility for IBM/AIX RISC System/6000: Version 10.2.0.4.0 - Production on 22-SEP-2011 15:48:03
Copyright (c) 1997, 2007, Oracle. All rights reserved.
Used parameter files:
/oracle/asmhome1/network/admin/sqlnet.ora
Used TNSNAMES adapter to resolve the alias
Attempting to contact (DESCRIPTION= (ADDRESS= (PROTOCOL=TCP) (HOST=DB1SRV) (PORT=1521)) (CONNECT_DATA= (SERVICE_NAME=DB1)))
OK (0 msec)

Create a database link in the strmadmin schema in the target database; we will use this later.
--strmadmin@db2
CREATE DATABASE LINK db1
CONNECT TO strmadmin
IDENTIFIED BY "strmadminpwd"
USING 'DB1';

--Test whether it works
select sysdate from dual@db1;
--OK

While connected to the downstream database as the Streams administrator, run
the CREATE_CAPTURE procedure to create the capture process:
BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'strm04_capture',
rule_set_name => NULL,
start_scn => NULL,
source_database => 'db1',
use_database_link => true,
first_scn => NULL,
logfile_assignment => 'implicit');
END;
/
SELECT CAPTURE_NAME, FIRST_SCN, MAX_CHECKPOINT_SCN FROM DBA_CAPTURE;
/*
CAPTURE_NAME,FIRST_SCN,MAX_CHECKPOINT_SCN
STRM04_CAPTURE,74313112953,0
*/

We created a capture process called "strm04_capture".

This step does not associate the capture process strm04_capture with any rule set. A rule set will be created and associated with the capture process in the next step.

First we need to create the table in the source database that will be replicated to the destination database.
--sys@db1
create table d_dba.streams_test_uural(col1 number, col2 varchar2(100), a_date date);
insert into d_dba.streams_test_uural(col1, col2, a_date) values (1,'1',sysdate);
commit;

While connected to the downstream database as the Streams administrator, create
the positive rule set for the capture process and add a rule to it:
--strmadmin@db2
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'd_dba.streams_test_uural',
streams_type => 'capture',
streams_name => 'strm04_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => 'db1',
inclusion_rule => true);
END;
/
select * from dba_rules where rule_owner = 'STRMADMIN';
/*
RULE_OWNER,RULE_NAME,RULE_CONDITION,RULE_EVALUATION_CONTEXT_OWNER,RULE_EVALUATION_CONTEXT_NAME,RULE_ACTION_CONTEXT,RULE_COMMENT
STRMADMIN,STREAMS_TEST_UURAL1,(((:dml.get_object_owner() = 'D_DBA' and :dml.get_object_name() = 'STREAMS_TEST_UURAL')) and :dml.is_null_tag() = 'Y' and :dml.get_source_database_name() = 'DB1' ),SYS,STREAMS$_EVALUATION_CONTEXT,,
*/
select * from dba_rulesets where owner = 'STRMADMIN' order by ruleset_name;
/*
OWNER,RULESET_NAME,RULESET_STORAGE_TABLE,BASE_TABLE,RULESET_COMMENT
STRMADMIN,RULESET$_2,,SYS.STREAMS$_EVALUATION_CONTEXT,streams name is STRM04_CAPTURE
STRMADMIN,STREAMS_QUEUE_N,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
STRMADMIN,STREAMS_QUEUE_R,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
*/
select * from dba_rule_set_rules where rule_set_owner = 'STRMADMIN' order by rule_set_name;
/*
RULE_SET_OWNER,RULE_SET_NAME,RULE_OWNER,RULE_NAME,RULE_SET_RULE_ENABLED,RULE_SET_RULE_EVAL_CTX_OWNER,RULE_SET_RULE_EVAL_CTX_NAME,RULE_SET_RULE_COMMENT
STRMADMIN,RULESET$_2,STRMADMIN,STREAMS_TEST_UURAL1,ENABLED,,,"STRMADMIN"."RULESET$_2"
*/

You should enable minimum supplemental log data (SUPPLEMENTAL_LOG_DATA_MIN) in the source database, so that table-level supplemental logging can be enabled there.
--sys@db1
select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI from v$database;
/*
SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI
NO,NO,NO
*/
alter database add supplemental log data;
select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI from v$database;
/*
SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI
YES,NO,NO
*/

Now you should prepare the source table for instantiation
--strmadmin@db1
BEGIN
DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION(
table_name => 'd_dba.streams_test_uural',
supplemental_logging => 'keys');
END;
/
/*
DROP TABLE D_DBA.STREAMS_TEST_UURAL CASCADE CONSTRAINTS;
CREATE TABLE D_DBA.STREAMS_TEST_UURAL
(
COL1 NUMBER,
COL2 VARCHAR2(100 BYTE),
A_DATE DATE,
SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS,
SUPPLEMENTAL LOG DATA (UNIQUE) COLUMNS,
SUPPLEMENTAL LOG DATA (FOREIGN KEY) COLUMNS
)
TABLESPACE TS_DBA
LOGGING 
NOCOMPRESS 
NOCACHE
NOPARALLEL
MONITORING;
*/

By doing this you enabled primary key, unique key and foreign key supplemental logging on the source table, so that if any of these keys exists on the table, the supplemental log data can be captured using one of these key columns in the destination database.
If your table does not have any key columns, as in this example, you should enable "all" columns supplemental log data for this table so that it is placed into the newly generated archivelog files.

Check the current supplemental log data activated for this table and then add the all columns supplemental log data to this table.

Check activated supplemental logging
select * from dba_log_groups order by owner, table_name, log_group_type;
/*
OWNER,LOG_GROUP_NAME,TABLE_NAME,LOG_GROUP_TYPE,ALWAYS,GENERATED
D_DBA,SYS_C0043369,STREAMS_TEST_UURAL,FOREIGN KEY LOGGING,CONDITIONAL,GENERATED NAME
D_DBA,SYS_C0043367,STREAMS_TEST_UURAL,PRIMARY KEY LOGGING,ALWAYS,GENERATED NAME
D_DBA,SYS_C0043368,STREAMS_TEST_UURAL,UNIQUE KEY LOGGING,CONDITIONAL,GENERATED NAME
*/

alter table d_dba.streams_test_uural add supplemental log data (ALL) columns;

select * from dba_log_groups order by owner, table_name, log_group_type;
/*
OWNER,LOG_GROUP_NAME,TABLE_NAME,LOG_GROUP_TYPE,ALWAYS,GENERATED
D_DBA,SYS_C0043423,STREAMS_TEST_UURAL,ALL COLUMN LOGGING,ALWAYS,GENERATED NAME
D_DBA,SYS_C0043369,STREAMS_TEST_UURAL,FOREIGN KEY LOGGING,CONDITIONAL,GENERATED NAME
D_DBA,SYS_C0043367,STREAMS_TEST_UURAL,PRIMARY KEY LOGGING,ALWAYS,GENERATED NAME
D_DBA,SYS_C0043368,STREAMS_TEST_UURAL,UNIQUE KEY LOGGING,CONDITIONAL,GENERATED NAME
*/

Now we should create the replicated table in the target database by copying the current contents of the source table.
Instantiate the table in DB2
create table d_dba.streams_test_uural nologging
as select /*+ parallel(t1, 4) */ * from d_dba.streams_test_uural@db1 t1;

Create a database link in source database pointing to the target database
--strmadmin@db1 
CREATE DATABASE LINK db2
CONNECT TO strmadmin
IDENTIFIED BY "strmadminpwd"
USING 'DB2';
select sysdate from dual@db2;
--OK

Set the instantiation SCN for the "d_dba.streams_test_uural" table at db2 by running the following procedure at the source database db1:
DECLARE
iscn NUMBER; -- Variable to hold instantiation SCN value
BEGIN
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@db2(
source_object_name => 'd_dba.streams_test_uural',
source_database_name => 'db1',
instantiation_scn => iscn);
END;
/

After the instantiation SCN has been set, you can configure an apply process to apply LCRs for the "d_dba.streams_test_uural" table from the streams_queue queue. Setting the instantiation SCN for an object at a database is required only if an apply process applies LCRs for the object. When all of the necessary propagations and apply processes are configured, start the capture process using the START_CAPTURE procedure in DBMS_CAPTURE_ADM.

--strmadmin@db2
select * from dba_apply_instantiated_objects;
/*
SOURCE_DATABASE,SOURCE_OBJECT_OWNER,SOURCE_OBJECT_NAME,SOURCE_OBJECT_TYPE,INSTANTIATION_SCN,IGNORE_SCN,APPLY_DATABASE_LINK
DB1,D_DBA,STREAMS_TEST_UURAL,TABLE,74313116531,0,
*/

select * from dba_capture;
/*
CAPTURE_NAME,QUEUE_NAME,QUEUE_OWNER,RULE_SET_NAME,RULE_SET_OWNER,CAPTURE_USER,START_SCN,STATUS,CAPTURED_SCN,APPLIED_SCN,USE_DATABASE_LINK,FIRST_SCN,SOURCE_DATABASE,SOURCE_DBID,SOURCE_RESETLOGS_SCN,SOURCE_RESETLOGS_TIME,LOGMINER_ID,NEGATIVE_RULE_SET_NAME,NEGATIVE_RULE_SET_OWNER,MAX_CHECKPOINT_SCN,REQUIRED_CHECKPOINT_SCN,LOGFILE_ASSIGNMENT,STATUS_CHANGE_TIME,ERROR_NUMBER,ERROR_MESSAGE,VERSION,CAPTURE_TYPE,LAST_ENQUEUED_SCN,CHECKPOINT_RETENTION_TIME
STRM04_CAPTURE,STREAMS_QUEUE,STRMADMIN,RULESET$_2,STRMADMIN,STRMADMIN,74313112953,DISABLED,,,YES,74313112953,DB1,0,0,0,1,,,0,0,IMPLICIT,22/09/2011 17:52:51,,,,DOWNSTREAM,,60
*/
--STATUS=DISABLED

Creating an Apply Process for Captured Messages with DBMS_APPLY_ADM
BEGIN
DBMS_APPLY_ADM.CREATE_APPLY(
queue_name => 'strmadmin.streams_queue',
apply_name => 'strm04_apply',
rule_set_name => 'strmadmin.ruleset$_2',
message_handler => NULL,
ddl_handler => NULL,
apply_user => 'd_dba',
apply_database_link => NULL,
apply_tag => NULL,
apply_captured => true,
precommit_handler => NULL,
negative_rule_set_name => NULL,
source_database => 'db1');
END;
/
select * from dba_apply_error;
--No rows returned
select * from dba_apply_progress;
/*
APPLY_NAME,SOURCE_DATABASE,APPLIED_MESSAGE_NUMBER,OLDEST_MESSAGE_NUMBER,APPLY_TIME,APPLIED_MESSAGE_CREATE_TIME,OLDEST_TRANSACTION_ID
STRM04_APPLY,DB1,0,0,01/01/1988,01/01/1988,
*/
select * from sys.streams$_apply_progress; --holds historical data of the apply process

Starting an Apply Process
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'strm04_apply');
END;
/

Starting a Capture Process
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'strm04_capture');
END;
/

Check the views
select * from dba_capture;
/*
CAPTURE_NAME,QUEUE_NAME,QUEUE_OWNER,RULE_SET_NAME,RULE_SET_OWNER,CAPTURE_USER,START_SCN,STATUS,CAPTURED_SCN,APPLIED_SCN,USE_DATABASE_LINK,FIRST_SCN,SOURCE_DATABASE,SOURCE_DBID,SOURCE_RESETLOGS_SCN,SOURCE_RESETLOGS_TIME,LOGMINER_ID,NEGATIVE_RULE_SET_NAME,NEGATIVE_RULE_SET_OWNER,MAX_CHECKPOINT_SCN,REQUIRED_CHECKPOINT_SCN,LOGFILE_ASSIGNMENT,STATUS_CHANGE_TIME,ERROR_NUMBER,ERROR_MESSAGE,VERSION,CAPTURE_TYPE,LAST_ENQUEUED_SCN,CHECKPOINT_RETENTION_TIME
STRM04_CAPTURE,STREAMS_QUEUE,STRMADMIN,RULESET$_2,STRMADMIN,STRMADMIN,74313112953,ENABLED,74313112953,74313112953,YES,74313112953,DB1,2977989118,538113,699577217,1,,,74318832255,0,IMPLICIT,23/09/2011 11:23:51,,,10.2.0.4.0,DOWNSTREAM,0,60
*/

--STATUS=ENABLED

select * from dba_apply;
/*
APPLY_NAME,QUEUE_NAME,QUEUE_OWNER,APPLY_CAPTURED,RULE_SET_NAME,RULE_SET_OWNER,APPLY_USER,APPLY_DATABASE_LINK,APPLY_TAG,DDL_HANDLER,PRECOMMIT_HANDLER,MESSAGE_HANDLER,STATUS,MAX_APPLIED_MESSAGE_NUMBER,NEGATIVE_RULE_SET_NAME,NEGATIVE_RULE_SET_OWNER,STATUS_CHANGE_TIME,ERROR_NUMBER,ERROR_MESSAGE
STRM04_APPLY,STREAMS_QUEUE,STRMADMIN,YES,RULESET$_2,STRMADMIN,D_DBA,,,,,,ENABLED,,,,23/09/2011 11:22:19,,
*/
select * from d_dba.streams_test_uural;
/*
COL1,COL2,A_DATE
1,1,22/09/2011 17:49:55
*/

Now add more rows to the source table in the source database and check whether they are replicated to the target database.

--sys@db1
insert into d_dba.streams_test_uural(col1, col2, a_date) values (2,'2',sysdate);
commit;
alter system switch logfile;
select DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() from dual;
--74318845118
select max(sequence#) from v$archived_log where next_change# <= 74318845118; 
--200551

--strmadmin@db2
select * from v$streams_capture;
/*
SID,SERIAL#,CAPTURE#,CAPTURE_NAME,LOGMINER_ID,STARTUP_TIME,STATE,TOTAL_PREFILTER_DISCARDED,TOTAL_PREFILTER_KEPT,TOTAL_PREFILTER_EVALUATIONS,TOTAL_MESSAGES_CAPTURED,CAPTURE_TIME,CAPTURE_MESSAGE_NUMBER,CAPTURE_MESSAGE_CREATE_TIME,TOTAL_MESSAGES_CREATED,TOTAL_FULL_EVALUATIONS,TOTAL_MESSAGES_ENQUEUED,ENQUEUE_TIME,ENQUEUE_MESSAGE_NUMBER,ENQUEUE_MESSAGE_CREATE_TIME,AVAILABLE_MESSAGE_NUMBER,AVAILABLE_MESSAGE_CREATE_TIME,ELAPSED_CAPTURE_TIME,ELAPSED_RULE_TIME,ELAPSED_ENQUEUE_TIME,ELAPSED_LCR_TIME,ELAPSED_REDO_WAIT_TIME,ELAPSED_PAUSE_TIME,STATE_CHANGED_TIME
506,230,1,STRM04_CAPTURE,1,23/09/2011 11:23:51,CAPTURING CHANGES,6360288,0,6360289,157710,23/09/2011 11:25:49,74314650119,23/09/2011 06:55:23,157797,43,44,23/09/2011 11:24:35,74313260014,23/09/2011 00:24:42,74318845076,23/09/2011 10:46:45,11105,0,22,95,0,0,23/09/2011 11:25:49
*/
select * from d_dba.streams_test_uural;
/*
COL1,COL2,A_DATE
1,1,22/09/2011 17:49:55
2,2,23/09/2011 10:46:36
*/
As a result, we see that the newly inserted row in the source table has been replicated to the target database. We can check for apply errors, if there are any, by using the SQL below in the target database.
select * from dba_apply_error;
--No rows returned
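If dba_apply_error had returned rows, the failed transactions could be retried or discarded with DBMS_APPLY_ADM. A sketch (the local_transaction_id value below is hypothetical; in practice you take it from the dba_apply_error row):

```sql
--strmadmin@db2: inspect failed transactions first
SELECT apply_name, local_transaction_id, error_number, error_message
FROM   dba_apply_error;

--Re-execute one failed transaction after fixing the underlying problem
BEGIN
  DBMS_APPLY_ADM.EXECUTE_ERROR(
    local_transaction_id => '1.12.3456', --hypothetical id from dba_apply_error
    execute_as_user      => false);
END;
/

--Or discard it if it should not be applied
BEGIN
  DBMS_APPLY_ADM.DELETE_ERROR(
    local_transaction_id => '1.12.3456');
END;
/
```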
The above example only replicates data changes in the source table to the destination table; we can also capture and apply DDL changes happening on the source table. You can use the following commands to achieve this. Add a DDL capture rule to the existing table ruleset:
--strmadmin@db2
select * from dba_rules where rule_owner = 'STRMADMIN';
/*
RULE_OWNER,RULE_NAME,RULE_CONDITION,RULE_EVALUATION_CONTEXT_OWNER,RULE_EVALUATION_CONTEXT_NAME,RULE_ACTION_CONTEXT,RULE_COMMENT
STRMADMIN,STREAMS_TEST_UURAL1,(((:dml.get_object_owner() = 'D_DBA' and :dml.get_object_name() = 'STREAMS_TEST_UURAL')) and :dml.is_null_tag() = 'Y' and :dml.get_source_database_name() = 'DB1' ),SYS,STREAMS$_EVALUATION_CONTEXT,,
*/
select * from dba_rulesets where owner = 'STRMADMIN' order by ruleset_name;
/*
OWNER,RULESET_NAME,RULESET_STORAGE_TABLE,BASE_TABLE,RULESET_COMMENT
STRMADMIN,RULESET$_2,,SYS.STREAMS$_EVALUATION_CONTEXT,streams name is STRM04_CAPTURE
STRMADMIN,STREAMS_QUEUE_N,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
STRMADMIN,STREAMS_QUEUE_R,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
*/
select * from dba_rule_set_rules where rule_set_owner = 'STRMADMIN' order by rule_set_name;
/*
RULE_SET_OWNER,RULE_SET_NAME,RULE_OWNER,RULE_NAME,RULE_SET_RULE_ENABLED,RULE_SET_RULE_EVAL_CTX_OWNER,RULE_SET_RULE_EVAL_CTX_NAME,RULE_SET_RULE_COMMENT
STRMADMIN,RULESET$_2,STRMADMIN,STREAMS_TEST_UURAL1,ENABLED,,,"STRMADMIN"."RULESET$_2"
STRMADMIN,STREAMS_QUEUE_R,STRMADMIN,STREAMS_TEST_UURAL1,ENABLED,SYS,STREAMS$_EVALUATION_CONTEXT,adding to queue default ruleset
*/
Now we add a new DDL rule to the existing table ruleset by using the parameter "include_ddl => true"
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'd_dba.streams_test_uural',
streams_type => 'capture',
streams_name => 'strm04_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => false,
include_ddl => true,
source_database => 'db1',
inclusion_rule => true);
END;
/
Running this procedure performs the following actions:
- Creates two rules. One rule evaluates to TRUE for DML changes to the d_dba.streams_test_uural table, and the other rule evaluates to TRUE for DDL changes to the d_dba.streams_test_uural table. The rule names are system generated.
- Adds the two rules to the positive rule set associated with the capture process, because the inclusion_rule parameter is set to true.
- Prepares the d_dba.streams_test_uural table for instantiation by running the PREPARE_TABLE_INSTANTIATION procedure in the DBMS_CAPTURE_ADM package.
- Enables supplemental logging for any primary key, unique key, bitmap index, and foreign key columns in the d_dba.streams_test_uural table. When the PREPARE_TABLE_INSTANTIATION procedure is run, the default value (keys) is specified for the supplemental_logging parameter.
If the capture process is performing downstream capture, then the table is prepared for instantiation and supplemental logging is enabled for key columns only if the downstream capture process uses a database link to the source database. If a downstream capture process does not use a database link to the source database, then the table must be prepared for instantiation manually and supplemental logging must be enabled manually.
select * from dba_rules where rule_owner = 'STRMADMIN';
/*
RULE_OWNER,RULE_NAME,RULE_CONDITION,RULE_EVALUATION_CONTEXT_OWNER,RULE_EVALUATION_CONTEXT_NAME,RULE_ACTION_CONTEXT,RULE_COMMENT
STRMADMIN,STREAMS_TEST_UURAL1,(((:dml.get_object_owner() = 'D_DBA' and :dml.get_object_name() = 'STREAMS_TEST_UURAL')) and :dml.is_null_tag() = 'Y' and :dml.get_source_database_name() = 'DB1' ),SYS,STREAMS$_EVALUATION_CONTEXT,,
STRMADMIN,STREAMS_TEST_UURAL3,(((:ddl.get_object_owner() = 'D_DBA' and :ddl.get_object_name() = 'STREAMS_TEST_UURAL')or (:ddl.get_base_table_owner() = 'D_DBA' and :ddl.get_base_table_name() = 'STREAMS_TEST_UURAL')) and :ddl.is_null_tag() = 'Y' and :ddl.get_source_database_name() = 'DB1' ),SYS,STREAMS$_EVALUATION_CONTEXT,,
*/
select * from dba_rulesets where owner = 'STRMADMIN' order by ruleset_name;
/*
OWNER,RULESET_NAME,RULESET_STORAGE_TABLE,BASE_TABLE,RULESET_COMMENT
STRMADMIN,RULESET$_2,,SYS.STREAMS$_EVALUATION_CONTEXT,streams name is STRM04_CAPTURE
STRMADMIN,STREAMS_QUEUE_N,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
STRMADMIN,STREAMS_QUEUE_R,,STRMADMIN.AQ$_STREAMS_QUEUE_TABLE_V,
*/
select * from dba_rule_set_rules where rule_set_owner = 'STRMADMIN' order by rule_set_name;
/*
RULE_SET_OWNER,RULE_SET_NAME,RULE_OWNER,RULE_NAME,RULE_SET_RULE_ENABLED,RULE_SET_RULE_EVAL_CTX_OWNER,RULE_SET_RULE_EVAL_CTX_NAME,RULE_SET_RULE_COMMENT
STRMADMIN,RULESET$_2,STRMADMIN,STREAMS_TEST_UURAL3,ENABLED,,,
STRMADMIN,RULESET$_2,STRMADMIN,STREAMS_TEST_UURAL1,ENABLED,,,"STRMADMIN"."RULESET$_2"
STRMADMIN,STREAMS_QUEUE_R,STRMADMIN,STREAMS_TEST_UURAL3,ENABLED,SYS,STREAMS$_EVALUATION_CONTEXT,adding to queue default ruleset
STRMADMIN,STREAMS_QUEUE_R,STRMADMIN,STREAMS_TEST_UURAL1,ENABLED,SYS,STREAMS$_EVALUATION_CONTEXT,adding to queue default ruleset
*/
Let's test the DDL capture:
--sys@db1
alter table d_dba.streams_test_uural drop column col2;
select DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() from dual;
--74318863000
alter system switch logfile;
select max(sequence#) from v$archived_log where next_change# <= 74318863000; 
--200556

--strmadmin@db2
select * from v$streams_capture;
/*
SID,SERIAL#,CAPTURE#,CAPTURE_NAME,LOGMINER_ID,STARTUP_TIME,STATE,TOTAL_PREFILTER_DISCARDED,TOTAL_PREFILTER_KEPT,TOTAL_PREFILTER_EVALUATIONS,TOTAL_MESSAGES_CAPTURED,CAPTURE_TIME,CAPTURE_MESSAGE_NUMBER,CAPTURE_MESSAGE_CREATE_TIME,TOTAL_MESSAGES_CREATED,TOTAL_FULL_EVALUATIONS,TOTAL_MESSAGES_ENQUEUED,ENQUEUE_TIME,ENQUEUE_MESSAGE_NUMBER,ENQUEUE_MESSAGE_CREATE_TIME,AVAILABLE_MESSAGE_NUMBER,AVAILABLE_MESSAGE_CREATE_TIME,ELAPSED_CAPTURE_TIME,ELAPSED_RULE_TIME,ELAPSED_ENQUEUE_TIME,ELAPSED_LCR_TIME,ELAPSED_REDO_WAIT_TIME,ELAPSED_PAUSE_TIME,STATE_CHANGED_TIME
506,230,1,STRM04_CAPTURE,1,23/09/2011 11:23:51,WAITING FOR REDO: LAST SCN MINED 74318863003,18339223,0,18339230,224424,23/09/2011 17:15:01,74318863003,23/09/2011 17:14:50,224545,45,82,23/09/2011 17:15:33,74318863003,,74318863003,23/09/2011 17:14:58,25043,0,23,107,2087402,0,23/09/2011 17:16:03
*/
select * from d_dba.streams_test_uural;
/*
COL1,A_DATE
1,22/09/2011 17:49:55
3,23/09/2011 11:30:20
*/
OK, the col2 column has been dropped in the destination db2 database as well.

Other Streams commands that you might need:
--Removing a Rule from a Rule Set for a Capture Process
BEGIN
DBMS_STREAMS_ADM.REMOVE_RULE(
rule_name => 'departments3',
streams_type => 'capture',
streams_name => 'strm01_capture',
drop_unused_rule => true,
inclusion_rule => true);
END;
/

--Removing a Rule Set for a Capture Process
BEGIN
DBMS_CAPTURE_ADM.ALTER_CAPTURE(
capture_name => 'strm01_capture',
remove_rule_set => true,
remove_negative_rule_set => true);
END;
/

--Setting a Capture Process Parameter
BEGIN
DBMS_CAPTURE_ADM.SET_PARAMETER(
capture_name => 'strm01_capture',
parameter => 'parallelism',
value => '3');
END;
/
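Before removing rule sets or an entire configuration, you will usually want to stop the running processes first. These are the counterparts of the START_* calls used earlier:

```sql
--Stopping a Capture Process
BEGIN
  DBMS_CAPTURE_ADM.STOP_CAPTURE(
    capture_name => 'strm01_capture');
END;
/

--Stopping an Apply Process
BEGIN
  DBMS_APPLY_ADM.STOP_APPLY(
    apply_name => 'strm01_apply');
END;
/
```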

--Removing a Streams Configuration
You run the REMOVE_STREAMS_CONFIGURATION procedure in the DBMS_STREAMS_ADM package to remove a Streams configuration at the local database.
To remove the Streams configuration at the local database, run the following procedure:
EXEC DBMS_STREAMS_ADM.REMOVE_STREAMS_CONFIGURATION();
After running this procedure, drop the Streams administrator at the database, if possible.
Attention: Running this procedure is dangerous. You should run it only if you are sure you want to remove the entire Streams configuration at a database.