otak-otak | favorite evening food

February 4, 2012

Script to fix streams error at apply side

Filed under: streams — ora62 @ 8:24 am

This script comes from one of my Streams implementations; it saves me time because I no longer have to keep watching dba_apply_error and fixing each apply error by hand.

CREATE OR REPLACE PROCEDURE fix_err_dtl(in_any IN SYS.ANYDATA, arg_object_owner varchar2, arg_object_name varchar2) IS
    -- Purpose:
    --   Prepare the destination table so that a failed row LCR can be
    --   re-applied. The caller passes the LCR that raised the apply error
    --   together with the owner/name of the table it targets.
    --
    -- Parameters:
    --   in_any            ANYDATA wrapping the failed LCR; it is unpacked into
    --                     SYS.LCR$_ROW_RECORD, so it is assumed to be a row
    --                     LCR (not DDL) -- TODO confirm with the caller.
    --   arg_object_owner  owner of the target table, used to pick the branch.
    --   arg_object_name   name of the target table, used to pick the branch.
    --
    -- Effect for TEST.TEST_CONTROL (key = field1, field2, field3):
    --   INSERT         -> any row with the same key is deleted.
    --   UPDATE/DELETE  -> any row with the same key is deleted and a row
    --                     holding only the key columns is inserted.
    --   Other command types are ignored.
    --
    -- No COMMIT is issued here; transaction control is left to the caller.

    lcr        SYS.LCR$_ROW_RECORD;
    rc         PLS_INTEGER;        -- status returned by the ANYDATA getters (not inspected)
    dmlcommand VARCHAR2(10);
    lcrrow     SYS.LCR$_ROW_LIST;
    pkdata     SYS.ANYDATA;
    pknumber   NUMBER;
    pkvarchar2 VARCHAR2(32000);

    -- field1 through field3 form the unique/PK index of table TEST_CONTROL
    l_field1 NUMBER;
    l_field2 NUMBER;
    l_field3 VARCHAR2(100);

BEGIN
    rc := in_any.GETOBJECT(lcr);
    dmlcommand := lcr.GET_COMMAND_TYPE();

    IF arg_object_owner = 'TEST' AND arg_object_name = 'TEST_CONTROL' THEN

        IF dmlcommand IN ('INSERT', 'UPDATE', 'DELETE') THEN
            -- UPDATE/DELETE identify the row by its OLD values; an INSERT
            -- only carries NEW values.
            IF dmlcommand = 'INSERT' THEN
                lcrrow := lcr.GET_VALUES('NEW');
            ELSE
                lcrrow := lcr.GET_VALUES('OLD');
            END IF;

            -- Pull the key columns out of the LCR column list.
            FOR i IN 1 .. lcrrow.COUNT LOOP
                IF lcrrow(i).column_name = 'FIELD1' THEN
                    pkdata := lcrrow(i).data;
                    rc := pkdata.GETNUMBER(pknumber);
                    l_field1 := pknumber;
                ELSIF lcrrow(i).column_name = 'FIELD2' THEN
                    pkdata := lcrrow(i).data;
                    rc := pkdata.GETNUMBER(pknumber);
                    l_field2 := pknumber;
                ELSIF lcrrow(i).column_name = 'FIELD3' THEN
                    pkdata := lcrrow(i).data;
                    rc := pkdata.GETVARCHAR2(pkvarchar2);
                    l_field3 := pkvarchar2;
                END IF;
            END LOOP;

            -- Remove whatever currently sits under this key. The DELETE is
            -- simply a no-op when no row matches, so no COUNT(*) probe is
            -- needed beforehand.
            DELETE FROM TEST.TEST_CONTROL
            WHERE field1 = l_field1
              AND field2 = l_field2
              AND field3 = l_field3;

            -- An UPDATE or DELETE needs a row to act on when it is
            -- re-executed, so (re)create one that carries just the key.
            IF dmlcommand IN ('UPDATE', 'DELETE') THEN
                INSERT INTO TEST.TEST_CONTROL (field1, field2, field3)
                VALUES (l_field1, l_field2, l_field3);
            END IF;
        END IF;

    ELSIF arg_object_owner = 'TEST' AND arg_object_name = 'TEST2' THEN
        -- ... add the equivalent key extraction and repair logic for
        -- TEST.TEST2 (and further ELSIF branches for other tables) here ...
        NULL;
    END IF; -- if last
END;
/

The procedure above is called by the following procedure:

