在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道。
下面我将详细解析配置文件的每个部分,并提供多个场景的示例。
核心架构与组件
数据流:MySQL → Logstash ( →
jdbc input →
filter) → Kafka
kafka output
为了实现高效的增量同步,其核心工作机制如下所示:

基础配置文件详解 (
mysql-to-kafka.conf)
mysql-to-kafka.conf
input {
jdbc {
# 【必需】JDBC 连接字符串
jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
# 【必需】数据库用户名和密码
jdbc_user => "your_username"
jdbc_password => "your_password"
# 【必需】MySQL JDBC 驱动路径
# 需要手动下载 https://dev.mysql.com/downloads/connector/j/
jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名
# 【必需】要执行的 SQL 语句
# 1. 使用增量字段(如update_time, id)进行增量查询
# 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值
statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"
# 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。
schedule => "* * * * *"
# 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)
# 此文件由Logstash管理,用于下次查询的:sql_last_value
record_last_run => true
last_run_metadata_path => "/path/to/.logstash_jdbc_last_run"
# 【可选】是否强制将JDBC列的字符串转换为UTF-8
jdbc_default_timezone => "UTC"
jdbc_force_standard_timezone => true
# 【可选】分页查询,用于处理大表
jdbc_paging_enabled => true
jdbc_page_size => 100000
}
}
filter {
# 此处是进行数据清洗和转换的地方,根据需求添加。
# 例如:
# 1. 删除不必要的字段
mutate {
remove_field => ["@version", "@timestamp"]
}
# 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)
# json {
# source => "message"
# target => "value"
# }
}
output {
kafka {
# 【必需】Kafka集群的broker列表
bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"
# 【必需】目标Topic的名称
topic_id => "mysql-your_table-topic"
# 【必需】指定消息的序列化格式。通常使用JSON。
codec => json
# 【可选】消息的Key。非常重要,用于决定Kafka分区。
# 使用表的主键可以保证同一主键的记录总是进入同一分区,保证顺序性。
message_key => "%{id}" # 假设表有一个'id'字段
# 【可选】压缩算法,提升传输效率
compression_type => "snappy"
# 【可选】生产者ACK机制
acks => "1" # "all" for highest durability
# 【可选】SSL/SASL认证(如果Kafka集群需要)
# ssl_truststore_location => "/path/to/client.truststore.jks"
# ssl_truststore_password => "password"
# sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';"
# sasl_mechanism => "PLAIN"
# security_protocol => "SASL_SSL"
}
# 强烈建议添加stdout输出用于调试,生产环境中可注释掉
stdout {
codec => rubydebug
}
}
不同场景的配置示例
场景 1:基于时间戳的增量同步(最常用)
适用于有 或
update_time 字段的表,能捕获插入和更新操作。
create_time
input {
jdbc {
...
statement => "SELECT * FROM orders WHERE update_time > :sql_last_value ORDER BY update_time ASC"
use_column_value => true
tracking_column => "update_time" # 指定跟踪的列,类型必须是timestamp或number
tracking_column_type => "timestamp" # 类型是timestamp
...
}
}
场景 2:基于自增ID的增量同步
适用于只追加(append-only)的表,只能捕获新插入的记录。
input {
jdbc {
...
statement => "SELECT * FROM log_entries WHERE id > :sql_last_value ORDER BY id ASC"
use_column_value => true
tracking_column => "id" # 指定跟踪的列
tracking_column_type => "numeric" # 类型是数字
...
}
}
场景 3:全量同步
通常只在初始化时使用。
input {
jdbc {
...
statement => "SELECT * FROM products" # 没有WHERE条件
schedule => "*/5 * * * *" # 每5分钟一次(慎用!会导致大量重复数据)
# 注意:如果进行全量同步,通常需要设置message_key并使用Kafka的日志压缩功能来去重。
...
}
}
运行与调试
下载 JDBC 驱动:从 MySQL 官网 下载 Platform Independent 版本的驱动包(如 ),解压后找到其中的
mysql-connector-java-8.0.33.tar.gz 文件,将其路径填入
.jar。
jdbc_driver_library
测试配置文件:
/path/to/logstash/bin/logstash -f /path/to/your/mysql-to-kafka.conf --config.test_and_exit
启动 Logstash:
/path/to/logstash/bin/logstash -f /path/to/your/mysql-to-kafka.conf
查看输出:启动后,控制台()会打印出获取到的数据,方便你调试。确认数据正确后,可以注释掉
stdout 输出。
stdout
重要注意事项与最佳实践
性能与资源:
:对于大数据表,务必启用分页 (
jdbc_page_size) 并设置合理的页面大小,避免一次性加载过多数据导致内存溢出(OOM)。调度间隔:根据数据更新频率设置
jdbc_paging_enabled => true。太频繁会增加数据库压力,太慢会导致数据延迟。 数据顺序与可靠性:
schedule
:务必设置。使用表的主键,这对于 Kafka 主题的日志压缩(Log Compaction)和保证同一键值的消息顺序至关重要。
message_key:对数据可靠性要求高的场景,设置为
acks。 时区问题:
"all"
建议将数据库、Logstash 和 Kafka 的时区都统一设置为 UTC,避免出现时区转换错误。 高级CDC:
上述配置是基于查询的简单CDC,有延迟且可能遗漏高频更新。对于要求实时、精确的CDC,应该使用更高级的工具,如:
Debezium:直接读取数据库的 binlog,是更好的选择。Canal:阿里巴巴的开源项目,同样基于 binlog。
Logstash JDBC -> Kafka 管道是实现数据库到 Kafka 同步的一个强大而灵活的方法,非常适合中等延迟要求的用例。配置的核心在于正确编写增量查询 SQL 并管理好 。
:sql_last_value
关于sql_last_value设置要点
关于sql_last_value设置要点
对于初次使用 Logstash 从 MySQL 同步数据到 Kafka,设置 是关键的第一步。根据你的目标,有不同的初始值设置方法。
sql_last_value
核心决策:你要全量同步还是增量同步?
| 同步类型 | 目标 | 初始值设置方法 |
|---|---|---|
| 全量同步 | 将表中所有历史数据全部导入 Kafka | 不干预,使用默认值 |
| 增量同步 | 只从当前时刻开始同步后续的新数据 | 手动创建状态文件 |
方案一:全量同步(导入全部历史数据)
这是最简单的方式,让 Logstash 自己管理即可。
配置文件示例:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database"
jdbc_user => "your_username"
jdbc_password => "your_password"
jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 核心SQL语句:使用 :sql_last_value 变量
statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"
# 定时调度(例如每分钟一次)
schedule => "* * * * *"
# 启用增量追踪
use_column_value => true
tracking_column => "update_time" # 指定追踪的字段
tracking_column_type => "timestamp" # 字段类型
# 状态文件路径(重要!Logstash用它记录最后一次的值)
last_run_metadata_path => "/path/to/.logstash_jdbc_last_run_your_table"
}
}
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "your_topic_name"
codec => json
}
}
首次运行会发生什么:
Logstash 启动。
检查 文件,发现文件不存在。
/path/to/.logstash_jdbc_last_run_your_table
Logstash 自动将 设置为默认初始值:
:sql_last_value
对于数字类型(如自增ID),默认是 。对于时间类型(如
0),默认是 Unix 纪元时间(1970-01-01 00:00:00 UTC)。
update_time
执行的 SQL 语句变为:
SELECT * FROM your_table WHERE update_time > '1970-01-01 00:00:00'
这条语句会匹配表中的所有记录,从而实现全量同步。
同步完成后,Logstash 会将本次查询到的最大 值写入状态文件,后续的执行就基于这个值进行增量同步。
update_time
方案二:增量同步(仅同步未来数据)
如果你不想同步巨大量的历史数据,只想从现在开始同步,你需要手动设置初始值。
操作步骤:
获取当前时间。在 MySQL 中执行:
SELECT NOW();
假设返回 。
2023-10-27 10:00:00
手动创建状态文件:
# 创建状态文件,并写入当前时间作为初始值
echo "2023-10-27 10:00:00" > /path/to/.logstash_jdbc_last_run_your_table
注意:时间格式必须与数据库中的格式一致。
然后启动 Logstash。
首次运行会发生什么:
Logstash 启动。
检查状态文件,发现文件存在且内容为 。
2023-10-27 10:00:00
Logstash 将 设置为你提供的这个时间。
:sql_last_value
执行的 SQL 语句变为:
SELECT * FROM your_table WHERE update_time > '2023-10-27 10:00:00'
这条语句会匹配所有在这个时间点之后新增或修改的记录。之前的历史数据不会被同步。
基于自增ID的增量同步配置
如果表没有时间戳,但有自增ID(AUTO_INCREMENT ID),也可以用它来追踪。
input {
jdbc {
# ... 其他连接参数同上
statement => "SELECT * FROM your_table WHERE id > :sql_last_value"
schedule => "* * * * *"
use_column_value => true
tracking_column => "id" # 追踪字段改为id
tracking_column_type => "numeric" # 类型改为数字
last_run_metadata_path => "/path/to/.logstash_jdbc_last_run_your_table_id"
}
}
全量同步:不手动创建状态文件,Logstash 默认从 开始,查询所有
0 的记录。增量同步:手动创建状态文件,并写入一个初始ID(如
id > 0),则后续只同步
1000 的记录。
id > 1000
重要注意事项和最佳实践
文件权限:确保运行 Logstash 的用户有权限读写 指定的文件。
last_run_metadata_path
不要重复使用状态文件:每个独立的 JDBC input 配置必须使用不同的 ,否则它们会互相覆盖,导致同步混乱。
last_run_metadata_path
测试:首次运行前,先用 测试配置文件语法:
--config.test_and_exit
/path/to/logstash/bin/logstash -f your_config.conf --config.test_and_exit
调试:首次运行时,保留 输出,在控制台查看获取到的数据,确认SQL查询和值是否正确。
stdout { codec => rubydebug }
时区问题:建议将数据库、Logstash 和状态文件中的时间都统一为 UTC 时区,避免时区转换带来的复杂性和错误。
总结:对于初次使用,如果需要全量数据,什么都不用做,直接运行 Logstash 即可。如果你只需要从今往后的新数据,那么就在运行前,手动创建状态文件并写入当前时间。


















暂无评论内容