1. 首页
  2. >
  3. 编程技术
  4. >
  5. Java

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)

1. 背景

一直在完善自己的微服务架构,其中包含分布式工作流服务的建设,目前采用的是Camunda工作流引擎。使用Camunda工作流,就会涉及到工作流引擎的用户体系如何与现有用户体系集成的问题(Flowable、Activity也类似)。现有设计中,工作流定位偏重于企业内部流程的流转,因此系统中设计了单位、部门、人员以及人事归属与Camunda工作流用户体系对应。

功能设计完成,就面临另外一个问题,如何解决现有人事体系数据如何【`实时`】同步至Camunda工作流引擎中。如果现有体系数据与工作流数据在同一个库中,相对比较好解决。而微服务架构中,不同服务的数据通常存放在不同数据库中,那么就需要进行数据的同步。采用的方式不同,可以取得的效果也相同。

最初考虑如下两种方案,但是都略感不足:

  • ETL:使用ETL工具进行数据同步是典型的方式,可以选择工具也比较多。开源的ETL工具增量同步问题解决的并不理想,不使用增量同步数那么数据同步始终存在时间差;商业的ETL工具增量同步解决的比较好,但是庞大且昂贵。
  • 消息队列:消息队列是多系统集成普遍采用的方式,可以很好地解决数据同步的实时问题。但是数据同步的两端都需要自己编写代码,一端写生产代码一端写消费代码,生产端代码还要捆绑现有体系数据所有操作,需要的编写量比较大。

查询对比的大量的资料,最终选择了Debezimu来解决以上问题以及未来更多数据同步的问题。

2. Debezium介绍

RedHat开源的Debezium是一个将多种数据源实时变更数据捕获,形成数据流输出的开源工具。

它是一种CDC(Change Data Capture)工具,工作原理类似大家所熟知的Canal, DataBus, Maxwell等,是通过抽取数据库日志来获取变更的。

官方介绍为:

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong

Debezium是一个分布式平台,它将您现有的数据库转换为事件流,因此应用程序可以看到数据库中的每一个行级更改并立即做出响应。Debezium构建在Apache Kafka之上,并提供Kafka连接兼容的连接器来监视特定的数据库管理系统。

Debezium现在已支持以下数据库:

  • MySQL
  • MongoDB
  • PostgreSQL
  • Oracle
  • SQL Server
  • Db2
  • Cassandra
  • Vitess

与ETL不同,Debezimu只支持在生产端连接数据库,消费端不支持连接数据库,而是需要自己编写代码接收Kafka消息数据。分析下来这种方式更加灵活,还可以很好利用现有微服务架构中的Kafka。

3. 快速搭建Debezimu测试环境。

目前,Debezium最新的Stable版本是1.6。 Debezium已经把要用到的Component打包成了Docker的Image,因此,我们只需要安装并启动Docker后就可以按下面的步骤快速搭建测试环境了。

3.1 运行Zookeeper

docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6

3.2 运行Kafka

docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6

3.3 运行PostgreSQL

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.6


上面代码中使用的是:
debezium/example-postgres:1.6,查看Debezimu官方文档以及其它示例都是这个。实际上Debezimu对PostgreSQL 9~13都进行了Docker封装,可以根据自己的需要在Docker Hub中选择相应的PostgreSQL版本。


debezium/postgres很小,使用也比较方便,而且也进行了必要的设置,无须再进行额外的配置就可以直接使用。

3.4 运行Debezimu Connect

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.6

Debezium的container启动时需要传入如下环境变量:

  • GROUP_ID: 分组ID,若需要启动多个Debezium的实例组成集群,那么它们的GROUP_ID必须被设置为一样
  • CONFIG_STORAGE_TOPIC:下面需要调用Debezium提供的RestFUL API管理connector,connector的信息就是保存在CONFIG_STORAGE_TOPIC指定的kafka topic下。
  • OFFSET_STORAGE_TOPIC: connector监控数据流的offset,若我们使用的是PostgreSQL Connector,那么OFFSET_STORAGE_TOPIC指定的topic中存的就是PostgreSQL的lsn。

3.5 创建Connector

经过上面4个步骤后,Debezium的测试环境就搭建好了,现在需要调用Debezium提供的API创建connector,使Debezium与数据库之间建立关系。我们把下面的payload POST到`http://<ip addr of debezium>:8083/connectors/`。

{   "name": "fulfillment-connector",     "config": {     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",      "database.hostname": "192.168.99.100",      "database.port": "5432",      "database.user": "postgres",      "database.password": "postgres",      "database.dbname" : "postgres",      "database.server.name": "fulfillment",      "table.include.list": "public.inventory"   } }
  1. "name":注册到Kafka Connect服务的Connector名称
  2. "connector.class":PostgreSQL connector class名称
  3. "database.hostname":PostgreSQL 数据库地址
  4. "database.port":PostgreSQL 数据库端口
  5. "database.user":PostgreSQL 数据库用户名
  6. "database.password":PostgreSQL数据密码
  7. "database.dbname":连接的PostgreSQL数据库
  8. "database.server.name":虚拟的数据库Server名称,可以根据实际需求定义,消费Kafka数据时要使用该值
  9. "table.include.list":监听的数据表列表,以","分割。PostgreSQL要将表名写全,格式"<schema-name>.<table-name>"。如果没有特定的Schema,那么就是默认的`public`

下面为完成的curl命令:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "192.168.99.100", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres",    "database.server.name": "fulfillment", "table.include.list": "public.inventory" }}'

上面是示例,因为使用的是Windows,个人觉得curl不方便,换用postman:

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)

3.6 Docker Compose 配置

为了方便使用,将以上Docker命令整合为Docker Compose配置,具体如下:

version: "3" services:   postgres:     image: debezium/postgres:13     container_name: postgres     hostname: postgres     environment:       POSTGRES_USER: herodotus       POSTGRES_PASSWORD: herodotus     ports:       - 5432:5432    zookeeper:     image: debezium/zookeeper:1.6     container_name: zookeeper     restart: always     ports:       - 2181:2181       - 2888:2888       - 3888:3888    kafka:     image: debezium/kafka:1.6     container_name: kafka     restart: always     ports:       - 9092:9092     environment:       ZOOKEEPER_CONNECT: zookeeper:2181       BOOTSTRAP_SERVERS: kafka:9092     depends_on:       - zookeeper    connect:     image: debezium/connect:1.6     container_name: connect     restart: always     ports:       - 8083:8083     environment:       GROUP_ID: 1       CONFIG_STORAGE_TOPIC: herodotus_connect_configs       OFFSET_STORAGE_TOPIC: herodotus_connect_offsets       STATUS_STORAGE_TOPIC: herodotus_connect_statuses       BOOTSTRAP_SERVERS: kafka:9092     depends_on:       - kafka

4. 外部数据库配置

上一章节,介绍了Debezimu测试环境的方式,其中使用的debezium/postgres是已经进行过配置的,所以使用起来比较方便。在实际使用过程中,很多时候是使用独立搭建PostgreSQL,那么就需要对PostgreSQL进行配置。

4.1 以Docker的方式运行基础组件

本章节主要介绍Debezimu与独立的PostgreSQL数据库连接,因此除了PostgreSQL以外,Zookeeper、Kafka、Debezimu Connect仍旧使用Docker方式部署。具体部署的Docker Compose配置如下:

version: "3" services:   zookeeper:     image: debezium/zookeeper:1.6     container_name: zookeeper     hostname: zookeeper     environment:       ZOOKEEPER_SERVER_ID: 1     ports:       - 2181:2181       - 2888:2888       - 3888:3888    kafka:     image: debezium/kafka:1.6     container_name: kafka     hostname: kafka     ports:       - 9092:9092     environment:       BROKER_ID: 1       ZOOKEEPER_CONNECT: zookeeper:2181       KAFKA_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://0.0.0.0:9092       KAFKA_ADVERTISED_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://192.168.101.10:9092       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INNER:PLAINTEXT,LISTENER_OUTER:PLAINTEXT       KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INNER       KAFKA_ALLOW_PLAINTEXT_LISTENER: 'yes'       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'     depends_on:       - zookeeper    connect:     image: debezium/connect:1.6     container_name: connect     hostname: connect     ports:       - 8083:8083     environment:       GROUP_ID: 1       CONFIG_STORAGE_TOPIC: herodotus_connect_configs       OFFSET_STORAGE_TOPIC: herodotus_connect_offsets       STATUS_STORAGE_TOPIC: herodotus_connect_statuses       BOOTSTRAP_SERVERS: kafka:9092     depends_on:       - kafka

其中Kafka Listener相关的配置,是为了解决Spring Kafka连接Kafka会出现:`Connection to node -1 could not be established. Broker may not be available.`问题。

4.2 修改PostgreSQL配置

Logical Decoding功能是PostgreSQL在9.4加入的,它是一种机制,允许提取提交到事务日志的更改,并在输出插件的帮助下以用户友好的方式处理这些更改。输出插件使客户机能够使用更改。

PostgreSQL connector 读取和处理数据库变化主要包含两个部分:

  • Logical Decoding 输出插件:根据选择可能需要安装输出插件。运行PostgreSQL服务之前,必须配置`replication slot `来启用你所选择的输出插件,有以下几个输出插件供选择:

decoderbufs: 是基于`Protobuf`的,目前由Debezimu社区维护

wal2json :是基于`JSON`的,目前由wal2json社区维护

pgoutput:在PostgreSQL 10及以上版本中是标准的Logical Decoding 输出插件。是由PostgreSQL社区维护,由PostgreSQL自己用于Logical Replication。这个插件是内置安装的,所以不需要额外安装。

  • Java代码(就是连接Kafka Connect的代码):负责读取由Logical Decoding 输出插件产生的数据。


Logical Decoding 输出插件不支持DDL变更,这意味着Connector不能把DDL变更事件发送给消费者


Logical Decoding Replicaiton Slots支持数据库的`primary`服务器。因此如果是PostgreSQL服务的集群,Connector只能在`primary`服务器激活。如果`primary`服务器出现问题,那么connector就会停掉。

4.2.1 修改PostgreSQL配置

在${PostgreSQL_HOME}/13/data目录下,找到postgresql.conf

修改以下配置:

wal_level=logical max_wal_senders=1 max_replication_slots=1
  • wal_level:通知数据库使用 logical decoding 读取预写日志
  • max_wal_senders: 通知数据库独立处理WAL变更的独立进程数量
  • max_replication_slots: 通知数据库处理WAL变更流所允许最大replication slots数目

配置完成后记得重启数据库

4.2.2 设置数据库权限

需要给PostgreSQL 用户分配replication权限。定义一个PostgreSQL role,至少分配REPLICATIONLOGION两项权限,示例代码如下:

CREATE ROLE <name> REPLICATION LOGIN;

具体操作可以参考以下脚本:

-- pg新建用户 CREATE USER user WITH PASSWORD 'pwd';  -- 给用户复制流权限 ALTER ROLE user replication;  -- 给用户登录数据库权限 grant CONNECT ON DATABASE test to user;  -- 把当前库public下所有表查询权限赋给用户 GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

4.3 创建Connector

把下面的payload POST到http://<ip addr of debezium>:8083/connectors/

{     "name": "herodotus-connector",     "config": {         "connector.class": "io.debezium.connector.postgresql.PostgresConnector",          "database.hostname": "192.168.101.10",          "database.port": "15432",          "database.user": "athena",          "database.password": "athena",          "database.dbname" : "athena",          "database.server.name": "herodotus",         "slot.name": "herodotus_slot",         "table.include.list": "public.sys_organization",         "publication.name": "herodotus_public_connector", 	    "publication.autocreate.mode": "filtered", 	    "plugin.name": "pgoutput"     } }

postman 界面操作如下图:

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)