CREATE or replace PROCEDURE fix_err(arg_apply_name varchar2) IS
-- Purpose:
--   Walk every error transaction recorded in DBA_APPLY_ERROR for the given
--   apply process, repair the failing message through fix_err_dtl and then
--   re-execute the transaction with DBMS_APPLY_ADM.EXECUTE_ERROR.
--
-- Parameter:
--   arg_apply_name  value matched against DBA_APPLY_ERROR.APPLY_NAME.
--
-- Raises:
--   ORA-20001 when re-execution fails and the failing message number did not
--   change after the repair. Errors raised by fix_err_dtl itself are not
--   caught here, apart from NO_DATA_FOUND.

-- Error transactions of this apply process, processed in SOURCE_COMMIT_SCN order.
cursor c1 is select LOCAL_TRANSACTION_ID, MESSAGE_NUMBER, MESSAGE_COUNT
FROM DBA_APPLY_ERROR where apply_name=arg_apply_name order by SOURCE_COMMIT_SCN;

-- i holds the message number re-read after a failed re-execution.
i NUMBER;
lcr ANYDATA;
lcr2 sys.lcr$_row_record;
-- NOTE(review): lcrrow is declared but never used in this procedure.
lcrrow sys.lcr$_row_list;
rc2 pls_integer;
l_objname varchar2(50);
l_object_owner varchar2(50);
l_MESSAGE_NUMBER number;
l_LOCAL_TRANSACTION_ID varchar2(50);
l_msg varchar2(100);
BEGIN
for h in c1 loop
-- One pass per message in the transaction; only the pass whose index equals
-- the currently failing message number does any repair work.
for j in 1..h.MESSAGE_COUNT loop
begin
-- Re-read the current state of this error transaction on every pass. Once
-- the transaction is no longer listed, this SELECT raises NO_DATA_FOUND,
-- which the handler at the end of this block ignores.
select MESSAGE_NUMBER,LOCAL_TRANSACTION_ID into

l_MESSAGE_NUMBER, l_LOCAL_TRANSACTION_ID
from dba_apply_error
where LOCAL_TRANSACTION_ID=h.LOCAL_TRANSACTION_ID;

if l_MESSAGE_NUMBER is null or l_MESSAGE_NUMBER=0 then
-- No specific failing message is recorded: just retry the transaction and
-- ignore any failure (best effort).
begin
dbms_apply_adm.execute_error(h.LOCAL_TRANSACTION_ID);
exception
when others then
null;
end;
elsif j = l_MESSAGE_NUMBER then
-- Fetch the failing message and unpack it to find the table it targets.
-- Assumes the message is a row LCR -- TODO confirm.
lcr := NULL;
lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE(j,

h.LOCAL_TRANSACTION_ID);
rc2 := lcr.getobject(lcr2);
l_objname := lcr2.GET_OBJECT_NAME;
l_object_owner := lcr2.GET_OBJECT_OWNER();

-- Adjust the destination row so the message can be applied.
fix_err_dtl(lcr, l_object_owner, l_objname);

begin
-- NOTE(review): this value of i is only read if the SELECT in the handler
-- below does not overwrite it; on success it is never read.
i:=h.MESSAGE_COUNT;
dbms_apply_adm.execute_error(h.LOCAL_TRANSACTION_ID);
exception
when others then
-- Re-execution failed: look up which message is failing now. If the
-- transaction is no longer listed, NO_DATA_FOUND propagates to the
-- enclosing block's handler and is ignored.
select MESSAGE_NUMBER into i
from dba_apply_error
where LOCAL_TRANSACTION_ID=h.LOCAL_TRANSACTION_ID;

-- A different failing message number means the repaired message got
-- through; keep looping so a later pass can handle the new one.
if l_LOCAL_TRANSACTION_ID=h.LOCAL_TRANSACTION_ID

and i<>l_MESSAGE_NUMBER then
null;
else
-- Same message is still failing: report it to the caller.
l_msg := ' id: '||h.LOCAL_TRANSACTION_ID||' num: '||to_char(i);
raise_application_error(-20001,'err: '||SQLCODE||' -ERROR- '||SQLERRM||l_msg);
end if;
end;
end if;
exception
when no_data_found then
-- The transaction is no longer in DBA_APPLY_ERROR; nothing left to do.
null;
end;
end loop;
-- Commit after each error transaction has been processed.
commit;
end loop;
commit;
END;
/

