本文详细讲解了一种如何把Oracle中的表的数据增量同步到PostgreSQL中的方法,此方法不需要做写程序,只需要使用使用PG的插件oracle_fdw就可以完成,比较方便和易操作。

1. 增量数据迁移的理论基础

我们先看通常的整个数据迁移过程:

  • 先进行一次全量迁移,在全量迁移开始前就开始记录增量变化
  • 增量变化中只需要记录主键,同步可以根据日志表记录的主键到原表中查询数据
    • 把主键的变化记录到一个增量日志表中
    • 对于更新主键的情况,相当于把旧主键删除,插入新主键,所以日志表中记录两条记录
    • 增量同步时,当增量日志表中存在的主键而原表不存在的主键,则在目标表中这些记录都该删除掉
    • 增量同步时,在日志表中记录变化的主键与原先表进行关联查询,查询出来的数据都是应该merge到目标表中的。
  • 把增量变化的数据同步到到目标表中,可以进行多次的增量同步
  • 停止源数据库的写入,进行最后一次增量同
  • 把应用切换到新的数据库上

可以看到本方法的亮点在于,记录增量时,并不把所有的增量数据全部记录下来,只是把发生变化的数据表的rowid或主键记录下来,然后增量时通过批量反查主库,然后把数据“merge”到目标数据库完成了一次增量同步。由于只记录了变化数据的行的rowid或主键,所以对原数据库的写性能影响比较少。

由于是批量“merge”,整体的效率是比一条一条的增量同步快,另在一条一条的增量同步中,多次增量同步的数据不能有交集,否则可能会报错。

虽然说是批量“merge”,但数据库本身并没有这种支持增删改的批量“merge”,需要分增、删除、改。
同一条记录如果被增、删、改多次时,在一条一条的同步方法中,需要考虑先后次序,但通过本方法中是通过表在数据增量同步前与后的数据发生变化的总体情况来全局考虑,避免了这个问题,这时把原理再详细说明一下:

  • 把rowid或主键的变化都记录到一个增量日志表中,不管是这些记录被删除还是更新或新记录插入,都把这些rowid或主键都记录到日志表中。因为oracle本身就可以通过在表上建物化视图log,就可以把发生变化的rowid或主键都记录表一张表中,做本方法中是直接使用物理视图log的功能直接记录rowid的变化。当然如果表的物化视图log被别的用途占用,这时就只能建触发器了。
  • 根据记录rowid或主键变化的日志表通过主键反查原先的表出来的数据,这些数据当然没有包括已删除的数据,在merge数据之前,需要在目标表中,把源库中已不存在的数据删除掉。
  • 不管一条记录是否被修改了很多次,我们只需要把更改多次的最后的数据行同步到目标库中,即可,不需要考虑其顺序。

下面讲述实际的操作过程:

2. Oracle中记录增量日志的方法

  • 使用触发器记录: 这时略过。
  • 使用物化视图 LOG

    • 创建物化视图 LOG 的 SQL 如下:

      1. −−在 scott emp log
      2. CREATE MATERIALIZED VIEW LOG ON SCOTT.EMP WITH PRIMARY KEY;
    • 将会自动生成这样的日志表

      1. SQL> desc mlog$_emp;
      2. Name Null? Type
      3. ----------------------------------------- -------- ----------------------------
      4. EMPNO NUMBER(4)
      5. SNAPTIME$$ DATE
      6. DMLTYPE$$ VARCHAR2(1)
      7. OLD_NEW$$ VARCHAR2(1)
      8. CHANGE_VECTOR$$ RAW(255)
      9. XID$$ NUMBER

3. 具体增量同步的方法

我们以一张名为emp表做同步的示例,这张表emp是oracle中的一张练习表,可以通过执行oracle提供的脚本“$ORACLE_HOME/rdbms/admin/scott.sql”来建立。