下面结合本例子中connector的配置信息对几个重点属性进行进一步说明:

Slot.name

按照上例Debezium会在PostgreSQL创建一个名为`herodotus_slot`的复制槽,本例中创建的connector需要通过该复制槽获取数据变更的信息。

可以通过以下sql查看复制槽的信息:

select * from pg_replication_slots;

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)

上图中,active_pid为14200,即进程ID为14200的wal_sender进程已经在使用该复制槽与Debezium交互了

database.server.name和table.include.list

当connector获取到数据变更的信息后,会把该信息转化为统一的数据格式,并发布到Kafka的topic中。Debezium规定一个表对应一个topic,topic的名字的格式为 <database.server.name>.<schema name>.<table name>,本例中的表的数据变更消息将保存到Kafka的topic
herodotus.public.sys_organization中。

可以通过以下代码查看接收到的信息:

 @KafkaListener(topics = {"herodotus.public.sys_organization"}, groupId = "herodotus.debezium")  public void received(String message) {  				log.info("[Herodotus] |- Recived message from Debezium : [{}]", message);  }

5. 运行测试

现在,可以基于以上环境的配置,进行Debezium捕获数据效果的测试。可以进入到Kafka容器中,使用使用Kafka提供的kafka-console-consumer.sh查看Topic接收到的数据。具体命令如下:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.10:9092 --topic herodotus.public.sys_organization

5.1 Insert 测试

在数据库sys_organization 表中插入一条数据

Kafka的消费者命令行工具收到了来自Debezium发布的数据变更消息:

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)