Finally, create a scheduler job that runs this apply-error fix every 10 seconds:

conn strmadmin/passwd

CREATE OR REPLACE PROCEDURE fix_strm_err IS
    -- Purpose:
    --   Scheduler entry point. Runs fix_err for the apply process when
    --   DBA_APPLY_ERROR holds at least one error for it, then starts the
    --   apply process again if its status is anything other than ENABLED.
    --
    -- Raises:
    --   NO_DATA_FOUND if no apply process with this name exists in
    --   DBA_APPLY; any error raised by fix_err is propagated unchanged.
    --
    -- CREATE OR REPLACE (like the other procedures in this script) keeps the
    -- script re-runnable.

    l_status VARCHAR2(10);
    l_cnt    NUMBER;
    l_name   CONSTANT VARCHAR2(50) := 'BPK_APPLY';  -- apply process to watch
BEGIN
    SELECT COUNT(1)
    INTO l_cnt
    FROM dba_apply_error
    WHERE apply_name = l_name;

    IF l_cnt >= 1 THEN
        fix_err(l_name);
    END IF;

    SELECT status
    INTO l_status
    FROM dba_apply
    WHERE apply_name = l_name;

    IF l_status <> 'ENABLED' THEN
        dbms_apply_adm.start_apply(l_name);
    END IF;

END;
/
DECLARE
    -- Scheduler program that wraps the fix_strm_err procedure.
    c_program CONSTANT VARCHAR2(30) := 'PRG_FIX_STRM';
BEGIN
    -- Create the program disabled, then enable it as a separate step.
    SYS.DBMS_SCHEDULER.CREATE_PROGRAM(
        program_name        => c_program,
        program_type        => 'PLSQL_BLOCK',
        program_action      => 'begin fix_strm_err; end;',
        number_of_arguments => 0,
        enabled             => FALSE,
        comments            => NULL
    );

    SYS.DBMS_SCHEDULER.ENABLE(name => c_program);
END;
/
DECLARE
    -- Scheduler job that runs program PRG_FIX_STRM every 10 seconds.
    c_job CONSTANT VARCHAR2(30) := 'JOB_FIX_STRM';
BEGIN
    SYS.DBMS_SCHEDULER.CREATE_JOB(
        job_name        => c_job,
        start_date      => TO_TIMESTAMP_TZ('2012/02/01 16:59:59.467189 +07:00','yyyy/mm/dd hh24:mi:ss.ff tzh:tzm'),
        repeat_interval => 'FREQ=SECONDLY;INTERVAL=10',
        end_date        => NULL,
        program_name    => 'PRG_FIX_STRM',
        -- optional: job_class => 'BATCH_JOB_CLASS',
        comments        => NULL
    );

    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE(
        name      => c_job,
        attribute => 'RESTARTABLE',
        value     => TRUE
    );

    -- Logging is switched off; SYS.DBMS_SCHEDULER.LOGGING_FULL is the
    -- alternative value.
    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE(
        name      => c_job,
        attribute => 'LOGGING_LEVEL',
        value     => SYS.DBMS_SCHEDULER.LOGGING_OFF
    );

    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE_NULL(
        name      => c_job,
        attribute => 'MAX_FAILURES'
    );

    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE_NULL(
        name      => c_job,
        attribute => 'MAX_RUNS'
    );

    -- Best effort: this attribute could fail if the program is of type
    -- EXECUTABLE, so any error here is deliberately ignored.
    BEGIN
        SYS.DBMS_SCHEDULER.SET_ATTRIBUTE(
            name      => c_job,
            attribute => 'STOP_ON_WINDOW_CLOSE',
            value     => FALSE
        );
    EXCEPTION
        WHEN OTHERS THEN
            NULL;
    END;

    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE(
        name      => c_job,
        attribute => 'JOB_PRIORITY',
        value     => 1
    );

    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE_NULL(
        name      => c_job,
        attribute => 'SCHEDULE_LIMIT'
    );

    SYS.DBMS_SCHEDULER.SET_ATTRIBUTE(
        name      => c_job,
        attribute => 'AUTO_DROP',
        value     => FALSE
    );

    -- The job only starts running once it has been enabled.
    SYS.DBMS_SCHEDULER.ENABLE(name => c_job);
END;
/

happy streaming….:)

Advertisements

Leave a Comment »

No comments yet.

RSS feed for comments on this post. TrackBack URI

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Blog at WordPress.com.

%d bloggers like this: