【Flink 系列】Flink CDC 介绍与使用

目录

1. 什么是 “CDC”

2. Flink CDC 介绍

3. 一个简单的栗子

4. 与传统 CDC ETL 对比分析

5. 学习资料

1. 什么是 “CDC”

CDC 的全称是 Change Data Capture ,字面意思是“变化数据捕获”。广义上讲,是指各种能捕获数据变更的技术。一般谈到的 CDC 技术,主要指的是面向数据库的变更,监测并捕获数据库的变动(增删改等),将这些变更发送到下游消息中间件、数据库中。CDC 技术常应用在数据采集、数据分发、数据迁移等应用场景中。

1.2 CDC 实现机制

目前,业界主流的 CDC 实现机制可以分为两种:

① 基于查询的 CDC:

  • 离线调度查询作业,批处理。依赖表中的更新时间字段,执行查询获取表中最新的数据。
  • 无法捕获删除事件,无法保证数据一致性。
  • 无法保障实时性,基于离线调度天然存在延迟。

② 基于日志的 CDC:

  • 实时消费日志,流式处理。例如,MySQL 的 binlog 日志完整记录了数据库中的变更,把 binlog 文件当作流的数据源。
  • 保障了数据一致性,binlog 日志文件包含了所有历史变更明细。
  • 保障了实时性,binlog 日志文件可以“流式”消费,提供实时数据。

两种实现机制对比

基于查询实现CDC

基于日志实现CDC

典型产品

Sqoop/DataX/Kettle 等

Canal/Debezium/Flink CDC 等

执⾏模式

批处理

流处理

捕获所有数据变化

延迟性

高延迟

低延迟

数据库负载/业务入侵

入侵式

非入侵式

捕获删除事件

捕获旧记录的状态

基于日志的 CDC 实现起来可能会更复杂。但可以看到,具有数据捕获延迟低,能够提供准确的“变更”流,对源系统影响小等特点,超级适合实时场景。

1.2 CDC 常见方案对比

借鉴一张网上比较常见的 CDC 方案对比分析图:

【Flink 系列】Flink CDC 介绍与使用

  • 对于DataX 等查询型的 CDC 机制,实际上并不是完全不支持“增量同步”,条件是需要侵⼊业务(提供增量字段)才能做到增量同步的,⽽且是 T+1 的增量同步。
  • 在全量+增量一体化同步方面,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
  • 在架构方面,需要像 Flink CDC 等具有超级灵活的水平扩展能力的分布式架构。而 DataX 和 Canal 是个单机架构。
  • 在数据加工的能力上,Flink CDC 依托强劲的 Flink SQL 流式计算能力,可以超级方便地对数据进行加工。而 Debezium 等则需要通过复杂的 Java 代码才能完成,使用门槛比较高。
  • 在生态方面,对上下游存储的支持。Flink CDC 上下游超级丰富,支持对接 MySQL、PostgreSQL 等数据源,还支持写入到 TiDB、HBase、Kafka、Hudi 等各种存储系统中,也支持自定义 Connector。

2. Flink CDC 介绍

Flink CDC 的核心实现是基于 Flnk 的一组源连接器(flink-cdc-connectors),它可以从不同数据库接收“更改”。Flink CDC Connectors,集成了 Debezium 作为捕获数据更改的引擎。因此,它可以充分利用 Debezium 的能力。

什么是 Debezium?

Debezium 是一个开源项目,它将现有数据库中的信息转换为事件流,使应用程序能够检测数据库中的行级更改并立即做出响应。Debezium 构建在 Apache Kafka 之上,并提供了一组与 Kafka Connect 兼容的连接器。每个连接器都与特定的数据库管理系统(DBMS)一起工作。连接器通过检测发生的更改,并将每个更改事件的记录流式传输到 Kafka Topic,来记录 DBMS 中数据更改的历史。然后,使用应用程序可以从Kafka主题中读取生成的事件记录。

【Flink 系列】Flink CDC 介绍与使用

2.1 支持的 Connector

【Flink 系列】Flink CDC 介绍与使用

2.2 支持的 Flink 版本

【Flink 系列】Flink CDC 介绍与使用

2.3 如何使用 API

① 使用 Table/SQL API

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3),
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'flinkuser',
 'password' = 'flinkpw',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

-- read snapshot and binlog data from mysql, and do some transformation, and show on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;

