文章

OGG2KAFKA

OGG2KAFKA

前言

Oracle GoldenGate(OGG)是 Oracle 公司的一款数据同步工具,支持多种数据库之间的数据同步,包括 Oracle 数据库、MySQL、SQL Server、DB2 等。本文主要介绍 OGG 与 Kafka 的集成,实现将 Oracle 的数据同步到 Kafka 的功能。

文中的 ip 和表名是脱敏,参考的时候根据个人的实际情况修改

项目环境

源端

Oracle :19.3.0

OS:CentOS 7.7

OGG:Oracle GoldenGate Command Interpreter for Oracle

1
    Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047_FBO

目标端

Kafka:2.4.1

1
kafka 集群 broker 地址:1.1.1.101:9092,1.1.1.102:9092,1.1.1.103:9092

OS:CentOS 7.7

OGG:Oracle GoldenGate for Big Data Version 21.4.0.0.0 (Build 002)

1
2
	Oracle GoldenGate Command Interpreter
		Version 21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803

安装 OGG

源端

下载 OGG 安装包

官网地址,选择对应平台的版本下载。

解压安装包

1
2
3
4
5
   ogg_dir=/u01/ogg/ogg21
   mkdir -pv $ogg_dir
   unzip 213000_fbo_ggs_Linux_x64_Oracle_shiphome.zip -d $ogg_dir
   cd $ogg_dir

安装 OGG

  1. 准备响应文件
    1
    
    grep -v "^#" fbo_ggs_Linux_x64_Oracle_shiphome/Disk1/response/oggcore.rsp | sed  '/^$/d' > oggcore.rsp
    
  2. 修改 oggcore.rsp 文件 ~~~shell vim oggcore.rsp

oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v21_1_0 INSTALL_OPTION=ORA19C SOFTWARE_LOCATION=/u01/ogg/ogg21 START_MANAGER=false MANAGER_PORT= DATABASE_LOCATION= INVENTORY_LOCATION= UNIX_GROUP_NAME=

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
3. 配置 ogg 环境变量

如果你的环境变量在安装 Oracle 实例的时候已经配置好了,可以忽略 for oracle 部分

~~~shell
vim ~/.bash_profile

#for oracle
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$ORACLE_HOME/rdbms/lib:$ORACLE_HOME/network/lib:/lib:/usr/lib
export CLASSPATH=$ORACLE_HOME/jlib:$ORACLE_HOME/rdbms/jlib:$ORACLE_HOME/network/jlib
export NLS_LANG="SIMPLIFIED CHINESE_CHINA.AL32UTF8"

#for ogg
export OGG_HOME=/u01/app/ogg/ogg21
export PATH=$OGG_HOME/bin:$PATH:$ORACLE_HOME/bin
export LD_LIBRARY_PATH=$OGG_HOME:$ORACLE_HOME/lib:/usr/lib
alias ggsci='cd $OGG_HOME;ggsci'
  1. 安装 OGG ~~~shell source ~/.bash_profile ./runInstaller -silent -responseFile /u01/ogg/ogg21/oggcore.rsp

#安装完成后,执行 ggsci 命令进入 OGG 管理界面 ggsci Oracle GoldenGate Command Interpreter for Oracle Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047_FBO Oracle Linux 7, x64, 64bit (optimized), Oracle Database 21c and lower supported versions on Jul 29 2021 03:59:23 操作系统字符集标识为 UTF-8。

Copyright (C) 1995, 2021, Oracle and/or its affiliates. All rights reserved.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

## 目标端安装 OGG

