Replication using LogMiner

This tip describes how to leverage Oracle LogMiner as a data replication utility.

This script was born out of a desire to replicate data from production while minimizing the effect on production.

The best way to do this was to leverage the APIs that Oracle made available in 8i. Some or all of the logic provided here becomes unnecessary in 9i with the advent of Oracle Streams, although I would contend that this approach provides a finer level of control. The script is based on an article by George Jucan at www.opendatasys.com called "Using Oracle LogMiner as a data replication utility," which explains how a program or set of programs could make use of this API. Our business had already used the information available in LogMiner to track activity in an application.

There are some C applications involved in this as well, but I am supplying only the PL/SQL and the C Oracle external functions, as I feel this is enough to get started. Some of the file and directory procedures were created to reproduce functions that are now available in 9i:

  • Dir_proc - C Oracle external procedure to get a list of all files in a Unix directory
  • File_proc - C Oracle external procedure to determine the existence of a Unix file
  • File_del_proc - C Oracle external procedure to delete a file in /tmp
  • Get_oratab - Function that drives the diy$oratab view
  • Logminer_stats - Procedure that returns counts from the local logmnr_contents store
  • Load_logminer - Procedure that is the workhorse of the app: it identifies each archive log, extracts its contents into the local store, and deletes the log

So how does it all fit together? I have another program that creates a known state. A known state is required because of a deficiency in 8i LogMiner: it does not capture key information for updates and deletes, only the rowid of the affected row. This is addressed in 9i, where LogMiner captures key information on update and delete if the table being captured has a primary or unique key. I did not have this luxury in 8i. So the initial table state is captured from the source instance, including the rowid of each row from the source table, which I store in a column called drowid. The drowid is then used to apply each update or delete to the destination table data. This was a hack, but a necessary one given what was available. Once a state is captured, we can move on to the logic supplied here and extract changes from a set of archive logs. So the process is...

  1. Enable archive logging in source instance.
  2. Create a known state -- this is only necessary in 8i and then only if you need to worry about update/deletes.
  3. Copy all archive logs from source to the machine where the destination instance lives. This is ongoing afterward.
  4. Change the init.ora of the destination instance to have log_archive_dest point to the archive logs that have been captured from the source instance.
  5. At regular intervals:
    a. Run the load_logminer proc provided.
    b. Apply the dml in logmnr_contents to the destination instance.
    c. Update checkpoint_change# in lmstate_checkpoint as each dml is applied.
    d. Commit each time the scn changes in the logmnr_contents table.
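
The apply program for step 5 is not supplied with this tip. As a rough illustration of the ordering in steps 5b through 5d, here is a minimal C sketch that walks a list of changes sorted by scn, advances the checkpoint as each statement is applied, and commits whenever the scn changes. Everything in it is hypothetical: the struct and function names are invented for the example, the database calls are replaced by printf stand-ins, and skipping rows at or below the stored checkpoint is an assumption about how the checkpoint would be used.

```c
#include <stdio.h>
#include <stddef.h>

/* One captured change: its scn and the prepared statement text. */
struct change {
    long scn;
    const char *sql;
};

/* Counters that stand in for real database work in this sketch. */
struct apply_stats {
    int applied;      /* statements applied */
    int commits;      /* commits issued */
    long checkpoint;  /* last scn recorded as applied */
};

/* Hypothetical stand-in for executing one statement on the destination. */
static void apply_dml(const char *sql, struct apply_stats *st)
{
    printf("apply:  %s\n", sql);
    st->applied++;
}

/* Hypothetical stand-in for committing the open transaction. */
static void commit_work(struct apply_stats *st)
{
    printf("commit at checkpoint %ld\n", st->checkpoint);
    st->commits++;
}

/* changes must be sorted by scn. Rows at or below start_scn are treated
   as already applied and skipped (an assumption of this sketch). */
void apply_changes(const struct change *changes, size_t n,
                   long start_scn, struct apply_stats *st)
{
    int pending = 0;  /* non-zero while uncommitted work exists */
    size_t i;

    st->applied = 0;
    st->commits = 0;
    st->checkpoint = start_scn;

    for (i = 0; i < n; i++) {
        if (changes[i].scn <= start_scn)
            continue;
        /* the scn moved on: commit what was applied for the previous scn */
        if (pending && changes[i].scn != st->checkpoint) {
            commit_work(st);
            pending = 0;
        }
        apply_dml(changes[i].sql, st);
        st->checkpoint = changes[i].scn;  /* step 5c */
        pending = 1;
    }
    if (pending)
        commit_work(st);  /* commit the final scn */
}
```

Because the checkpoint is updated in the same transaction as the dml, a failure before the commit would roll both back together, so a restart would resume from the last fully applied scn.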

Here is the PL/SQL source. First we create a schema with DBA rights (lmdba)...

create user lmdba identified by password default tablespace lmdata
temporary tablespace temp;
grant dba,connect to lmdba;
grant execute on dbms_logmnr to lmdba;
grant execute on dbms_logmnr_d to lmdba;
grant select on sys.v_$database to lmdba;
grant select on sys.v_$parameter to lmdba;
grant select on sys.GV_$LOGMNR_logs to lmdba;
grant select on sys.GV_$LOGMNR_contents to lmdba;
grant select on sys.V_$LOGMNR_contents to lmdba;
grant execute on dbms_backup_restore to lmdba;
grant analyze any, select any table, delete any table, insert any table, update any table to lmdba;
grant alter any table to lmdba;
grant drop any table to lmdba;

Then as the lmdba user...

drop view diy$oratab;
drop function get_oratab;
drop type oratab_type;
drop type oratab_row_type;
drop LIBRARY os_lib ;
CREATE LIBRARY os_lib is '/u01/apps/oracle/product/8.1.7/lib/osfunc.so';
/
CREATE OR REPLACE procedure dir_proc (FileName IN char,Path in char)
    as external
    library os_lib
    name "dir_func"
    language C
    parameters
   (FileName string,Path string);
/
--
CREATE OR REPLACE procedure file_proc (FileName IN char, bexists out binary_integer,
    file_size out binary_integer, block_count out binary_integer)
    as external
    library os_lib
    name "file_func"
    language C
    with context
    parameters
   (context,
    FileName string,
    bexists int,
    file_size int,
    block_count int
);
/
CREATE OR REPLACE procedure file_del_proc (FileName IN char)
    as external
    library os_lib
    name "file_del_func"
    language C
    parameters
   (FileName string);
/
drop TABLE LMSTATE_CHECKPOINT;
CREATE TABLE LMSTATE_CHECKPOINT (CHECKPOINT_CHANGE#  NUMBER);
--
drop TABLE LMSUBSCRIBE;
CREATE TABLE LMSUBSCRIBE ( 
  OWNER       VARCHAR2 (30), 
  TABLE_NAME  VARCHAR2 (30) ) ; 
--
drop TABLE LMTABLES;
CREATE TABLE LMTABLES ( 
  OWNER       VARCHAR2 (30), 
  TABLE_NAME  VARCHAR2 (30),
  LOAD_ORDER  numeric default 10 not null) ; 
--
drop TABLE LM_LOG;
CREATE TABLE LM_LOG (LM_STATE VARCHAR2(2000));
--
drop table LOGMNR_CONTENTS;
create table LOGMNR_CONTENTS (
  SCN           NUMBER,
  TIMESTAMP     DATE,
  THREAD#       NUMBER,
  LOG_ID        NUMBER,
  XIDUSN        NUMBER,
  XIDSLT        NUMBER,
  XIDSQN        NUMBER,
  RBASQN        NUMBER,
  RBABLK        NUMBER,
  RBABYTE       NUMBER,
  UBAFIL        NUMBER,
  UBABLK        NUMBER,
  UBAREC        NUMBER,
  UBASQN        NUMBER,
  ABS_FILE#     NUMBER,
  REL_FILE#     NUMBER,
  DATA_BLK#     NUMBER,
  DATA_OBJ#     NUMBER,
  DATA_OBJD#    NUMBER,
  SEG_OWNER     VARCHAR2(32),
  SEG_NAME      VARCHAR2(32),
  SEG_TYPE      NUMBER,
  SEG_TYPE_NAME VARCHAR2(32),
  TABLE_SPACE   VARCHAR2(32),
  ROW_ID        VARCHAR2(19),
  SESSION#      NUMBER,
  SERIAL#       NUMBER,
  USERNAME      VARCHAR2(32),
  SESSION_INFO  VARCHAR2(4000),
  ROLLBACK      NUMBER,
  OPERATION     VARCHAR2(32),
  SQL_REDO      VARCHAR2(4000),
  SQL_UNDO      VARCHAR2(4000),
  RS_ID         VARCHAR2(32),
  SSN           NUMBER,
  CSF           NUMBER,
  INFO          VARCHAR2(32),
  STATUS        NUMBER,
  PH1_NAME      VARCHAR2(32),
  PH1_REDO      VARCHAR2(2000),
  PH1_UNDO      VARCHAR2(2000),
  PH2_NAME      VARCHAR2(32),
  PH2_REDO      VARCHAR2(2000),
  PH2_UNDO      VARCHAR2(2000),
  PH3_NAME      VARCHAR2(32),
  PH3_REDO      VARCHAR2(2000),
  PH3_UNDO      VARCHAR2(2000),
  PH4_NAME      VARCHAR2(32),
  PH4_REDO      VARCHAR2(2000),
  PH4_UNDO      VARCHAR2(2000),
  PH5_NAME      VARCHAR2(32),
  PH5_REDO      VARCHAR2(2000),
  PH5_UNDO      VARCHAR2(2000)
);
--
drop TABLE get_oratab_setting;
CREATE GLOBAL TEMPORARY TABLE get_oratab_setting
(thedir varchar2(200))
ON COMMIT PRESERVE ROWS;
--
Create Type Oratab_Row_Type As Object ( FILE_NAME  Varchar2(100)); 
/
Create Type Oratab_Type Is Table Of Oratab_Row_Type;
/ 
--
Create Or Replace Function GET_ORATAB Return Oratab_Type Is
   Ora_Tab         Oratab_Type := Oratab_Type( Oratab_Row_Type( Null ) );
   f_Handle        Utl_File.File_Type;
   i_Pos           Integer;
   v_file_name     Varchar2(100);
   b_Read          Boolean := TRUE;
   b_First         Boolean := TRUE;
   tmp_file        varchar2(50);
   mydir    varchar2(200);
Begin
   --
   -- Note that in order to make the code shorter all the
   -- utl_file defined exceptions are left unhandled.
   -- 
   -- include 24-hour time with minutes so names do not repeat within a day
   tmp_file := 'oracle_' || to_char(sysdate,'yyyymmddhh24miss');
   begin
      select thedir into mydir from get_oratab_setting;
      Exception
         When NO_DATA_FOUND Then mydir := null;
   end;
   if mydir is not null then
      -- dir_proc writes one fully qualified file name per line to /tmp/<tmp_file>
      dir_proc('/tmp/' || tmp_file, mydir);
      f_Handle := Utl_File.FOpen('/tmp', tmp_file, 'r');
      While b_Read Loop
         Begin
            Utl_File.Get_Line(f_Handle, v_file_name);
            If b_First Then
               b_First := FALSE;
            Else
               Ora_Tab.Extend;
            End If;
            Ora_Tab(Ora_Tab.Last) := Oratab_Row_Type(rtrim(v_file_name));
         Exception
            When NO_DATA_FOUND Then
               b_Read := FALSE;
         End;
      End Loop;
      Utl_File.FClose(f_Handle);
   end if;
   -- file_del_proc prepends /tmp/ itself, so pass only the file name
   file_del_proc(tmp_file);
   Return Ora_Tab;
End;
/
Create Or Replace View DIY$ORATAB
As Select * From Table( Cast( GET_ORATAB() As Oratab_Type ) ); 
/ 
CREATE OR REPLACE PROCEDURE "LOGMINER_STATS" (insert_count out integer,
       delete_count out integer, update_count out integer, total_count out integer) is
       EMPTY_LOGMNR_CONTENTS exception; 
       PRAGMA EXCEPTION_INIT(EMPTY_LOGMNR_CONTENTS, -1306); 
       cursor the_csr is Select count(*) the_count,operation
       from LOGMNR_contents,lmsubscribe b
       where seg_owner = b.owner
       and seg_name = b.table_name
       and operation in ('INSERT','DELETE','UPDATE')
       group by operation;

begin
     insert_count :=0;
     update_count :=0;
     delete_count :=0;
     total_count :=0;
     for the_rec in the_csr loop
         if the_rec.operation = 'INSERT' then
            insert_count := the_rec.the_count;
         elsif the_rec.operation = 'DELETE' then
            delete_count := the_rec.the_count;
         elsif the_rec.operation = 'UPDATE' then
            update_count := the_rec.the_count;
         else
             null;
         end if;
     end loop;
     total_count := insert_count + delete_count + update_count;
     exception
      when no_data_found then null;
      when empty_logmnr_contents then null;
      when others then
           Raise_application_error(-20000, 'Error in LOGMNR_CONTENTS View');

end logminer_stats;
/
CREATE OR REPLACE PROCEDURE "LOAD_LOGMINER" as
EMPTY_LOGMNR_CONTENTS exception;
SID_FILE varchar2(30);
ARCH_DIR varchar2(100);
WORK_FILE varchar2(200);
ARCH_COUNT number :=0;
cursor the_csr is select file_name from diy$oratab;
first_time boolean := true;
start_scn_local number;
PRAGMA EXCEPTION_INIT(EMPTY_LOGMNR_CONTENTS, -1306);
Begin
/* clean the slate of all prior activity */
  delete from LOGMNR_CONTENTS;
  delete from lm_log;
  delete from get_oratab_setting;
  commit;
/* determine location of archive logs so we can get a directory of log_archive_dest */
  select value into arch_dir from sys.v_$parameter where name = 'log_archive_dest';
/* prime get_oratab_setting with directory name of archive logs */
  insert into get_oratab_setting (thedir) values (arch_dir);
  commit;
/* determine checkpoint of what has been applied to date */
  select CHECKPOINT_CHANGE# into start_scn_local from lmstate_checkpoint;  
/* go through each archive log and add to the local logmnr_contents table where applicable */
  begin
      for the_rec in the_csr loop
          work_file := the_rec.file_name;
          if work_file is not null then
               insert into lm_log (lm_state) values ('start loading archive log ' || work_file);
               commit;
               SYS.DBMS_LOGMNR.ADD_LOGFILE( LogFileName=>work_file,
                                            Options=>SYS.DBMS_LOGMNR.NEW);
               first_time := false;
               begin
/* use logfile generated from local or foreign database */
                    SYS.DBMS_LOGMNR.START_LOGMNR(
                         DictFileName=>'/u01/apps/oracle/product/8.1.7/dbs/SEED_dict.ora');
                    commit;
                    exception when others then
                       Raise_application_error(-20000, 'Error in LOAD_LOGMINER start_logmnr ' ||
                       sqlerrm || sqlcode);
               end;
               begin
  /* strip what we want out of v_$logmnr_contents into our local copy */
                    insert /*+ APPEND */ into LOGMNR_CONTENTS
                          select a.* from sys.V_$LOGMNR_CONTENTS a;
      /*Add any conditional logic here e.g. ...*/
                          /*where seg_owner = 'SOME_OWNER' and seg_name = 'SOME_TABLE' */
                    commit;
                    exception 
                    when no_data_found then null;
                    when EMPTY_LOGMNR_CONTENTS then null;
                    when others then
                    Raise_application_error(-20002, 
                    'Error in LOGMNR_CONTENTS View' || sqlerrm || sqlcode);
              end;               
              begin
  /* end for this log and delete it */
                   SYS.dbms_logmnr.end_logmnr;
                   insert into lm_log (lm_state) values ('end loading archive log ' || work_file);
                   commit;
                   sys.dbms_backup_restore.deletefile(work_file);
                   exception when others then null;
              end;
          end if;
      end loop; 
      exception when others then
          Raise_application_error(-20010, 'Error in LOAD_LOGMINER add ' || 
                                  work_file || sqlerrm || sqlcode);
  end;
/* ok now we have our local store of activity clean it up and prep it to be
used in the apply program. Put the prepped sql in ph1_redo column which is
unused */
 begin
  /* UPDATE and DELETE: address the row through drowid instead of rowid */
  update  LOGMNR_CONTENTS
  set ph1_redo = replace(replace(replace(sql_redo,'ROWID','DROWID'),'"',''),';','')
  where ph1_redo is null
  and operation in ('UPDATE','DELETE');
--
  /* INSERT for SOME_SCHEMA: add a drowid column holding the source rowid */
  update  LOGMNR_CONTENTS
  set ph1_redo = replace(replace(replace(replace(sql_redo,
         ') values', ',drowid) values'),
         ');', ',' || chr(39) || rowidtochar(ROW_ID) || chr(39) || ');'),
         '"',''),';','')
  where ph1_redo is null
  and seg_owner = 'SOME_SCHEMA'
  and operation = 'INSERT';
--
  /* INSERT for SOME_OTHER_OWNER: strip quotes and semicolons only */
  update  LOGMNR_CONTENTS
  set ph1_redo = replace(replace(sql_redo,'"',''),';','')
  where ph1_redo is null
  and seg_owner = 'SOME_OTHER_OWNER'
  and operation = 'INSERT';
--
  /* rows that were not prepared above are discarded */
  delete from logmnr_contents where ph1_redo is null;
--
  commit;
 end;
end load_logminer;
/
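
To make the UPDATE/DELETE preparation above concrete, here is a minimal C sketch of the same text rewrite: every occurrence of ROWID becomes DROWID, and double quotes and semicolons are dropped. It is only an illustration of the nested replace() calls, not part of the supplied code; the function name rewrite_redo and the sample statement in the note below are made up for the example.

```c
#include <stddef.h>
#include <string.h>

/* Copies src into dst (capacity dst_size), replacing each "ROWID" with
   "DROWID" and dropping double quotes and semicolons.
   Returns 0 on success, -1 if dst is too small. */
int rewrite_redo(const char *src, char *dst, size_t dst_size)
{
    size_t out = 0;

    while (*src != '\0') {
        if (strncmp(src, "ROWID", 5) == 0) {
            /* need room for 6 characters plus the terminator */
            if (out + 6 >= dst_size)
                return -1;
            memcpy(dst + out, "DROWID", 6);
            out += 6;
            src += 5;
        } else if (*src == '"' || *src == ';') {
            src++;  /* dropped, as in the PL/SQL replace() calls */
        } else {
            if (out + 1 >= dst_size)
                return -1;
            dst[out++] = *src++;
        }
    }
    if (dst_size == 0)
        return -1;
    dst[out] = '\0';
    return 0;
}
```

Given an illustrative statement such as delete from "SCOTT"."EMP" where "EMPNO" = 7369 and ROWID = 'AAAHW7AABAAAMUiAAA'; the function produces delete from SCOTT.EMP where EMPNO = 7369 and DROWID = 'AAAHW7AABAAAMUiAAA'. Like the PL/SQL version, the rewrite is blunt: it would also alter quotes, semicolons or the text ROWID if they appeared inside column data.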

Here is the C source code for the Oracle external functions:

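/* The header names were lost from the published listing; these are the
   headers that the functions below require. */
#include <stdio.h>
#include <string.h>
#include <dirent.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifndef OCI_ORACLE
# include <oci.h>
#endif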

void dir_func(char *FileName,char *Path);
void file_func(OCIExtProcContext *,char *,int *,int *,int *);
void file_del_func(char *FileName);

void dir_func(char *FileName,char *Path) {
        FILE *logfilep;
        DIR *mydir;
        struct dirent *dp;

        /* nothing to do if the output file cannot be created */
        if ((logfilep = fopen(FileName, "w")) == NULL)
                return;
        /* leave an empty listing if the directory cannot be opened */
        if ((mydir = opendir(Path)) == NULL) {
                fclose(logfilep);
                return;
        }

        /* write one "path/name" line per entry, skipping "." and ".." */
        while ((dp = readdir(mydir)) != NULL) {
                if ((strcmp(dp->d_name,".") == 0) ||
                    (strcmp(dp->d_name,"..") == 0))
                        continue;
                fprintf(logfilep, "%s/%s\n", Path, dp->d_name);
        }

        closedir(mydir);
        fclose(logfilep);
}

void file_func(OCIExtProcContext *with_context,char *FileName,
                int *exists,int *filesize,int *block_count) {
        struct stat mybuff;
        int rtn;

        /* *exists receives the stat() return code:
           0 if the file exists, -1 otherwise */
        rtn = stat(FileName,&mybuff);
        if (rtn == 0) {
                *filesize=mybuff.st_size;
                *block_count=mybuff.st_blocks;
        }
        else
        {
                *filesize=0;
                *block_count=0;
        }
        *exists=rtn;
}

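void file_del_func(char *FileName) {
        struct stat mybuff;
        char myFileName[200] = "/tmp/";

        /* FileName is relative to /tmp; refuse names that would overflow the buffer */
        if (strlen(FileName) >= sizeof(myFileName) - strlen(myFileName))
                return;
        strcat(myFileName,FileName);
        /* delete the file only if it exists */
        if (stat(myFileName,&mybuff) == 0) {
                unlink(myFileName);
        }
}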


This was first published in November 2005