为了详细讲解如何从源库中反查数据,我们把反查数据的方法逻辑上分拆成几步:

  • 下面SQL提取出来的数据是需要merge(这里的merge是如果存在则update,如果不存在则插入)到目标表中:

    1. SELECT empno, ename, job, mgr, hiredate, sal, comm,deptno
    2. FROM emp a
    3. WHERE EXISTS(SELECT 1 FROM mlog$_emp b WHERE a.empno = b.empno);
  • 下面SQL提取出来的主键值的数据行都该在目标表中删除掉:

    1. SELECT DISTINCT empno FROM mlog$_emp a
    2. WHERE NOT EXISTS (SELECT 1 FROM emp b WHERE a.empno = b.empno);
  • 为了保证数据一致性,从源库中反查出的【在目标表中要删除掉的数据】与【要merge进去的数据】都需要在一个查询操作中查询出来,否则会有如果我们把上面的两个SQL分别执行,在分开执行期间,这些数据发生了变化,就会导致数据的错误。我们可以把上面的两个SQL用UNION ALL拼成一个SQL,这样就解决了一致性问题。因为Oracle实现的一致性读,即快照读的功能。
  • 目标数据同步完后,源库中增量日志表中相应的记录也应被删除掉,所以需要记录这些变化数据记录在增量日志表中的rowid,注意这个rowid不是变化记录行的rowid,而是在增量日志表中的rowid。

3.1 反查时数据的具体方法

  • 如前面所述,把多条SQL使用union all都合并成一条SQL,一次执行:

    1. CREATE VIEW vw____emp as
    2. SELECT 1 as x____action, null as x____row_id,
    3. empno, ename, job, mgr, hiredate, sal, comm, deptno
    4. FROM emp a
    5. WHERE EXISTS(SELECT 1 FROM mlog$_emp b WHERE a.empno = b.empno)
    6. UNION ALL
    7. SELECT DISTINCT 2 AS x____action, null AS x____row_id,
    8. empno, NULL, NULL, NULL, NULL, NULL, NULL, NULL
    9. FROM mlog$_emp a
    10. WHERE NOT EXISTS (SELECT 1 FROM emp b WHERE a.empno = b.empno)
    11. UNION ALL
    12. SELECT 3 AS x____action, owidtochar(rowid) AS x____row_id,
    13. NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
    14. FROM mlog$_emp;
  • x_action 表示上面三种SQL的类型(为什么叫xaction,是因为这个名称与表原有的字段重名的概率很低,当然也可以取其它的名称)
  • 上面UNION ALL中连接的第三个SQL记录了当前增量日志表的rowid,为增量同步完后删除增量日志时使用

3.2 创建oracle_fdw

在PostgreSQL建同步的表。

  • emp表的定义如下:

    1. CREATE TABLE emp (
    2. empno smallint NOT NULL primary key,
    3. ename varchar(10),
    4. job varchar(9),
    5. mgr smallint,
    6. hiredate timestamp,
    7. sal double precision,
    8. comm double precision,
    9. deptno smallint
    10. ) ;
  • 建外部服务:

    1. CREATE EXTENSION oracle_fdw;
    2. CREATE SERVER oradb FOREIGN DATA WRAPPER oracle_fdw
    3. OPTIONS (dbserver 'oratest');
    4. GRANT USAGE ON FOREIGN SERVER oradb TO scott;
  • 建用户映射(在scott用户下):

    1. CREATE USER MAPPING FOR scott SERVER oradb
    2. OPTIONS (user 'SCOTT', password 'tiger');
  • 建外部表

    1. CREATE FOREIGN TABLE fdw____emp (
    2. empno smallint NOT NULL,
    3. ename varchar(10),
    4. job varchar(9),
    5. mgr smallint,
    6. hiredate timestamp,
    7. sal double precision,
    8. comm double precision,
    9. deptno smallint
    10. ) SERVER oradb OPTIONS(schema 'SCOTT', table 'EMP');
    11. CREATE FOREIGN TABLE vw____emp (
    12. x____action smallint NOT NULL,
    13. x____row_id text,
    14. empno smallint NOT NULL,
    15. ename varchar(10),
    16. job varchar(9),
    17. mgr smallint,
    18. hiredate timestamp,
    19. sal double precision,
    20. comm double precision,
    21. deptno smallint
    22. ) SERVER oradb OPTIONS(schema 'SCOTT', table 'VW____EMP');