格式化后的消息体如下,其中schema字段在此先忽略,重点放payload.before,payload.after及payload.op字段上:

{ 	"schema": { 		... 	}, 	"payload": { 		"before": null, 		"after": { 			"organization_id": "4", 			"create_time": null, 			"ranking": null, 			"update_time": null, 			"description": null, 			"is_reserved": null, 			"reversion": null, 			"status": 1, 			"a4_biz_org_id": null, 			"biz_org_code": null, 			"biz_org_desc": null, 			"biz_org_id": null, 			"biz_org_name": null, 			"biz_org_type": null, 			"organization_name": "AAAAA", 			"parent_id": null, 			"partition_code": null, 			"short_name": null 		}, 		"source": { 			"version": "1.6.0.Final", 			"connector": "postgresql", 			"name": "herodotus", 			"ts_ms": 1626594964405, 			"snapshot": "false", 			"db": "athena", 			"sequence": "[\"63461608\",\"63461608\"]", 			"schema": "public", 			"table": "sys_organization", 			"txId": 2460, 			"lsn": 63461896, 			"xmin": null 		}, 		"op": "c", 		"ts_ms": 1626594964846, 		"transaction": null 	} }

由于是insert操作,所以op为c (create),before为null,after为我们插入的数据。

5.2 Update 测试

在数据库sys_organization表中修改一条数据

Kafka的消费者命令行工具收到了来自Debezium发布的数据变更消息:

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)

格式化后的消息体如下:

{ 	"schema": { 		... 	}, 	"payload": { 		"before": null, 		"after": { 			"organization_id": "4", 			"create_time": null, 			"ranking": null, 			"update_time": null, 			"description": null, 			"is_reserved": null, 			"reversion": null, 			"status": 1, 			"a4_biz_org_id": null, 			"biz_org_code": null, 			"biz_org_desc": null, 			"biz_org_id": null, 			"biz_org_name": null, 			"biz_org_type": null, 			"organization_name": "BBBBB", 			"parent_id": null, 			"partition_code": null, 			"short_name": null 		}, 		"source": { 			"version": "1.6.0.Final", 			"connector": "postgresql", 			"name": "herodotus", 			"ts_ms": 1626595173601, 			"snapshot": "false", 			"db": "athena", 			"sequence": "[\"63466888\",\"63466888\"]", 			"schema": "public", 			"table": "sys_organization", 			"txId": 2461, 			"lsn": 63467176, 			"xmin": null 		}, 		"op": "u", 		"ts_ms": 1626595173825, 		"transaction": null 	} }

进行更新产品信息的操作后,consumer将收到一条op为u (update)的信息,after为修改后的数据。

5.3 Delete测试

在数据库sys_organization表中删除一条数据

Kafka的消费者命令行工具收到了来自Debezium发布的数据变更消息:

使用Debezium、Postgres和Kafka进行数据实时采集(CDC)

格式化后的消息体如下:

{ 	"schema": { 		... 	}, 	"payload": { 		"before": { 			"organization_id": "3", 			"create_time": null, 			"ranking": null, 			"update_time": null, 			"description": null, 			"is_reserved": null, 			"reversion": null, 			"status": null, 			"a4_biz_org_id": null, 			"biz_org_code": null, 			"biz_org_desc": null, 			"biz_org_id": null, 			"biz_org_name": null, 			"biz_org_type": null, 			"organization_name": null, 			"parent_id": null, 			"partition_code": null, 			"short_name": null 		}, 		"after": null, 		"source": { 			"version": "1.6.0.Final", 			"connector": "postgresql", 			"name": "herodotus", 			"ts_ms": 1626594566933, 			"snapshot": "false", 			"db": "athena", 			"sequence": "[\"63461120\",\"63461120\"]", 			"schema": "public", 			"table": "sys_organization", 			"txId": 2458, 			"lsn": 63461176, 			"xmin": null 		}, 		"op": "d", 		"ts_ms": 1626594567136, 		"transaction": null 	} }

进行删除产品信息的操作后,consumer将收到一条op为d (delete)的信息,before为删除前的数据,after为null。

6.总结

通过Debezimu进行数据同步,不仅解决了传统ETL时效性不高的问题,还解决了基于消息队列需要两端编写代码的工程量,而且基于容器的方式更适合微服务架构的使用,使用Kafka进行消费端的整合,使得整合方式更加灵活便捷、终端类型更加丰富。

示例代码地址:

  • [Gitee](https://gitee.com/herodotus/eurynome-cloud)
  • [Github](https://github.com/herodotus-cloud/eurynome-cloud)