Make History with Streams


Oracle Streams is an outstanding way to get data from one server to another. It is NOTHING like the advanced replication of old, which is why I call it “outstanding” and not “horribly inefficient and error-prone, not to mention annoying.”

Recently I was asked how to implement a custom INSERT handler for a streams setup. The client had the following situation:

  • A production table that housed important data
  • A history table that was inserted into every time a record on the production table changed
  • A remote database meant to hold history

The client wished to set up Streams as follows:

  • On insert to the history table, attempt to stream the row across to the target DB
  • On successful apply to the target DB, remove the row from the source DB history table
  • Do not propagate deletes from the source to the target

Thankfully, it was pretty easy. We used a combination of a negative propagation rule for deletes and an apply-side DML handler for inserts. Let’s take a look.

First Step: Halt Propagation of Deletes

This was accomplished using the ADD_TABLE_PROPAGATION_RULES procedure of the DBMS_STREAMS_ADM package with a negative rule on DELETE. A negative rule works in reverse of a normal rule: with inclusion set to FALSE, any change that matches the rule's condition is discarded rather than propagated.

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name              => 'streams_test.test_hist',
    streams_name            => 'prop_test',
    source_queue_name       => 'streamsadm.rep_capture_queue',
    destination_queue_name  => 'streamsadm.rep_dest_queue@targetdb',
    include_dml             => true,
    include_ddl             => true,
    source_database         => 'primarydb',
    inclusion_rule          => false,
    and_condition           => ':lcr.GET_COMMAND_TYPE() = ''DELETE''');
END;
/

As you can see, we have set up a rule for stream prop_test on table streams_test.test_hist. The rule applies to changes coming from the local rep_capture_queue and going to the rep_dest_queue on the target database. We say to include DML and DDL; however, because inclusion_rule => false, the and_condition turns this into a negative rule: any LCR (Logical Change Record) whose command type is DELETE will NOT be propagated.

That takes care of deletes. No deletes on that table will propagate, though anything else will.
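If you want to sanity-check that the rule landed where you expect, you can query the Streams data dictionary on the source (this assumes you have access to the DBA views; the filter values below match the setup in this article):

```sql
-- Confirm the negative propagation rule exists for our table.
SELECT streams_name,
       rule_type,          -- should show NEGATIVE for the delete rule
       rule_name,
       table_owner,
       table_name
  FROM dba_streams_table_rules
 WHERE streams_type = 'PROPAGATION'
   AND table_owner  = 'STREAMS_TEST'
   AND table_name   = 'TEST_HIST';
```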

Create a Handler on the Target

The next step was to create a procedure on the target database that would be used as an Apply handler. This procedure is owned by streams_test, the actual user being streamed.

create or replace procedure streams_test.del_orig_lcr (in_any in sys.anydata) is
  lcr     sys.lcr$_row_record;
  lcrrow  sys.lcr$_row_list;
  rc      pls_integer;
  pkdata  sys.anydata;
  pkval   number;
begin
  -- Unpack the row LCR from its ANYDATA wrapper
  rc := in_any.getobject(lcr);
  -- Grab the NEW (post-insert) column values
  lcrrow := lcr.GET_VALUES('NEW');
  -- Apply the insert on the target first; if this raises an error,
  -- we never reach the delete below and the source row is preserved
  lcr.execute(true);
  -- Find the primary key column (ID) and remove the original row
  -- from the source over the database link
  for i in 1..lcrrow.count
  loop
    if lcrrow(i).column_name = 'ID' THEN
      pkdata := lcrrow(i).data;
      rc := pkdata.getNumber(pkval);
      delete from streams_test.test_hist@primarydb where id = pkval;
      commit;
    end if;
  end loop;
end;
/

This little piece of code accepts an LCR (Logical Change Record) and extracts its values. First, it executes the LCR on the apply side. If that succeeds, it finds the primary key column (in this case ID) and deletes the original row from the source database. Note that in order to do the delete, you will need a database link pointing back to the source DB. When the delete is processed on the source database, it won't propagate over to the target because of the negative rule we made above. If the insert on the target fails, the delete never runs. We have covered all our bases: the original row is removed ONLY if the apply works properly.
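The handler assumes a database link named primarydb already exists on the target and is visible to streams_test. If you need to create one, it would look something like this (the password and TNS alias here are placeholders; adjust them to your environment):

```sql
-- Hypothetical link back to the source database.
-- 'primarydb_tns' and the password are illustrative placeholders.
CREATE DATABASE LINK primarydb
  CONNECT TO streams_test IDENTIFIED BY "your_password_here"
  USING 'primarydb_tns';
```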

Last Step: The DML Handler

The last step is to tell Oracle that it should do something special when the apply process runs. To accomplish this, we will set up an apply handler on the target database (not the source):

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'streams_test.test_hist',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'streams_test.del_orig_lcr',
    apply_database_link => null,
    apply_name          => null);
END;
/

As you can see, we tell the apply process that whenever DML of type INSERT comes in for this table, it should run the streams_test.del_orig_lcr procedure instead of the default apply. We're rocking and rolling now!
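As a quick check (again assuming DBA access, this time on the target), the registered handler should show up in the apply dictionary view:

```sql
-- Verify the INSERT handler is registered on the target database.
SELECT object_owner,
       object_name,
       operation_name,
       user_procedure
  FROM dba_apply_dml_handlers
 WHERE object_owner = 'STREAMS_TEST'
   AND object_name  = 'TEST_HIST';
```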

The Test – Finale

The following test was run on the primary database:

SQL> select * from streams_test.test_hist;

no rows selected

SQL> insert into streams_test.test_hist values (2, 'Jeff', sysdate);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from streams_test.test_hist;

no rows selected

SQL> insert into streams_test.test_hist values (2, 'Kristina', sysdate);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from streams_test.test_hist;

no rows selected

SQL> select * from streams_test.test_hist@targetdb;

        ID CHARFIELD                                          THEDATE
---------- -------------------------------------------------- ---------
         1 Steve                                              10-NOV-06
         2 Kristina                                           10-NOV-06

SQL> insert into streams_test.test_hist values (2, 'Duplicate', sysdate);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from streams_test.test_hist;

        ID CHARFIELD                                          THEDATE
---------- -------------------------------------------------- ---------
         2 Duplicate                                          10-NOV-06

SQL> select * from streams_test.test_hist@targetdb;

        ID CHARFIELD                                          THEDATE
---------- -------------------------------------------------- ---------
         1 Steve                                              10-NOV-06
         2 Kristina                                           10-NOV-06

Conclusion

We inserted two records on the primary (source); both of them propagated across to the target and were deleted on the source side. However, when we tried to force an error with a duplicate entry for the PK on the ID column, notice that the row stayed in the table on the primary. This way, we are always guaranteed to keep our data, even if there is a Streams error or some other kind of issue.
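One follow-up worth knowing: when an apply fails (as with our duplicate insert), the transaction lands in the apply error queue on the target, where you can inspect and, after fixing the underlying problem, reprocess it. The transaction ID below is a placeholder taken from the query's output:

```sql
-- On the target: see why an apply failed.
SELECT apply_name, local_transaction_id, error_message
  FROM dba_apply_error;

-- Retry a failed transaction once the cause is fixed
-- ('1.2.345' is a placeholder transaction ID from the query above).
BEGIN
  DBMS_APPLY_ADM.EXECUTE_ERROR(local_transaction_id => '1.2.345');
END;
/
```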


8 comments

  1. Hello there,

    Can you please send me steps required to replicate same setup which you have done?

    We need something similar to this.

    Regards,
    Sujit

  2. Hi,
    I would like to ask you about the possibility apply the same handler for several tables (whole schema ) without specifying the DBMS_APPLY_ADM.SET_DML_HANDLER for each of the table.
    My situation is: I would like to replicate all tables with additional 3 columns (timestamp, user, dml operation). This transformation is done in one plsql proc – the handler. However in the apply phase I have to define for each db object (tables) three set_dml_handler (update, insert, delete) statment point to the handler. And this is exactly I would like avoid, the workload to define 3 set_dml_handlers per tables (in my shcema is around 1000 tables). Can you advice how to manage it in easier way ?
    We have 10g DB.
    Many thanks,
    GG

  3. Hi Steve,
    I have set up a streams environment similar to the above. My source and destination schemas are on the same database, hence I do not have propagation rules. Both capture and apply rules are using the same queue. My problem is, I'm not able to replicate DDL. DML is fine. Do you have any idea?

    Thanks and regards,
    Eric C

  4. I set a similar setup up for a BI application collecting data from nationwide systems last year.

    The volumes were high though.

    Although the DML handler was efficient in itself, it added an overhead of 100ms to each row. This was too much for our data volumes.

    Eventually dropped the DML handler and scheduled a delete on the source tables to delete any data more than a specified number of days old. In fact, as Streams picks up data from redo logs, could simply delete data as soon as it is inserted, as long as deletes are not propagated. We retained data for three days for verification purposes only.

    The script works for tables which have single-column primary keys.
    If there is more than one column in the primary key, how can we handle it?
    And what if the primary key is of CHAR type?

    Thanks

  6. Hi!
    Good post! Now, what if I only want to replicate records where JOB = SALESMAN?
    I have been searching the web, reading the docs and done some tests but I just can’t get it to work…
    I tried with a subset rule like this:
    BEGIN
      DBMS_STREAMS_ADM.ADD_SUBSET_RULES(
        table_name      => 'SCOTT.EMP2',
        dml_condition   => ' JOB IN (''SALESMAN'') ',
        streams_type    => 'CAPTURE',
        streams_name    => 'EMP2_CAPTURE',
        queue_name      => 'EMP2_CPT_Q',
        source_database => 'DB10G.NCBS.COM');
    END;
    /

    Still, every record is being replicated… Do you have any ideas what else to try?

    Thanks & cheers!
    /Måns
