文章

OGG-添加新的表

OGG-添加新的表

前言

假设已经存在一个 OGG 的同步链路,后续需要添加新的表,如果重新搭建新的同步链路步骤也较多,在不影响效率的情况下,可以在原有的 OGG 链路上添加新的表

本次主要是通过 OGG 将 Oracle 19c 的数据同步到 Kafka 因为链路已经存在,所以可以省略安装配置 OGG 的步骤,直接在原有的链路上添加新的表

搭建可以参考之前的文章

假设需要增加三张表:ACT_TRAG、DOCER、INVTT,用户为 AAUSER

文中的表名和用户、主机仅供参考,批量替换过

环境

源端

Oracle :19.3.0

OS:CentOS 7.7

OGG:Oracle GoldenGate Command Interpreter for Oracle

1
    Version 21.3.0.0.0 OGGCORE_21.3.0.0.0_PLATFORMS_210728.1047_FBO

目标端

Kafka:2.4.1

1
kafka 集群 broker 地址:1.1.1.101:9092,1.1.1.102:9092,1.1.1.103:9092

OS:CentOS 7.7

OGG:Oracle GoldenGate for Big Data Version 21.4.0.0.0 (Build 002)

1
2
	Oracle GoldenGate Command Interpreter
		Version 21.4.0.0.0 OGGCORE_21.4.0.0.0OGGRU_PLATFORMS_211022.1803

源端 OGG

添加新的表

1
2
3
4
5
6
7
8
9
10
11
ggsci


## 先登陆到源端数据库
> DBLOGIN USERID ogguser@ORCL password "OGG_user_123";
## GGSCI (localhost.localdomain) 1> ADD TRANDATA <SCHEMA>.<TABLE>
## 添加新的表
add trandata AAUSER.ACT_TRAG
add trandata AAUSER.DOCER
add trandata AAUSER.INVTT

抽取和传输进程添加新的表

在现有的抽取和传输进程配置文件中添加新的表,然后重启进程,时间很短,如果现有的链路目标端对实时性要求较高,建议配置新的链路或者在业务低峰期操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
> info all

## 配置抽取进程
> edit param E_UAT_KA
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER   ;
TABLE AAUSER.INVTT        ;

> edit param p_uat_ka
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER   ;
TABLE AAUSER.INVTT        ;

> stop E_UAT_KA
> stop p_uat_ka
> info all
> start E_UAT_KA
> start p_uat_ka
> info all

重新生成表结构文件

在源端生成表结构文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vi ./dirprm/mapping_uat_ka.prm

defsfile ./dirdef/mapping_uat_ka.def,purge
userid ogguser@orcl password "OGG_user_123"
-- 包含之前的表和新添加的表
TABLE WMS_F.ACTTION_DETAILS;
TABLE WMS_F.BASTION          ;
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER   ;
TABLE AAUSER.INVTT        ;

 ./defgen paramfile ./dirprm/mapping_uat_ka.prm


将生成的表结构文件传输到目标端

1
scp ./dirdef/mapping_uat_ka.def [email protected]:/ogg/ogg21/dirdef/

目标端配置 OGG

本次是需要将数据恢复到单个 topic,所以需要在目标端配置多个 topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
## 如果表的数量太多,可以直接进入 dirprm 目录下,复制一个模板,然后使用 vim 批量替换

edit param r_ka09

REPLICAT r_ka09 ## 需要修改为实际的 replicat 名称
sourcedefs ./dirdef/mapping_uat_ka.def ## 源端的表结构文件
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka01.props ## 配置 kafka 的属性文件,根据实际情况修改
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP auser.ftablename, TARGET applsys_ftablename.ftablename; ## 映射关系,根据实际情况修改



###
vim dirprm/kafka09.props

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=applsys_ftablename ## 这里需要替换为实际的 topic 名称
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.metaColumnsTemplate=${optype[op_type]},${objectname[table]},${timestamp[op_ts]},${currenttimestampmicro[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]}
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=true
javawriter.stats.full=true
gg.classpath=dirprm/:/usr/bigtop/3.2.0/usr/lib/kafka/libs/*:/ogg/ogg21/:/ogg/ogg21/lib/*

#####
vim custom_kafka_producer.properties

bootstrap.servers=bigdata01:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

####
ggsci> add replicat r_ka01 exttrail ./dirdat/ka,checkpointtable okafka.checkpoint
ggsci> info all

源端抽取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
## 全量 在源端的 OGG 中抽取数据,抽取数据的时候可以抽取所有的表,也可以只抽取新增的表
ggsci> edit params ei_ka
SOURCEISTABLE
setenv (NLS_LANG = "american_america.UTF8")
setenv (ORACLE_HOME = "/erttdb02/uat/db/tech_st/11.2.0")
setenv (ORCLE_SID = "orcl")
userid ogguser@orcl password "OGG_user_123"
RMTHOST 1.1.1.1,MGRPORT 7809
RMTFILE /ogg/ogg21/dirdat/ka,maxfiles 100,megabytes 1024,purge
TABLE AAUSER.ACT_TRAG;
TABLE AAUSER.DOCER   ;
TABLE AAUSER.INVTT        ;

####
shell> nohup ./extract paramfile ./dirprm/ei_ka.prm reportfile ./dirrpt/ei_ka.rpt &

目标端恢复数据

将每张表的数据恢复至单独的一个 topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
shell> ggsci

ggsci> edit params ri_uat_ka09 ## 需要修改为实际的 replicat 名称
SPECIALRUN
END RUNTIME
targetdb  LIBFILE libggjava.so SET property=dirprm/kafka09.props ## 配置 kafka 的属性文件,根据实际情况修改
REPORTCOUNT EVERY 1 MINUTES, RATE
EXTFILE ./dirdat/ka
DISCARDFILE ./dirrpt/ri_uat_ka09.dsc,purge ## 根据实际情况修改
GROUPTRANSOPS 10000
MAP APS.fnd_values, TARGET apps_fndup_values.f_loup_vaes;

#### 启动并查看回放进程是否正常
shell> nohup ./replicat paramfile ./dirprm/ri_uat_ka09.prm reportfile ./dirrpt/ri_uat_ka09.rpt &

查看数据

查看内容 在命令行查看 topic 的内容时,默认会持续打印数据,只能手动结束(crtl+c),可以将数据重定向至一个文件方便查看。

1
2
3
4
5
6
7
8
9
## 查看所有 topic
kafka-topics.sh --list --zookeeper localhost:2181
## 查看数据
shell1> kafka-console-consumer.sh --bootstrap-server bigdata01:9092  --topic afkall --from-beginning > /tmp/afka_all.txt

shell2>  tail -2 /tmp/afkall.txt

## 统计 topic 的记录数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list bigdata01:9092 --topic afkall --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
本文由作者按照 CC BY 4.0 进行授权