② 使用 DataStream API

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort)
            .databaseList("yourDatabaseName") // set captured database
            .tableList("yourDatabaseName.yourTableName") // set captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

3. 一个简单的栗子

假设:基于 Flink CDC,实现一个的简单 demo,读取 MySQL(订单明细表) 并写入 MySQL(按渠道汇总订单量),下面基于 Flink SQL 零编码实现,最后在 SQL Client 进行作业提交。

【Flink 系列】Flink CDC 介绍与使用

3.1 准备数据

① 创建订单明细表 sxg_order,并写入 6 条数据:

CREATE TABLE `sxg_order` (
  `order_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单ID',
  `amount` int(11) NOT NULL COMMENT '订单量',
  `channel_id` int(11) NOT NULL COMMENT '渠道ID',
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

【Flink 系列】Flink CDC 介绍与使用

② 创建订单汇总表按渠道(channel_id )统计订单量:

CREATE TABLE `sxg_order_sum` (
  `channel_id` int(11) NOT NULL COMMENT '渠道ID',
  `sum_amount` bigint(20) NOT NULL COMMENT '汇总订单量',
  `udpate_time` datetime NOT NULL COMMENT '更新时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8

3.2 编写 Flink SQL 脚本

① 使用 set 对一些参数进行配置。

② 使用 HiveCatalog 做存储元数据,也可以不使用。

③ 使用 CDC Connector 定义源表。

④ 使用 Table Connector 定义目标表。

⑤ 将按渠道会总后的数据写入订单汇总表,这里用了一个系统内置函数,获取当前时间。

-- 设置 job 名称
SET 'pipeline.name' = 'sxg_cdc_job';
-- 设置 checkpoint 
SET 'execution.checkpointing.interval' = '300s';


-- 使用 HiveCatalog 存储元信息
CREATE CATALOG hiveCatalog
  WITH (
    'type' = 'hive',
    'default-database' = 'tmp',
    'hive-conf-dir' = '/app/hive/conf'
  );

USE CATALOG hiveCatalog;

-- 使用 tmp 库
USE tmp;

-- 使用 CDC Connector 定义源表
DROP TABLE IF EXISTS sxg_cdc_order;

CREATE TABLE sxg_cdc_order (
                               order_id BIGINT,
                               amount BIGINT,
                               channel_id INT,
                               PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '10.112.182.11',
      'port' = '3306',
      'username' = 'root',
      'password' = 'root_use68pw9esd',
      'database-name' = 'test',
      'table-name' = 'sxg_order',
      'server-id' = '1'    
      );

-- 使用 Table Connector 定义目标表
DROP TABLE IF EXISTS sxg_cdc_order_sum;

CREATE TABLE sxg_cdc_order_sum(
                                  channel_id INT,
                                  sum_amount BIGINT,
                                  update_time TIMESTAMP(3),
                                  PRIMARY KEY (channel_id) NOT ENFORCED
) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://10.112.182.11:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false',
      'table-name' = 'sxg_order_sum',
      'username' = 'root',
      'password' = 'root_use68pw9esd'
      );


-- 将会总后的数据写入订单汇总表
INSERT INTO sxg_cdc_order_sum
SELECT channel_id,sum(amount),LOCALTIMESTAMP
FROM sxg_cdc_order
GROUP BY channel_id;

关于 Flink SQL、函数等的使用方法,回顾 【Flink 系列十二】Flink Table & SQL 实践原理(下)

3.3 导入所需依赖包

到 Flink CDC 项目网站,下载本例所需要(MySQL CDC Connector)的 jar 包,并导入到目录 ${flink.version}/lib/ 下:

【Flink 系列】Flink CDC 介绍与使用

下载地址:

https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/

3.4 执行 SQL 脚本,提交作业

第一启动 Flink SQL Client,将 3.2 部分的 SQL 保存为,cdc.sql 的文件,可以将程序依赖的 jar 包放到 lib 目录。

关于 SQL Client 的配置和使用参考:【Flink 系列十一】Flink Table & SQL 实践原理(中) 第 3 部分内容。

① 使用 -f 选项,执行上边编写的 SQL 脚本。

./bin/sql-client.sh embedded -s yarn-session -f cdc.sql -l ./lib

② 执行命令,提交作业,以下为提交打印的日志。

[root@hktest-client-01 flink-sql-client]# vim cdc.sql
[root@hktest-client-01 flink-sql-client]# /app/arch-pkg/flink-1.16.1/bin/sql-client.sh embedded -s yarn-session -f cdc.sql -l ./lib
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/app/arch-pkg/flink-1.16.1/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/app/arch-pkg/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2023-05-25 16:46:40,832 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-05-25 16:46:40,832 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
[INFO] Executing SQL from file.

Command history file path: /root/.flink-sql-history
Flink SQL> [INFO] Session property has been set.

Flink SQL> [INFO] Session property has been set.

Flink SQL>
>
> -- 使用 HiveCatalog 存储元信息
> CREATE CATALOG hiveCatalog
>   WITH (
>     'type' = 'hive',
>     'default-database' = 'tmp',
>     'hive-conf-dir' = '/app/hive/conf'
>   )[INFO] Execute statement succeed.

Flink SQL> [INFO] Execute statement succeed.

Flink SQL> [INFO] Execute statement succeed.

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE sxg_cdc_order (
>                                order_id BIGINT,
>                                amount BIGINT,
>                                channel_id INT,
>                                PRIMARY KEY (order_id) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '10.112.182.11',
>       'port' = '3306',
>       'username' = 'data_api_user',
>       'password' = 'api_use68pw9esd',
>       'database-name' = 'test',
>       'table-name' = 'sxg_order'
>       )[INFO] Execute statement succeed.

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE sxg_cdc_order_sum(
>                                   channel_id INT,
>                                   sum_amount BIGINT,
>                                   update_time AS PROCTIME(),
>                                   PRIMARY KEY (channel_id) NOT ENFORCED
> ) WITH (
>       'connector' = 'jdbc',
>       'url' = 'jdbc:mysql://10.112.182.11:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false',
>       'table-name' = 'sxg_order_sum',
>       'username' = 'data_api_user',
>       'password' = 'api_use68pw9esd'
>       )[INFO] Execute statement succeed.

Flink SQL>
>
> -- 将会总后的数据写入订单汇总表
> INSERT INTO sxg_cdc_order_sum
> SELECT channel_id,sum(amount)[INFO] Submitting SQL update statement to the cluster...
2023-05-25 16:46:47,297 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/app/arch-pkg/flink-1.16.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-05-25 16:46:47,602 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at hktest-admin5/10.112.185.18:10201
2023-05-25 16:46:47,614 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-05-25 16:46:47,616 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-05-25 16:46:47,661 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
2023-05-25 16:46:47,716 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hktest-admin1:8082 of application 'application_1656075154719_109037'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 2817c919f3e32e743a25067c05de8c87


Flink SQL>
Shutting down the session...
done

③ 启动过程中可能遇到的问题(如果启动成功,忽略此步):

【问题1】MySQL CDC Connector 不支持 Mysql <=5.5 的版本:

Caused by: org.apache.flink.table.api.ValidationException: Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.6, but actual is 5.5.
at com.ververica.cdc.connectors.mysql.MySqlValidator.checkVersion(MySqlValidator.java:117) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:78) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315)

【Flink 系列】Flink CDC 介绍与使用

【解决】通过官方文档提供连接器,选择支持的 MySQL 版本。

【Flink 系列】Flink CDC 介绍与使用

【问题2】确认 MySQL 的 binlog 是否开启:

org.apache.flink.table.api.ValidationException: The MySQL server is configured with binlog_format STATEMENT rather than ROW, which is required for this connector to work properly. Change the MySQL configuration to use a binlog_format=ROW and restart the connector.
at com.ververica.cdc.connectors.mysql.MySqlValidator.checkBinlogFormat(MySqlValidator.java:134) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:79) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) ~[flink-dist-1.16.1.jar:1.16.1]

【Flink 系列】Flink CDC 介绍与使用

【Flink 系列】Flink CDC 介绍与使用

【解决】编辑 MySQL 配置文件 my.cnf,在 [mysqld] 下添加两行,log_bin=mysql_bin 和 binlog_format=”ROW”。

【Flink 系列】Flink CDC 介绍与使用

保存后退出,重启数据库服务 service mysqld restart。再次查看:

【Flink 系列】Flink CDC 介绍与使用

【问题3】MySQL 用户权限的问题:

Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the SUPER, REPLICATION CLIENT privilege(s) for this operation
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1206) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:648) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]

【Flink 系列】Flink CDC 介绍与使用

【解决】使用 root 用户或者给普通用户赋予权限(super 或者 replication client)。这里,我直接切换到 root 用户(后一种方式没有尝试)。

【问题4】没有配置 server-id

Caused by: io.debezium.DebeziumException: Misconfigured master – server_id was not set Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1383) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1439) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
… 1 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Misconfigured master – server_id was not set
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
1 more

【Flink 系列】Flink CDC 介绍与使用

【解决】修改 MySQL 配置 my.cnf,增加 “server-id=1”。

3.6 查看运行状态及结果:

① 查看作业运行情况:

【Flink 系列】Flink CDC 介绍与使用

② 查看源表和目标 2 个表,在 Hive 中的元数据信息:

【Flink 系列】Flink CDC 介绍与使用

【Flink 系列】Flink CDC 介绍与使用

③ 查看 MySQL 中 sxg_order_sum 表的数据:

【Flink 系列】Flink CDC 介绍与使用

④ 对源订单表,再写入有一条数据,查看数据变化:

INSERT INTO test.sxg_order (amount,channel_id) VALUES (10,1);

⑤ 此时,订单汇总表变为:

【Flink 系列】Flink CDC 介绍与使用

4. 与传统 CDC ETL 对比分析

【Flink 系列】Flink CDC 介绍与使用

传统的基于 CDC 的 ETL 分析中,数据采集一般使用 Debezium、Canal 等工具,负责采集数据库的增量数据,一些采集工具也支持全量数据同步。采集到的数据一般输出到消息中间件如 Kafka,然后 Flink 计算引擎再去消费数据并写入到目的端,目的端可以是各种数据库、数据仓库、数据湖和消息队列等。

【Flink 系列】Flink CDC 介绍与使用

使用了 Flink CDC 之后,替换了传统 CDC 链路当中采集组件和消息队列,组件更少,维护更方便。同时更少的组件也意味着数据时效性能够进一步提高。

除此之外, Flink SQL 极大地降低了用户使用门槛,通过一个纯 SQL 作业,就可以完成数据清洗、分析和聚合。此外,利用 Flink SQL 双流 JOIN、维表 JOIN、UDTF 语法可以很容易地完成数据打宽及各种业务的逻辑加工。

为什么 Flink 做 CDC,从技术上看有着天然的匹配性?

前面文章介绍过 Flink 两个基础概念:Dynamic Table 和 Changelog Stream。

关于 Dynamic Table 回顾:【Flink 系列十】Flink Table & SQL 实践原理(上)第 2 部分。

【Flink 系列】Flink CDC 介绍与使用

如上图所示,Dynamic Table 和 Stream 是相对应的,并且可以相互转化。在 Flink SQL 中,数据在从一个算子流向另外一个算子时都是以 Changelog Stream 的形式,任意时刻的 Changelog Stream 可以生成一个表,也可以生成为一个流。

而数据库的 binlog 日志,也一直不断的追加数据库所有的变更,这种日志流就是将表的变更数据持续捕获的结果。这可以很自然的由 Flink SQL 的 Dynamic Table 来表明。

【Flink 系列】Flink CDC 介绍与使用

我们知道,Flink CDC 采用 Debezium 作为底层采集技术,它的数据结构与 Flink SQL 的内部数据结构 RowKind 超级类似,可以超级方便地对接起来。

每条 RowData 都有一个元数据 RowKind,包括 4 种类型, 分别是插入 (INSERT)、更新前镜像 (UPDATE_BEFORE)、更新后镜像 (UPDATE_AFTER)、删除 (DELETE),这四种类型和数据库里面的 binlog 概念保持一致。

【Flink 系列】Flink CDC 介绍与使用

而 Debezium 的数据结构,也有一个类似的元数据 op 字段, op 字段的取值也有四种,分别是 c、u、d、r,各自对应 create、update、delete、read。对于代表更新操作的 u,其数据部分同时包含了前镜像 (before) 和后镜像 (after)。

5. 学习资料

【1】项目文档:

https://ververica.github.io/flink-cdc-connectors/master

【2】项目 git 地址:

https://github.com/ververica/flink-cdc-connectors

【3】Flink CDC 新一代数据集成框架:

https://developer.aliyun.com/topic/download?spm=a2csy.flink.0.0.cf655badCs6svP&id=8256

【4】Debezium 官方文档:

https://debezium.io

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
染上彩虹色的小狗南安的头像 - 宋马
评论 共2条

请登录后发表评论