### 下载 OGG 安装包

  [官网地址](https://www.oracle.com/middleware/technologies/goldengate-downloads.html),选择对应平台的版本下载,需要注意的要选择 Big Data 版本

### 解压安装

  bigdata 版本的 OGG 安装就很简单了,解压即可
~~~shell
## 创建目录
mkdir /ogg
scp $ogg_software /ogg/

## 解压
pushd /ogg
unzip 214000_ggs_Linux_x64_BigData_64bit.zip -d ogg21
tar xf ../ggs_Linux_x64_BigData_64bit.tar

## 删除指定文件之外的所有文件
## [root@bigdata01 ogg]# rm -rf !(214000_ggs_Linux_x64_BigData_64bit.zip)

[root@bigdata01 ogg21]# ./ggsci
./ggsci: error while loading shared libraries: libjvm.so: cannot open shared object file: No such file or directory
[root@bigdata01 ogg21]#

[root@bigdata01 ogg21]# locate libjvm.so
/home/module/jdk1.8.0_333/jre/lib/amd64/server/libjvm.so
/module/jre/lib/amd64/server/libjvm.so
/opt/module/jdk1.8/jre/lib/amd64/server/libjvm.so
[root@bigdata01 ogg21]#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@bigdata01 ogg21]# env | grep LIBRARY
LD_LIBRARY_PATH=/opt/module/oracle_instant_client:/opt/module/oracle_instant_client:/opt/module/oracle/instantclient_19_19

## 配置 ogg 环境变量
[root@bigdata01 ogg21]# pwd
/ogg/ogg21

[root@bigdata01 ogg21]# export LD_LIBRARY_PATH=/ogg/ogg21/lib:/home/module/jdk1.8.0_333/jre/lib/amd64/server/
[root@bigdata01 ogg21]#

## vim ~/.bash_profile
export GGATE=/ogg/ogg21
export LD_LIBRARY_PATH=/ogg/ogg21/lib:/home/module/jdk1.8.0_333/jre/lib/amd64/server/

[root@bigdata01 ogg21]# ./ggsci
Oracle GoldenGate for Big Data
Version 21.4.0.0.0 (Build 002)
Oracle GoldenGate Command Interpreter
Version 21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803
Oracle Linux 7, x64, 64bit (optimized), Generic  on Oct 22 2021 23:14:43
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2021, Oracle and/or its affiliates. All rights reserved.
GGSCI (bigdata01) 1>

配置 OGG

源端配置 OGG

说明

如果目标端需要每张表都单独保存为一个 topic 的话,在源端的抽取进程和传输进程也可以将多张表的信息配置为一个进程,只需要在目标端的恢复进程单独配置一个进程即可。

添加 OGG 用户

OGG 同步最好使用一个专门的用户,这样可以避免权限问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sqlplus / as sysdba

-- 创建表空间,为 goldengate 用户创建指定的表空间 TBS_OGG,表空间并必须大于等于 1000m
SQL> create tablespace TBS_OGG datafile '/u01/app/oracle/oradata/gguser.dbf' size 2G autoextend on;
-创建用户:
SQL> create user ogguser identified by "OGG_user_123" default tablespace TBS_OGG temporary tablespace TEMP quota unlimited on TBS_OGG;
--授权给 ogguser 用户:
SQL > GRANT CONNECT TO ogguser;
SQL > GRANT DBA TO ogguser;
-- 如果安全需要不能直接给 dba 权限,可能需要如下权限
grant SELECT ANY DICTIONARY to ogguser;
GRANT EXECUTE ON SYS.DBMS_LOCK TO ogguser;
grant select any transaction to ogguser;
grant select any table to ogguser;
grant flashback any table to ogguser;
grant alter any table to ogguser;

设置归档

ogg 同步 oracle 数据库需要开启归档模式,如果数据库没有开启归档模式,需要先开启归档模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
--启用归档
SQL> archive log list
Database log mode              No Archive Mode
Automatic archival             Disabled
Archive destination            /oracle/products/10.2/db/dbs/arch
Oldest online log sequence     213
Current log sequence           215

--修改归档目录,更改前确认/arch_qd/logs 目录,oracle 账号有完全的读写权限。
SQL> show parameter name -- 查看实例名称,避免搞错实例
NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
db_file_name_convert                 string
db_name                              string
db_unique_name                       string
global_names                         boolean
instance_name                        string
lock_name_space                      string
log_file_name_convert                string
processor_group_name                 string
service_names                        string

SQL> show parameter dest
NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
log_archive_dest                     string
log_archive_dest_1                   string      LOCATION=/oracle/arch02/CRMDB2
-- 修改归档格式
SQL> show parameter log_archive_format;

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
log_archive_format                   string      %t_%s_%r.dbf
SQL>
SQL> alter system set log_archive_format = "archive_%t_%s_%r.arch" scope=spfile; -- 需要重启
--启用归档
--修改归档目录,更改前确认/arch_qd/logs 目录,oracle 账号有完全的读写权限。
SQL> alter system set log_archive_dest_1='LOCATION=/arch_qd/logs' sid='cdrdb'  SCOPE=BOTH;
-- 停止监听
lsnrctl stop
--然后关闭数据库,注意打开 alert*log 后台日志
SQL> shut immediate;

-- 如果长时间关闭不了,日志没有异常,可能是会话未关闭
-- 杀掉非本地进程
ps -ef|grep LOCAL=NO |awk '{print $2}' |xargs kill -15
-- 如果 -15 无法关闭,可以使用 -9 强制关闭
ps -ef|grep LOCAL=NO |awk '{print $2}' |xargs kill -9

SQL> startup mount;
SQL> alter database archivelog;
-- ALTER DATABASE NOARCHIVELOG; -- 切换到非归档模式
SQL> alter database open;

--查看更改后的归档,应该类似下面的输出
SQL> archive log list
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            /arch_qd/logs
Oldest online log sequence     2967
Next log sequence to archive   2973
Current log sequence           2973

-- 切换归档日志
SQL> alter system switch logfile;

查看归档日志是否正常产生

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--ls 检查归档目录,是否产生归档日志
cd /arch2/logs
ls -rlt
ls -lt 按照时间排序,倒序
ls -lrt 按照时间排序,正序
--ls 单行显示文件,利用数字 1 参数项,这样一行只会显示一列文件的文件名,不会显示其他信息,方便编辑
例如:
CRM-DDB:/oracle/arch01/rmag$ls -t1 *dbf| head
2_1885370_909562248.dbf
1_1534588_909562248.dbf
1_1534587_909562248.dbf
2_1885369_909562248.dbf
2_1885368_909562248.dbf
1_1534586_909562248.dbf
2_1885367_909562248.dbf
1_1534585_909562248.dbf
2_1885366_909562248.dbf
1_1534584_909562248.dbf

以下是 RAC 开启归档,供参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1
修改 1 节点
chown oracle:oinstall /arch1
chmod 755 /arch1
chown oracle:oinstall logs
修改 2 节点
chown oracle:oinstall /arch2
chown oracle:oinstall logs

2
修改 1 节点参数
alter system set log_archive_dest_1="LOCATION=/arch1/logs" sid='ora11'
修改 2 节点参数
alter system set log_archive_dest_1="LOCATION=/arch2/logs" sid='ora12';

ALTER SYSTEM SET CLUSTER_DATABASE=FALSE SCOPE=spfile;

3 开启
shutdown immediate   --1 节点
shutdown immediate   --2 节点
startup mount        --1 节点
alter database archivelog;  --1 节点
alter database open; --1 节点
shutdown immediate   --1 节点
startup mount        --2 节点
alter database archivelog; --2 节点
alter database open; --2 节点
 ALTER SYSTEM SET CLUSTER_DATABASE=TRUE SCOPE=spfile;  --2 节点
shutdown immediate   --2 节点

开启附加日志

OGG 基于附加日志等进行实时传输,故需要打开相关日志确保可获取事务内容,通过下面的命令查看该状态

1
2
3
4
5
select force_logging,supplemental_log_data_min from v$database;
--若输出都为 NO,则需要通过命令修改,这里为了方便直接对数据库级别进行强制日志和增加附加日志,如果只需要对某个表增加附加日志,可以使用 alter table 命令,这样对数据库的影响较小
alter database force logging;
alter database add supplemental log data;

设置参数

如果要正常使用 OGG,需要设置参数 enable_goldengate_replication。这个参数在 11.2.0.4 和 12.1.0.2 以后才出现,用于启用 GoldenGate 复制功能。默认值为 FALSE,设置为 TRUE 后,数据库会启用 GoldenGate 复制功能,这样可以在数据库中创建 GoldenGate 复制配置。这个参数是动态参数,可以在数据库运行时设置。否则无法使用 OGG

该参数主要控制支持新数据类型和操作逻辑复制所需的补充日志记录。由于重做日志文件(redo log file)设计用于物理方式应用到数据库,因此其默认内容通常不包含足够信息将记录的变更转换为 SQL 语句。补充日志(supplemental logging)会在重做日志文件中添加额外信息,使得复制系统无需为每个变更访问数据库即可将日志变更转换为 SQL 语句。此前这些额外变更由补充日志 DDL 控制,现在必须设置该参数才能为任何新数据类型或操作启用所需的补充日志记录。ENABLE_GOLDENGATE_REPLICATION

支持逻辑复制所需的所有补充日志增强功能也由此参数控制。

该参数控制的 RDBMS 服务包括(但不限于):

  • GoldenGate Extract 使用的透明数据加密(Transparent Data Encryption,含表空间加密)工具
  • GoldenGate Extract 读取重做日志的服务
  • GoldenGate Replicat 抑制触发器触发的服务
  • GoldenGate Replicat 处理瞬时重复记录的服务
  • GoldenGate Replicat 绕过参照完整性检查的服务
  • 在集成抽取(Integrated Extract)和集成复制(Integrated Replicat)模式下运行 Oracle GoldenGate 所需的服务
1
2
3
4
5
6
7
--设置参数
SQL> alter system set enable_goldengate_replication=true scope=both;
-- 也可以仅对指定的 ogg 用户设置
exec dbms_goldengate_auth.grant_admin_privilege('ogguser','*',TRUE);
-- streams_pool 根据需要设置大小,如果不清楚效果可以不用设置
SQL> alter system set streams_pool_size=2048M scope=both;

配置 MGR

在源端配置 MGR 进程,用于管理抽取进程和传输进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 进入 OGG 管理界面
ggsci

-- 创建目录
GGSCI > CREATE SUBDIRS
-- 创建 MGR 进程
GGSCI > edit param mgr

-- ogg 配置文件使用两个减号 -- 表示注释
PORT 7809
DYNAMICPORTLIST 8000-8050
-- AUTOSTART extract
-- AUTORESTART extract,retries 4,waitminutes 4
STARTUPVALIDATIONDELAY 5
-- 限制 ip port 的访问,如果不需要可以注释掉
-- ACCESSRULE, PROG *, IPADDR 17X.1X.*, ALLOW
-- ACCESSRULE, PROG *, IPADDR *, ALLOW
-- ACCESSRULE, PROG SERVER, ALLOW
ACCESSRULE, PROG *, IPADDR *, ALLOW
PURGEOLDEXTRACTS /u01/ogg/ogg21/dirdat/*, USECHECKPOINTS,MINKEEPFILES 3

-- 启动
GGSCI > start mgr

配置全局检查点表

在 GLOBALS 参数文件中使用 CHECKPOINTTABLE 参数,可以指定一个默认的检查点表(checkpoint table)名称,该表可供一个或多个 Oracle GoldenGate 实例中的所有 Replicat 组使用。除非在创建 Replicat 组时使用 ADD REPLICAT 命令的 CHECKPOINTTABLE 选项覆盖此设置,否则所有通过 ADD REPLICAT 命令创建的 Replicat 组都将默认使用此表。

要创建检查点表,请在 GGSCI 中使用 ADD CHECKPOINTTABLE 命令。Oracle 支持并建议为集成复制(Integrated Replicat)创建检查点表。

1
2
3
4
5
6
GGSCI > edit param ./GLOBALS

ggschema ogguser
checkpointtable ogguser.ckpoint


添加同步的表

在源端添加需要同步的表,可以直接利用 sql 拼接

1
2
3
sqlplus / as sysdba
set linesize 300 pagesize 300
select 'add trandate '||owner||'.'||table_name from dba_tables where table_name in ('TABLE_NAME1','TABLE_NAME2');

在 GGSCI 中执行上面的 sql 查询输出的结果,添加需要同步的表

1
2
3
4
-- 需要注意的是,结尾没有分号
ggsci

GGSCI > add trandata needtable.TABLE_NAME1

配置抽取进程

在源端配置抽取进程,用于抽取源端的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- e_uat_ka 是抽取进程的名字,可以自定义。 TRANLOG 是抽取的方式,BEGIN NOW 是从当前时间开始抽取。 BEGIN NOW 可以替换为 SCN 或者 TIMESTAMP。
ggsci > add extract e_uat_ka,TRANLOG, BEGIN NOW
--
ggsci > ADD EXTTRAIL ./dirdat/ka,EXTRACT e_uat_ka, MEGABYTES 200

-- 配置抽取进程的参数
ggsci > edit param e_uat_ka

extract e_uat_ka
setenv (NLS_LANG = "american_america.UTF8")
setenv (ORACLE_HOME = "/erp/uat/db/tech_st/11.2.0")
setenv (ORCLE_SID = "orcl")
userid goldengate@orcl password "OGG_user_123"
discardfile ./dirrpt/e_uat_ka.dsc,purge,megabytes 1024
-- 如果源端和目标端的版本不一致,需要设置 FORMAT RELEASE 参数,如果一直可以不设置
-- exttrail ./dirdat/ka, FORMAT RELEASE 12.2
exttrail ./dirdat/ka
statoptions reportfetch
reportcount every 1 minutes,rate
warnlongtrans 1H,checkinterval 5m
TABLE WMS_F.ACTTION_DETAILS;
TABLE WMS_F.BASTION          ;

配置传输进程

在源端配置传输进程,用于传输抽取的数据到目标端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- p_uat_ka 是传输进程的名字,可以自定义。 EXTTRAILSOURCE 是抽取进程的 trail 文件路径
ADD EXTRACT p_uat_ka,EXTTRAILSOURCE ./dirdat/ka
ADD RMTTRAIL /ogg/ogg21/dirdat/ka,EXTRACT p_uat_ka, MEGABYTES 200

-- 配置传输进程的参数
edit param p_uat_ka

EXTRACT p_uat_ka
setenv (NLS_LANG = "american_america.UTF8")
setenv (ORACLE_HOME = "/erpt2/uat/db/tech_st/11.2.0")
setenv (ORCLE_SID = "U")
userid goldengate@orcl password "OGG_user_123"
RMTHOST 12.2.1.11,MGRPORT 7809
-- 如果源端和目标端的版本不一致,需要设置 FORMAT RELEASE 参数,如果一直可以不设置
-- RMTTRAIL /ogg/ogg21/dirdat/ka FORMAT RELEASE 12.2
RMTTRAIL /ogg/ogg21/dirdat/ka
discardfile  ./dirrpt/p_uat_ka.dsc,PURGE,megabytes 1024
TABLE WMS_F.ACTTION_DETAILS;
TABLE WMS_F.BASTION          ;

生成表结构文件

在源端生成表结构文件

1
2
3
4
5
6
7
8
9
10
11
12
13
vi ./dirprm/mapping_uat_ka.prm

defsfile ./dirdef/mapping_uat_ka.def,purge
userid goldengate@orcl password "OGG_user_123"
TABLE WMS_F.ACTTION_DETAILS;
TABLE WMS_F.BASTION          ;


 ./defgen paramfile ./dirprm/mapping_uat_ka.prm

 ## Definitions generated for 2 tables in ./dirdef/mapping_uat_ka.def.


将生成的表结构文件传输到目标端

1
scp ./dirdef/mapping_uat_ka.def [email protected]:/ogg/ogg21/dirdef/

目标端配置 OGG

配置 MGR

– 在目标端配置 MGR 进程,用于管理恢复进程

1
2
3
4
5
6
7
8
9
10
GGSCI > CREATE SUBDIRS

GGSCI > edit param mgr

PORT 7809
DYNAMICPORTLIST 8000-8050
STARTUPVALIDATIONDELAY 5
ACCESSRULE, PROG *, IPADDR *, ALLOW
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS,MINKEEPFILES 3

全局检查点表

1
2
3
####### 经过测试如果只是在
GGSCI > edit  param  ./GLOBALS
CHECKPOINTTABLE  okafka.checkpoint

配置复制进程

恢复至同一个 topic

在恢复之前不需要手动先创建 topic,ogg 在恢复数据的时候会自动创建 topic,但是默认只会有一个分区,如果需要多个分区就需要提前手动创建 topic。

如果恢复至一个 topic 的话,所有的数据都在一起。可以在验证阶段查看数据。

设置输出格式

Using the Pluggable Formatters (oracle.com)

在 ogg21.7 之后已经不支持insertOpKey | updateOpKey | deleteOpKey | truncateOpKey | includeTableName | includeOpTimestamp | includeOpType | includePosition | includeCurrentTimestamp, useIso8601Format,仅支持通过参数gg.handler.name.format.metaColumnsTemplate配置。

这里需要注意,和低版本的配置不同

1
2
3
4
5
6
7
As of Oggbd 21.7.x, we have the following metaColumnsTemplate

gg.handler.name.format.metaColumnsTemplate=${objectname[table]},$
{optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},$
{position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}

Would like to get the replicat name/ groupname also added
复制进程配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
edit param r_uat_ka

REPLICAT r_uat_ka
sourcedefs ./dirdef/mapping_uat_ka.def
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP auser.ftablename, TARGET okafka_all.ftablename;
MAP AR.hz_parties, TARGET okafka_all.hz_parties;


### 关于 gg.handler.kafkahandler.format.metaColumnsTemplate 高版本和低版本的写法不一样,如果 flink 的版本较低的话,只能识别旧的写法,op_ts,op_type 之类的简写,从 21c 开始同时支持单次全拼或简写,且

vim dirprm/kafka.props

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=okafka_all
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype[op_type]},${objectname[table]},${timestamp[op_ts]},${currenttimestampmicro[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=true
javawriter.stats.full=true
gg.classpath=dirprm/:/usr/bigtop/3.2.0/usr/lib/kafka/libs/*:/ogg/ogg21/:/ogg/ogg21/lib/*

#####
vim custom_kafka_producer.properties

bootstrap.servers=bigdata01:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

####
add replicat r_uat_ka exttrail ./dirdat/ka,checkpointtable okafka.checkpoint

恢复至多个 topic

将每张表的数据恢复至单独的一个 topic,这样可以更好的管理数据,不过配置步骤多一些

auser.ftablename
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
## auser.ftablename
edit param r_ka01

REPLICAT r_ka01
sourcedefs ./dirdef/mapping_uat_ka.def
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka01.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP auser.ftablename, TARGET applsys_ftablename.ftablename;



###
vim dirprm/kafka01.props

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=applsys_ftablename ## 这里需要替换为实际的 topic 名称
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype[op_type]},${objectname[table]},${timestamp[op_ts]},${currenttimestampmicro[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=true
javawriter.stats.full=true
gg.classpath=dirprm/:/usr/bigtop/3.2.0/usr/lib/kafka/libs/*:/ogg/ogg21/:/ogg/ogg21/lib/*

#####
vim custom_kafka_producer.properties

bootstrap.servers=bigdata01:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

####
add replicat r_ka01 exttrail ./dirdat/ka,checkpointtable okafka.checkpoint

AR.hz_parties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
## AR.hz_parties
edit param r_ka02

REPLICAT r_ka02
sourcedefs ./dirdef/mapping_uat_ka.def
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka02.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP AR.hz_parties, TARGET ar_hz_parties.hz_parties;


###
vim dirprm/kafka02.props

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=ar_hz_parties
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype[op_type]},${objectname[table]},${timestamp[op_ts]},${currenttimestampmicro[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=true
javawriter.stats.full=true
gg.classpath=dirprm/:/usr/bigtop/3.2.0/usr/lib/kafka/libs/*:/ogg/ogg21/:/ogg/ogg21/lib/*

#####
vim custom_kafka_producer.properties

bootstrap.servers=bigdata01:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

####
add replicat r_ka02 exttrail ./dirdat/ka,checkpointtable okafka.checkpoint

全量恢复

启动对应的进程

在恢复之前需要先启动对应的进程

1
2
3
4
5
## 源端和目标端先开启 mgr
start mgr
## 源端开启抽取和传输进程
start E_UAT_KA
start P_UAT_KA

源端抽取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
## 全量 在源端的 OGG 中抽取数据
ggsci> edit params ei_ka
SOURCEISTABLE
setenv (NLS_LANG = "american_america.UTF8")
setenv (ORACLE_HOME = "/erttdb02/uat/db/tech_st/11.2.0")
setenv (ORCLE_SID = "orcl")
userid goldengate@ password "OGG_user_123"
RMTHOST 1.1.1.1,MGRPORT 7809
RMTFILE /ogg/ogg21/dirdat/ka,maxfiles 100,megabytes 1024,purge
TABLE AYS.fup_values;
TABLE A.hzties;

#### 启动并查看抽取进程正常
shell> nohup ./extract paramfile ./dirprm/ei_ka.prm reportfile ./dirrpt/ei_ka.rpt &

目标端恢复数据

恢复至同一个 topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
shell> ggsci

ggsci> edit params ri_uat_ka
SPECIALRUN
END RUNTIME
targetdb  LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
EXTFILE ./dirdat/ka
DISCARDFILE ./dirrpt/ri_uat_ka.dsc,purge
GROUPTRANSOPS 10000
MAP AP.fnd_values, TARGET okafka_all.fnkup_values;
MAP AR.hpties, TARGET okafka_all.hzties;

#### 启动并查看回放进程是否正常
shell> nohup ./replicat paramfile ./dirprm/ri_uat_ka.prm reportfile ./dirrpt/ri_uat_ka.rpt &


分开恢复

如果需要将每张表的数据恢复至单独的一个 topic,需要单独配置恢复进程。例如按照不同的 schema 恢复数据,或者不同的表恢复数据

APS.fnd_lvalue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
shell> ggsci

ggsci> edit params ri_uat_ka01
SPECIALRUN
END RUNTIME
targetdb  LIBFILE libggjava.so SET property=dirprm/kafka01.props
REPORTCOUNT EVERY 1 MINUTES, RATE
EXTFILE ./dirdat/ka
DISCARDFILE ./dirrpt/ri_uat_ka01.dsc,purge
GROUPTRANSOPS 10000
MAP APS.fnd_values, TARGET apps_fndup_values.f_loup_vaes;

#### 启动并查看回放进程是否正常
shell> nohup ./replicat paramfile ./dirprm/ri_uat_ka01.prm reportfile ./dirrpt/ri_uat_ka01.rpt &


AR.hz_rties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
shell> ggsci

ggsci> edit params ri_uat_ka02
SPECIALRUN
END RUNTIME
targetdb  LIBFILE libggjava.so SET property=dirprm/kafka02.props
REPORTCOUNT EVERY 1 MINUTES, RATE
EXTFILE ./dirdat/ka
DISCARDFILE ./dirrpt/ri_uat_ka02.dsc,purge
GROUPTRANSOPS 10000
MAP AR.hz_parties, TARGET ar_hz_parties.hz_parties;

#### 启动并查看回放进程是否正常
shell> nohup ./replicat paramfile ./dirprm/ri_uat_ka02.prm reportfile ./dirrpt/ri_uat_ka02.rpt &


数据验证

如果多张表的数据都在同一个 topic,查看的时候只需要指定 topic 就行,数据是按照更新的时间排序的。

查看内容

在命令行查看 topic 的内容时,默认会持续打印数据,只能手动结束(crtl+c),可以将数据重定向至一个文件方便查看。

1
2
3
4
5
6
## 查看所有 topic
kafka-topics.sh --list --zookeeper localhost:2181
## 查看数据
shell1> kafka-console-consumer.sh --bootstrap-server bigdata01:9092  --topic okafka_all --from-beginning > /tmp/kafka_okafka_all.txt

shell2> [root@bigdata01 ogg21]# tail -2 /tmp/kafka_okafka_all.txt

格式

在第一次全量恢复数据或者复制进程恢复数据的时候并没有 before 属性,只有在更新/删除操作发生之后才会有 before 属性信息。

低版本默认格式

flink 格式仅支持低版本的简写:[Ogg Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/ogg/)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}

高版本格式

使用单词全拼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype},${objectname},${timestamp},${currenttimestampmicro},${position},${primarykeycolumns}

{
    "after": {
        "CONTENT": "ttt",
        "ID": 5
    },
    "before": {
        "CONTENT": "tessst",
        "ID": 5
    },
    "currenttimestampmicro": 1694662000110000,
    "objectname": "wxj.TEST_OGG",
    "optype": "U",
    "position": "00000000000000003225",
    "primarykeycolumns": [
        "ID"
    ],
    "timestamp": "2023-09-14 11:26:34.369566"
}

兼容低版本简写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype[op_type]},${objectname[table]},${timestamp[op_ts]},${currenttimestampmicro[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}

{
    "after": {
        "CONTENT": "ttt",
        "ID": 6
    },
    "before": {
        "CONTENT": "tessst",
        "ID": 6
    },
    "current_ts": 1694670588735000,
    "op_ts": "2023-09-14 13:49:43.433581",
    "op_type": "U",
    "pos": "00000000000000003539",
    "primary_keys": [
        "ID"
    ],
    "table": "wxj.TEST_OGG"
}
本文由作者按照 CC BY 4.0 进行授权