3.3 增量同步的操作步骤

  • 把增量数据提取到PG中的一张临时表中:

    1. CREATE TEMP TABLE IF NOT EXISTS tmp____emp(
    2. x____action smallint NOT NULL,
    3. x____row_id text,
    4. empno smallint,
    5. ename varchar(10),
    6. job varchar(9),
    7. mgr smallint,
    8. hiredate timestamp,
    9. sal double precision,
    10. comm double precision,
    11. deptno smallint
    12. );
    13. INSERT INTO tmp____emp SELECT * FROM vw____emp;
  • merge数据

    1. WITH upsert as
    2. (UPDATE emp AS m SET ename = t.ename, job=t.job, mgr=t.mgr,
    3. hiredate = t.hiredate, sal=t.sal, comm=t.comm, deptno=t.deptno
    4. FROM tmp____emp t
    5. WHERE t.x____action = 1 AND m.empno = t.empno
    6. RETURNING m.*)
    7. INSERT INTO emp
    8. SELECT empno, ename, job, mgr, hiredate, sal, comm, deptno FROM tmp____emp a
    9. WHERE a.x____action = 1
    10. AND NOT EXISTS(SELECT 1 FROM upsert b WHERE a.empno=b.empno);
    11. DELETE FROM emp a WHERE EXISTS(
    12. SELECT 1 FROM tmp____emp b WHERE x____action=2 AND a.empno=b.empno);

3.4 清理已同步过的增量日志

  • 按临时表tmp____emp中记录的rowid来删除Oracle中增量日志表中已同步过的记录,但因oracle_fdw建的外部表无法使用rowid字段,即无法执行下面的SQL:

    1. DELETE FROM fdw_mlog$_emp WHERE rowid in (SELECT x____row_id FROM tmp____emp);
  • 解决方法是,在Oracle建一张临时表,然后在临时表上建触发器,向临时表插入数据时,触发器删除日志表中的数据,方法如下:

    1. CREATE GLOBAL TEMPORARY TABLE clean_emp_mvlog(row_id varchar2(18))
    2. ON COMMIT DELETE ROWS;
    3. CREATE OR REPLACE TRIGGER trg_af_row_clean_emp_mvlog
    4. AFTER INSERT ON clean_emp_mvlog
    5. FOR EACH ROW
    6. declare
    7. begin
    8. DELETE FROM mlog$_emp WHERE rowid = chartorowid(:new.row_id);
    9. end;
    10. /
  • 在PG中建Oracle中表cmd_clean_mvlog的外部表:

    1. CREATE FOREIGN TABLE clean_emp_mvlog(row_id varchar(18))
    2. SERVER oradb OPTIONS ( schema 'SCOTT', table 'CLEAN_EMP_MVLOG');
  • 删除Oracle中已同步过的记录方法是在PG中执行:

    1. INSERT INTO cmd_clean_mvlog SELECT row_id FROM tmp____emp WHERE x____action=3;

4. 增量同步方法的改进

上面的整个操作过程比较复杂,如果有很多表要同步,手工操作起来比较麻烦,可以优化:

思路:可以把上面的手工过程封装在函数中,把对在Oracle数据库中的ddl操作都能在PG中执行。

  • 方法是在Oracle中建一个命令表,然后在命令表上建触发器,然后把这张命令表映射成PG中的一张外部表,当把一条SQL插入这个外部表中,就会让Oracle中的触发器执行这条SQL
  • 建命令表的SQL如下:

    1. CREATE GLOBAL TEMPORARY TABLE replica_cmd( cmd varchar2 (4000)) ON COMMIT DELETE ROWS;
    2. CREATE OR REPLACE TRIGGER trg_af_insrow_replica_cmd
    3. AFTER INSERT ON replica_cmd
    4. FOR EACH ROW
    5. DECLARE
    6. PRAGMA AUTONOMOUS_TRANSACTION;
    7. BEGIN
    8. EXECUTE IMMEDIATE :new.cmd;
    9. COMMIT;
    10. END;
  • 注意1:上面建的命令表是临时表,因为我们只需要执行触发器,而不需要保存执行的SQL
  • 注意2: 如果想在触发器中执行DDL,需要在自治事务中:PRAGMA AUTONOMOUS_TRANSACTION
  • 注意3:因为是在存储过程中执行DDL,需要显式的对用户赋DDL权限,如:

    1. GRANT create table TO scott;
  • 把Oracle中的命令表映射为PG中的外部表
  1. CREATE FOREIGN TABLE fdw_replica_cmd (
  2. cmd varchar(4000)
  3. ) SERVER oradb OPTIONS ( schema 'SCOTT', table 'REPLICA_CMD');
  • 以后就可以这样在向Oracle数据库发送DDL命令了:
  1. INSERT INTO fdw_replica_cmd values('CREATE TABLE test01(id number)');
  • 函数的实际封装如下:

    • add_table_to_replica函数:把要同步的表加入同步中,例子如下:

      1. SELECT add_table_to_replica('scott','emp');
    • remove_table_from_replica函数:把已同步的表从同步中去掉,例子如下:

      1. SELECT remove_table_from_replica('scott','emp');
    • refresh_increment函数:增量同步数据,例子如下:

      1. SELECT refresh_increment('scott','emp');
    • clean_current_increment_log函数:清理掉本次已同步过的增量日志(需确定增量日志已同步过了,否则会丢失增量),例子如下:

      1. SELECT clean_current_increment_log('scott','emp');
  • 函数add_table_to_replica函数的实现如下

  1. CREATE OR REPLACE FUNCTION add_table_to_replica(arg_schema_name text, arg_table_name text)
  2. RETURNS text AS
  3. $BODY$
  4. DECLARE
  5. full_table_name text := arg_schema_name||'.'||arg_table_name;
  6. r1 RECORD;
  7. r2 RECORD;
  8. cols_name_list text[];
  9. cols_type_list text[];
  10. pk_name_list text[];
  11. cols_name_str text;
  12. cols_name_type_str text;
  13. create_tmp_table_sql text;
  14. a_cols_name_str text;
  15. ab_join_cond_str text;
  16. mt_join_cond_str text;
  17. pk_null_str text;
  18. cols_null_str text;
  19. item text;
  20. va text;
  21. i int;
  22. out_info text;
  23. local_sql text;
  24. ora_sql text;
  25. BEGIN
  26. FOR r1 IN
  27. SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod) as coltype,a.attnum
  28. FROM pg_catalog.pg_attribute a
  29. WHERE a.attrelid = full_table_name::regclass
  30. AND a.attnum > 0
  31. AND NOT a.attisdropped
  32. ORDER BY a.attnum
  33. LOOP
  34. cols_name_list := cols_name_list || r1.attname::text;
  35. cols_type_list := cols_type_list || r1.coltype::text;
  36. END LOOP;
  37. FOR r2 IN
  38. SELECT
  39. pg_attribute.attname,
  40. format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
  41. FROM pg_index, pg_class, pg_attribute
  42. WHERE
  43. pg_class.oid = full_table_name::regclass AND
  44. indrelid = pg_class.oid AND
  45. pg_attribute.attrelid = pg_class.oid AND
  46. pg_attribute.attnum = any(pg_index.indkey)
  47. AND indisprimary
  48. LOOP
  49. pk_name_list := pk_name_list || r2.attname::text;
  50. END LOOP;
  51. i :=1;
  52. FOREACH item IN ARRAY cols_name_list
  53. LOOP
  54. IF i = 1 THEN
  55. cols_name_type_str := cols_name_list[i] ||' '||cols_type_list[i];
  56. ELSE
  57. cols_name_type_str := cols_name_type_str || ', ' || cols_name_list[i] ||' '||cols_type_list[i];
  58. END IF;
  59. i := i+1;
  60. END LOOP;
  61. i :=1;
  62. FOREACH item IN ARRAY pk_name_list
  63. LOOP
  64. IF i = 1 THEN
  65. ab_join_cond_str := 'a.'||item ||' = b.'||item;
  66. mt_join_cond_str := 'm.'||item ||' = t.'||item;
  67. ELSE
  68. ab_join_cond_str := ab_join_cond_str || ' AND a.' || item ||' = b.'||item;
  69. mt_join_cond_str := mt_join_cond_str || ' AND m.' || item ||' = t.'||item;
  70. END IF;
  71. i := i+1;
  72. END LOOP;
  73. i := 1;
  74. FOREACH item IN ARRAY cols_name_list
  75. LOOP
  76. IF i =1 THEN
  77. a_cols_name_str := 'a.'||item;
  78. ELSE
  79. a_cols_name_str := a_cols_name_str||', a.'||item;
  80. END IF;
  81. i := i + 1;
  82. END LOOP;
  83. -- 生成pk_null_str,和cols_null_str
  84. -- pk_null_str中除了主键的列名外,其它填null
  85. -- cols_null_str即对每一列填null
  86. pk_null_str = '';
  87. i := 1;
  88. FOREACH item IN ARRAY cols_name_list
  89. LOOP
  90. IF item = ANY(pk_name_list) THEN
  91. va := item;
  92. ELSE
  93. va := 'null';
  94. END IF;
  95. IF i =1 THEN
  96. pk_null_str := va;
  97. cols_null_str :='null';
  98. ELSE
  99. pk_null_str := pk_null_str ||', '||va;
  100. cols_null_str := cols_null_str || ', null';
  101. END IF;
  102. i := i + 1;
  103. END LOOP;
  104. -- Oracle中生成表的物化视图log
  105. ora_sql := format('CREATE MATERIALIZED VIEW LOG ON %s.%s WITH PRIMARY KEY', arg_schema_name, arg_table_name);
  106. INSERT INTO fdw_replica_cmd values( ora_sql);
  107. out_info := format(E'Run in oracle: %s\n', ora_sql);
  108. -- Oracle中生成用于同步的一个的视图
  109. ora_sql := format( E'CREATE VIEW vw____%s AS \n'||
  110. E'SELECT 1 as x____action, null as x____row_id, %s FROM %s a \n'||
  111. E' WHERE EXISTS (SELECT 1 FROM mlog$_%s b WHERE %s)\n' ||
  112. E'UNION ALL \n'||
  113. E'SELECT DISTINCT 2 AS x____action, null AS x____row_id, %s FROM mlog$_%s a \n'||
  114. E' WHERE NOT EXISTS (SELECT 1 FROM %s b WHERE %s) \n'||
  115. E'UNION ALL \n'||
  116. E'SELECT 3 AS x____action, rowidtochar(rowid) AS x____row_id, %s FROM mlog$_%s',
  117. arg_table_name, a_cols_name_str, arg_table_name, arg_table_name, ab_join_cond_str,
  118. pk_null_str, arg_table_name, arg_table_name, ab_join_cond_str,
  119. cols_null_str, arg_table_name
  120. );
  121. INSERT INTO fdw_replica_cmd values( ora_sql);
  122. out_info := out_info || format(E'Run in oracle: %s\n', ora_sql);
  123. -- Oracle中的增量数据的视图映射到本地的一个外部表上
  124. local_sql := format ( E'CREATE FOREIGN TABLE vw____%s ( \n'||
  125. E'x____action smallint NOT NULL, \n' ||
  126. E'x____row_id text, \n' ||
  127. E'%s )' ||
  128. E'SERVER oradb OPTIONS ( schema \'%s\', table \'VW____%s\')',
  129. arg_table_name,
  130. cols_name_type_str,
  131. upper(arg_schema_name),
  132. upper(arg_table_name)
  133. );
  134. EXECUTE local_sql;
  135. out_info := out_info || format(E'Run: %s\n', local_sql);
  136. -- Oracle中的远程的表映射到本地的一个外部表上
  137. local_sql := format ( E'CREATE FOREIGN TABLE fdw____%s ( \n'||
  138. E'%s )' ||
  139. E'SERVER oradb OPTIONS ( schema \'%s\', table \'%s\')',
  140. arg_table_name,
  141. cols_name_type_str,
  142. upper(arg_schema_name),
  143. upper(arg_table_name)
  144. );
  145. EXECUTE local_sql;
  146. out_info := out_info || format(E'Run: %s\n', local_sql);
  147. return out_info;
  148. END;
  149. $BODY$
  150. LANGUAGE plpgsql VOLATILE;
  • 函数remove_table_from_replic的实现如下
  1. CREATE OR REPLACE FUNCTION remove_table_from_replica(arg_schema_name text, arg_table_name text)
  2. RETURNS text AS
  3. $BODY$
  4. DECLARE
  5. out_info text;
  6. local_sql text;
  7. ora_sql text;
  8. v_detail text;
  9. BEGIN
  10. -- 删除在Oracle中生成表的物化视图log
  11. ora_sql := format('DROP MATERIALIZED VIEW LOG ON %s.%s', arg_schema_name, arg_table_name);
  12. BEGIN
  13. INSERT INTO fdw_replica_cmd values( ora_sql);
  14. out_info := format(E'Run in oracle: %s\n', ora_sql);
  15. EXCEPTION
  16. WHEN OTHERS THEN
  17. GET STACKED DIAGNOSTICS v_detail = PG_EXCEPTION_DETAIL;
  18. out_info := format(E'Run in oracle failed: %s;\n SQLSTATE=%s, %s\n %s\n', ora_sql, SQLSTATE, SQLERRM, v_detail);
  19. END;
  20. -- 删除在Oracle中生成用于同步的一个的视图
  21. ora_sql := format( E'DROP VIEW %s.vw____%s', arg_schema_name, arg_table_name);
  22. BEGIN
  23. INSERT INTO fdw_replica_cmd values( ora_sql);
  24. out_info := out_info || format(E'Run in oracle: %s\n', ora_sql);
  25. EXCEPTION
  26. WHEN OTHERS THEN
  27. GET STACKED DIAGNOSTICS v_detail = PG_EXCEPTION_DETAIL;
  28. out_info := out_info || format(E'Run in oracle failed: %s;\n SQLSTATE=%s, %s\n %s\n', ora_sql, SQLSTATE, SQLERRM, v_detail);
  29. END;
  30. -- 删除把Oracle中的增量数据的视图映射到本地的外部表
  31. local_sql := format ('DROP FOREIGN TABLE %s.vw____%s', arg_schema_name, arg_table_name);
  32. BEGIN
  33. EXECUTE local_sql;
  34. out_info := out_info || format(E'Run: %s\n', local_sql);
  35. EXCEPTION
  36. WHEN OTHERS THEN
  37. out_info := out_info || format(E'Run in oracle failed: %s;\n SQLSTATE=%s, %s\n', ora_sql, SQLSTATE, SQLERRM);
  38. END;
  39. -- 删除把Oracle中的远程的表映射到本地的外部表上
  40. local_sql := format ( 'DROP FOREIGN TABLE %s.fdw____%s', arg_schema_name, arg_table_name);
  41. BEGIN
  42. EXECUTE local_sql;
  43. out_info := out_info || format(E'Run: %s\n', local_sql);
  44. EXCEPTION
  45. WHEN OTHERS THEN
  46. out_info := out_info || format(E'Run in oracle failed: %s;\n SQLSTATE=%s, %s\n', ora_sql, SQLSTATE, SQLERRM);
  47. END;
  48. return out_info;
  49. END;
  50. $BODY$
  51. LANGUAGE plpgsql VOLATILE;
  • 函数refresh_increment的实现如下
  1. CREATE OR REPLACE FUNCTION refresh_increment(arg_schema_name text, arg_table_name text)
  2. RETURNS text AS
  3. $BODY$
  4. DECLARE
  5. full_table_name text := arg_schema_name||'.'||arg_table_name;
  6. tmp_table_name text := 'x____tmp_'||arg_schema_name||'_'||arg_table_name;
  7. r1 RECORD;
  8. r2 RECORD;
  9. cols_name_list text[];
  10. cols_type_list text[];
  11. pk_name_list text[];
  12. cols_name_str text;
  13. cols_name_type_str text;
  14. create_tmp_table_sql text;
  15. a_cols_name_str text;
  16. ab_join_cond_str text;
  17. mt_join_cond_str text;
  18. pk_null_str text;
  19. cols_null_str text;
  20. up_set_str text;
  21. item text;
  22. va text;
  23. i int;
  24. insert_sql text;
  25. merge_sql text;
  26. delete_sql text;
  27. tj_sql text;
  28. merge_nums int;
  29. delete_nums int;
  30. BEGIN
  31. FOR r1 IN
  32. SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod) as coltype,a.attnum
  33. FROM pg_catalog.pg_attribute a
  34. WHERE a.attrelid = full_table_name::regclass
  35. AND a.attnum > 0
  36. AND NOT a.attisdropped
  37. ORDER BY a.attnum
  38. LOOP
  39. cols_name_list := cols_name_list || r1.attname::text;
  40. cols_type_list := cols_type_list || r1.coltype::text;
  41. END LOOP;
  42. FOR r2 IN
  43. SELECT
  44. pg_attribute.attname,
  45. format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
  46. FROM pg_index, pg_class, pg_attribute
  47. WHERE
  48. pg_class.oid = full_table_name::regclass AND
  49. indrelid = pg_class.oid AND
  50. pg_attribute.attrelid = pg_class.oid AND
  51. pg_attribute.attnum = any(pg_index.indkey)
  52. AND indisprimary
  53. LOOP
  54. pk_name_list := pk_name_list || r2.attname::text;
  55. END LOOP;
  56. i :=1;
  57. FOREACH item IN ARRAY cols_name_list
  58. LOOP
  59. IF i = 1 THEN
  60. cols_name_type_str := cols_name_list[i] ||' '||cols_type_list[i];
  61. ELSE
  62. cols_name_type_str := cols_name_type_str || ', ' || cols_name_list[i] ||' '||cols_type_list[i];
  63. END IF;
  64. i := i+1;
  65. END LOOP;
  66. i :=1;
  67. FOREACH item IN ARRAY pk_name_list
  68. LOOP
  69. IF i = 1 THEN
  70. ab_join_cond_str := 'a.'||item ||' = b.'||item;
  71. mt_join_cond_str := 'm.'||item ||' = t.'||item;
  72. ELSE
  73. ab_join_cond_str := ab_join_cond_str || ' AND a.' || item ||' = b.'||item;
  74. mt_join_cond_str := mt_join_cond_str || ' AND m.' || item ||' = t.'||item;
  75. END IF;
  76. i := i+1;
  77. END LOOP;
  78. cols_name_str := array_to_string(cols_name_list, ',');
  79. -- 组合a_cols_name_str内容为:a.col1, a.col2, a.col3 ...
  80. i := 1;
  81. FOREACH item IN ARRAY cols_name_list
  82. LOOP
  83. IF i =1 THEN
  84. a_cols_name_str := 'a.'||item;
  85. ELSE
  86. a_cols_name_str := a_cols_name_str||', a.'||item;
  87. END IF;
  88. i := i + 1;
  89. END LOOP;
  90. pk_null_str = '';
  91. i := 1;
  92. FOREACH item IN ARRAY cols_name_list
  93. LOOP
  94. IF item = ANY(pk_name_list) THEN
  95. va := item;
  96. ELSE
  97. va := 'null';
  98. END IF;
  99. IF i =1 THEN
  100. pk_null_str := va;
  101. cols_null_str :='null';
  102. ELSE
  103. pk_null_str := pk_null_str ||', '||va;
  104. cols_null_str := cols_null_str || ', null';
  105. END IF;
  106. i := i + 1;
  107. END LOOP;
  108. -- 生成update语句中的set col1=v1,col2=v2的字符串
  109. i := 1;
  110. FOREACH item IN ARRAY cols_name_list
  111. LOOP
  112. IF item = ANY(pk_name_list) THEN
  113. CONTINUE;
  114. END IF;
  115. IF i =1 THEN
  116. up_set_str := item ||' = t.'||item;
  117. ELSE
  118. up_set_str := up_set_str || ',' || item || ' = t.' || item;
  119. END IF;
  120. i := i + 1;
  121. END LOOP;
  122. -- 增量数据需要访问多次,如果多次访问远程的增量表,每次访问的数据是不一样的,无法保证一致性,因此把远程的增量数据保证到本地的临时表中。
  123. -- 创建这张临时表,保存这次操作的增量数据
  124. create_tmp_table_sql := format(
  125. E'CREATE TEMP TABLE IF NOT EXISTS \n'||
  126. E'%s(x____action int, x____row_id text, %s)',
  127. tmp_table_name, cols_name_type_str);
  128. -- 把远程Oracle的增量数据(即对应本地的一张外部表)的数据插到本地的临时表中
  129. insert_sql := format(
  130. E'INSERT INTO %s\n'||
  131. E'SELECT x____action, x____row_id, %s FROM vw____%s\n',
  132. tmp_table_name, cols_name_str, arg_table_name);
  133. -- merge增量数据的SQL
  134. merge_sql := format(
  135. E'WITH upsert as \n'||
  136. E'(UPDATE %s AS m SET %s\n'||
  137. E' FROM %s t \n'||
  138. E' WHERE t.x____action = 1 AND %s \n'||
  139. E'RETURNING m.*)\n'||
  140. E'INSERT INTO %s \n'||
  141. E'SELECT %s FROM %s a \n'||
  142. E' WHERE a.x____action = 1 \n'||
  143. E' AND NOT EXISTS(SELECT 1 FROM upsert b WHERE %s)',
  144. full_table_name, up_set_str,
  145. tmp_table_name, mt_join_cond_str,
  146. full_table_name, cols_name_str,
  147. tmp_table_name, ab_join_cond_str);
  148. --增量数据中的删除的SQL
  149. delete_sql := format(
  150. E'DELETE FROM %s a WHERE EXISTS(SELECT 1 FROM %s b WHERE x____action=2 AND %s)',
  151. full_table_name, tmp_table_name, ab_join_cond_str);
  152. EXECUTE create_tmp_table_sql;
  153. EXECUTE insert_sql;
  154. EXECUTE merge_sql;
  155. EXECUTE delete_sql;
  156. tj_sql := 'SELECT sum(case x____action when 1 then 1 else 0 end), '||
  157. 'sum(case x____action when 2 then 1 else 0 end) FROM '||tmp_table_name;
  158. EXECUTE tj_sql into merge_nums, delete_nums;
  159. RETURN format('merge %s rows, delete %s rows.', merge_nums, delete_nums);
  160. END;
  161. $BODY$
  162. LANGUAGE plpgsql VOLATILE;
  • 函数clean_current_increment_log的实现如下:
  1. CREATE OR REPLACE FUNCTION clean_current_increment_log(arg_schema_name text, arg_table_name text)
  2. RETURNS int AS
  3. $BODY$
  4. DECLARE
  5. -- 定义在一次循环时清理的rowid个数
  6. rowid_count_in_loop int := 200;
  7. full_table_name text := arg_schema_name||'.'||arg_table_name;
  8. tmp_table_name text := 'x____tmp_'||arg_schema_name||'_'||arg_table_name;
  9. ref refcursor;
  10. rowid text;
  11. all_rowid_str text;
  12. ora_sql text;
  13. clean_sql text;
  14. clean_tmp_table_sql text;
  15. i int;
  16. total int;
  17. BEGIN
  18. i := 0;
  19. total :=0;
  20. -- 循环存储增量的昨时表,找出当时的rowid
  21. OPEN ref FOR EXECUTE 'SELECT x____row_id FROM '|| quote_ident(tmp_table_name)||' WHERE x____action=3';
  22. LOOP
  23. FETCH ref INTO rowid;
  24. EXIT WHEN not found;
  25. IF i = 0 THEN
  26. all_rowid_str := ''''||rowid||'''';
  27. ELSE
  28. all_rowid_str := all_rowid_str || ', '''||rowid||'''';
  29. END IF;
  30. i := i + 1;
  31. IF i > rowid_count_in_loop THEN
  32. ora_sql = format('DELETE FROM mlog$_%s WHERE rowid in (%s)', arg_table_name, all_rowid_str);
  33. INSERT INTO fdw_replica_cmd values( ora_sql);
  34. total := total + i;
  35. i := 0;
  36. all_rowid_str :='';
  37. END IF;
  38. END LOOP;
  39. CLOSE ref;
  40. IF i >0 THEN
  41. ora_sql = format('DELETE FROM mlog$_%s WHERE rowid in (%s)', arg_table_name, all_rowid_str);
  42. INSERT INTO fdw_replica_cmd values( ora_sql);
  43. total := total + i;
  44. END IF;
  45. IF total > 0 THEN
  46. clean_tmp_table_sql = 'TRUNCATE TABLE '||quote_ident(tmp_table_name);
  47. EXECUTE clean_tmp_table_sql;
  48. END IF;
  49. return total;
  50. END;
  51. $BODY$
  52. LANGUAGE plpgsql VOLATILE;
0 评论  
添加一条新评论