OGG-添加新的表
OGG-添加新的表
前言
假设已经存在一个 OGG 的同步链路,后续需要添加新的表,如果重新搭建新的同步链路步骤也较多,在不影响效率的情况下,可以在原有的 OGG 链路上添加新的表
本次主要是通过 OGG 将 Oracle 19c 的数据同步到 Kafka 因为链路已经存在,所以可以省略安装配置 OGG 的步骤,直接在原有的链路上添加新的表
假设需要增加三张表:ACT_TRAG、DOCER、INVTT,用户为 AAUSER
文中的表名和用户、主机仅供参考,批量替换过
环境
源端
Oracle :19.3.0
OS:CentOS 7.7
OGG:Oracle GoldenGate Command Interpreter for Oracle
1
Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047_FBO
目标端
Kafka:2.4.1
1
kafka 集群 broker 地址:1.1.1.101:9092,1.1.1.102:9092,1.1.1.103:9092
OS:CentOS 7.7
OGG:Oracle GoldenGate for Big Data Version 21.4.0.0.0 (Build 002)
1
2
Oracle GoldenGate Command Interpreter
Version 21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803
源端 OGG
添加新的表
1
2
3
4
5
6
7
8
9
10
11
ggsci
## 先登陆到源端数据库
> DBLOGIN USERID ogguser@ORCL password "OGG_user_123";
## GGSCI (localhost.localdomain) 1> ADD TRANDATA <SCHEMA>.<TABLE>
## 添加新的表
add trandata AAUSER.ACT_TRAG
add trandata AAUSER.DOCER
add trandata AAUSER.INVTT
抽取和传输进程添加新的表
在现有的抽取和传输进程配置文件中添加新的表,然后重启进程,时间很短,如果现有的链路目标端对实时性要求较高,建议配置新的链路或者在业务低峰期操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> info all
## 配置抽取进程
> edit param E_UAT_KA
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER ;
TABLE AAUSER.INVTT ;
> edit param p_uat_ka
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER ;
TABLE AAUSER.INVTT ;
> stop E_UAT_KA
> stop p_uat_ka
> info all
> start E_UAT_KA
> start p_uat_ka
> info all
重新生成表结构文件
在源端生成表结构文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vi ./dirprm/mapping_uat_ka.prm
defsfile ./dirdef/mapping_uat_ka.def,purge
userid ogguser@orcl password "OGG_user_123"
-- 包含之前的表和新添加的表
TABLE WMS_F.ACTTION_DETAILS;
TABLE WMS_F.BASTION ;
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER ;
TABLE AAUSER.INVTT ;
./defgen paramfile ./dirprm/mapping_uat_ka.prm
将生成的表结构文件传输到目标端
1
scp ./dirdef/mapping_uat_ka.def [email protected]:/ogg/ogg21/dirdef/
目标端配置 OGG
本次是需要将数据恢复到单个 topic,所以需要在目标端配置多个 topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
## 如果表的数量太多,可以直接进入 dirprm 目录下,复制一个模板,然后使用 vim 批量替换
edit param r_ka09
REPLICAT r_ka09 ## 需要修改为实际的 replicat 名称
sourcedefs ./dirdef/mapping_uat_ka.def ## 源端的表结构文件
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka01.props ## 配置 kafka 的属性文件,根据实际情况修改
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP auser.ftablename, TARGET applsys_ftablename.ftablename; ## 映射关系,根据实际情况修改
###
vim dirprm/kafka09.props
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=applsys_ftablename ## 这里需要替换为实际的 topic 名称
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype[op_type]},${objectname[table]},${timestamp[op_ts]},${currenttimestampmicro[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=true
javawriter.stats.full=true
gg.classpath=dirprm/:/usr/bigtop/3.2.0/usr/lib/kafka/libs/*:/ogg/ogg21/:/ogg/ogg21/lib/*
#####
vim custom_kafka_producer.properties
bootstrap.servers=bigdata01:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
####
ggsci> add replicat r_ka01 exttrail ./dirdat/ka,checkpointtable okafka.checkpoint
ggsci> info all
源端抽取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
## 全量 在源端的 OGG 中抽取数据,抽取数据的时候可以抽取所有的表,也可以只抽取新增的表
ggsci> edit params ei_ka
SOURCEISTABLE
setenv (NLS_LANG = "american_america.UTF8")
setenv (ORACLE_HOME = "/erttdb02/uat/db/tech_st/11.2.0")
setenv (ORCLE_SID = "orcl")
userid ogguser@orcl password "OGG_user_123"
RMTHOST 1.1.1.1,MGRPORT 7809
RMTFILE /ogg/ogg21/dirdat/ka,maxfiles 100,megabytes 1024,purge
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER ;
TABLE AAUSER.INVTT ;
####
shell> nohup ./extract paramfile ./dirprm/ei_ka.prm reportfile ./dirrpt/ei_ka.rpt &
目标端恢复数据
将每张表的数据恢复至单独的一个 topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
shell> ggsci
ggsci> edit params ri_uat_ka09 ## 需要修改为实际的 replicat 名称
SPECIALRUN
END RUNTIME
targetdb LIBFILE libggjava.so SET property=dirprm/kafka09.props ## 配置 kafka 的属性文件,根据实际情况修改
REPORTCOUNT EVERY 1 MINUTES, RATE
EXTFILE ./dirdat/ka
DISCARDFILE ./dirrpt/ri_uat_ka09.dsc,purge ## 根据实际情况修改
GROUPTRANSOPS 10000
MAP APS.fnd_values, TARGET apps_fndup_values.f_loup_vaes;
#### 启动并查看回放进程是否正常
shell> nohup ./replicat paramfile ./dirprm/ri_uat_ka09.prm reportfile ./dirrpt/ri_uat_ka09.rpt &
查看数据
查看内容 在命令行查看 topic 的内容时,默认会持续打印数据,只能手动结束(crtl+c),可以将数据重定向至一个文件方便查看。
1
2
3
4
5
6
7
8
9
## 查看所有 topic
kafka-topics.sh --list --zookeeper localhost:2181
## 查看数据
shell1> kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic afkall --from-beginning > /tmp/afka_all.txt
shell2> tail -2 /tmp/afkall.txt
## 统计 topic 的记录数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bigdata01:9092 --topic afkall --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
本文由作者按照
CC BY 4.0
进行授权