Logstash数据迁移之mysql-to-kafka.conf详细配置

在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道

下面我将详细解析配置文件的每个部分,并提供多个场景的示例。

核心架构与组件

数据流:MySQL → Logstash (
jdbc input

filter

kafka output
) → Kafka

为了实现高效的增量同步,其核心工作机制如下所示:

基础配置文件详解 (
mysql-to-kafka.conf
)


input {
  jdbc {
    # 【必需】JDBC 连接字符串
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"

    # 【必需】数据库用户名和密码
    jdbc_user => "your_username"
    jdbc_password => "your_password"

    # 【必需】MySQL JDBC 驱动路径
    # 需要手动下载 https://dev.mysql.com/downloads/connector/j/
    jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名

    # 【必需】要执行的 SQL 语句
    # 1. 使用增量字段(如update_time, id)进行增量查询
    # 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值
    statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"

    # 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。
    schedule => "* * * * *"

    # 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)
    # 此文件由Logstash管理,用于下次查询的:sql_last_value
    record_last_run => true
    last_run_metadata_path => "/path/to/.logstash_jdbc_last_run" 

    # 【可选】是否强制将JDBC列的字符串转换为UTF-8
    jdbc_default_timezone => "UTC"
    jdbc_force_standard_timezone => true

    # 【可选】分页查询,用于处理大表
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
  }
}

filter {
  # 此处是进行数据清洗和转换的地方,根据需求添加。
  # 例如:
  
  # 1. 删除不必要的字段
  mutate {
    remove_field => ["@version", "@timestamp"]
  }

  # 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)
  # json {
  #   source => "message"
  #   target => "value"
  # }
}

output {
  kafka {
    # 【必需】Kafka集群的broker列表
    bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"

    # 【必需】目标Topic的名称
    topic_id => "mysql-your_table-topic"

    # 【必需】指定消息的序列化格式。通常使用JSON。
    codec => json

    # 【可选】消息的Key。非常重要,用于决定Kafka分区。
    # 使用表的主键可以保证同一主键的记录总是进入同一分区,保证顺序性。
    message_key => "%{id}" # 假设表有一个'id'字段

    # 【可选】压缩算法,提升传输效率
    compression_type => "snappy"

    # 【可选】生产者ACK机制
    acks => "1" # "all" for highest durability

    # 【可选】SSL/SASL认证(如果Kafka集群需要)
    # ssl_truststore_location => "/path/to/client.truststore.jks"
    # ssl_truststore_password => "password"
    # sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';"
    # sasl_mechanism => "PLAIN"
    # security_protocol => "SASL_SSL"
  }

  # 强烈建议添加stdout输出用于调试,生产环境中可注释掉
  stdout {
    codec => rubydebug
  }
}

不同场景的配置示例

场景 1:基于时间戳的增量同步(最常用)

适用于有
update_time

create_time
字段的表,能捕获插入和更新操作。


input {
  jdbc {
    ...
    statement => "SELECT * FROM orders WHERE update_time > :sql_last_value ORDER BY update_time ASC"
    use_column_value => true
    tracking_column => "update_time" # 指定跟踪的列,类型必须是timestamp或number
    tracking_column_type => "timestamp" # 类型是timestamp
    ...
  }
}
场景 2:基于自增ID的增量同步

适用于只追加(append-only)的表,只能捕获新插入的记录。


input {
  jdbc {
    ...
    statement => "SELECT * FROM log_entries WHERE id > :sql_last_value ORDER BY id ASC"
    use_column_value => true
    tracking_column => "id" # 指定跟踪的列
    tracking_column_type => "numeric" # 类型是数字
    ...
  }
}
场景 3:全量同步

通常只在初始化时使用。


input {
  jdbc {
    ...
    statement => "SELECT * FROM products" # 没有WHERE条件
    schedule => "*/5 * * * *" # 每5分钟一次(慎用!会导致大量重复数据)
    # 注意:如果进行全量同步,通常需要设置message_key并使用Kafka的日志压缩功能来去重。
    ...
  }
}

运行与调试

下载 JDBC 驱动:从 MySQL 官网 下载 Platform Independent 版本的驱动包(如
mysql-connector-java-8.0.33.tar.gz
),解压后找到其中的
.jar
文件,将其路径填入
jdbc_driver_library

测试配置文件


/path/to/logstash/bin/logstash -f /path/to/your/mysql-to-kafka.conf --config.test_and_exit

启动 Logstash


/path/to/logstash/bin/logstash -f /path/to/your/mysql-to-kafka.conf

查看输出:启动后,控制台(
stdout
)会打印出获取到的数据,方便你调试。确认数据正确后,可以注释掉
stdout
输出。

重要注意事项与最佳实践

性能与资源

jdbc_page_size
:对于大数据表,务必启用分页 (
jdbc_paging_enabled => true
) 并设置合理的页面大小,避免一次性加载过多数据导致内存溢出(OOM)。调度间隔:根据数据更新频率设置
schedule
。太频繁会增加数据库压力,太慢会导致数据延迟。 数据顺序与可靠性

message_key
务必设置。使用表的主键,这对于 Kafka 主题的日志压缩(Log Compaction)和保证同一键值的消息顺序至关重要。
acks
:对数据可靠性要求高的场景,设置为
"all"
时区问题
建议将数据库、Logstash 和 Kafka 的时区都统一设置为 UTC,避免出现时区转换错误。 高级CDC
上述配置是基于查询的简单CDC,有延迟且可能遗漏高频更新。对于要求实时、精确的CDC,应该使用更高级的工具,如:
Debezium:直接读取数据库的 binlog,是更好的选择。Canal:阿里巴巴的开源项目,同样基于 binlog。

Logstash JDBC -> Kafka 管道是实现数据库到 Kafka 同步的一个强大而灵活的方法,非常适合中等延迟要求的用例。配置的核心在于正确编写增量查询 SQL 并管理好
:sql_last_value


关于sql_last_value设置要点

对于初次使用 Logstash 从 MySQL 同步数据到 Kafka,设置
sql_last_value
是关键的第一步。根据你的目标,有不同的初始值设置方法。

核心决策:你要全量同步还是增量同步?

同步类型 目标 初始值设置方法
全量同步 将表中所有历史数据全部导入 Kafka 不干预,使用默认值
增量同步 只从当前时刻开始同步后续的新数据 手动创建状态文件

方案一:全量同步(导入全部历史数据)

这是最简单的方式,让 Logstash 自己管理即可。

配置文件示例:


input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    
    # 核心SQL语句:使用 :sql_last_value 变量
    statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"
    
    # 定时调度(例如每分钟一次)
    schedule => "* * * * *"
    
    # 启用增量追踪
    use_column_value => true
    tracking_column => "update_time" # 指定追踪的字段
    tracking_column_type => "timestamp" # 字段类型
    
    # 状态文件路径(重要!Logstash用它记录最后一次的值)
    last_run_metadata_path => "/path/to/.logstash_jdbc_last_run_your_table"
  }
}

output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "your_topic_name"
    codec => json
  }
}

首次运行会发生什么:

Logstash 启动。

检查
/path/to/.logstash_jdbc_last_run_your_table
文件,发现文件不存在

Logstash 自动将
:sql_last_value
设置为默认初始值

对于数字类型(如自增ID),默认是
0
。对于时间类型(如
update_time
),默认是 Unix 纪元时间(1970-01-01 00:00:00 UTC)

执行的 SQL 语句变为:


SELECT * FROM your_table WHERE update_time > '1970-01-01 00:00:00'

这条语句会匹配表中的所有记录,从而实现全量同步。

同步完成后,Logstash 会将本次查询到的最大
update_time
值写入状态文件,后续的执行就基于这个值进行增量同步。

方案二:增量同步(仅同步未来数据)

如果你不想同步巨大量的历史数据,只想从现在开始同步,你需要手动设置初始值

操作步骤:

获取当前时间。在 MySQL 中执行:


SELECT NOW();

假设返回
2023-10-27 10:00:00

手动创建状态文件


# 创建状态文件,并写入当前时间作为初始值
echo "2023-10-27 10:00:00" > /path/to/.logstash_jdbc_last_run_your_table

注意:时间格式必须与数据库中的格式一致。

然后启动 Logstash

首次运行会发生什么:

Logstash 启动。

检查状态文件,发现文件存在且内容为
2023-10-27 10:00:00

Logstash 将
:sql_last_value
设置为你提供的这个时间。

执行的 SQL 语句变为:


SELECT * FROM your_table WHERE update_time > '2023-10-27 10:00:00'

这条语句会匹配所有在这个时间点之后新增或修改的记录。之前的历史数据不会被同步。

基于自增ID的增量同步配置

如果表没有时间戳,但有自增ID(AUTO_INCREMENT ID),也可以用它来追踪。


input {
  jdbc {
    # ... 其他连接参数同上

    statement => "SELECT * FROM your_table WHERE id > :sql_last_value"

    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "id" # 追踪字段改为id
    tracking_column_type => "numeric" # 类型改为数字
    last_run_metadata_path => "/path/to/.logstash_jdbc_last_run_your_table_id"
  }
}

全量同步:不手动创建状态文件,Logstash 默认从
0
开始,查询所有
id > 0
的记录。增量同步:手动创建状态文件,并写入一个初始ID(如
1000
),则后续只同步
id > 1000
的记录。

重要注意事项和最佳实践

文件权限:确保运行 Logstash 的用户有权限读写
last_run_metadata_path
指定的文件。

不要重复使用状态文件每个独立的 JDBC input 配置必须使用不同的
last_run_metadata_path
,否则它们会互相覆盖,导致同步混乱。

测试:首次运行前,先用
--config.test_and_exit
测试配置文件语法:


/path/to/logstash/bin/logstash -f your_config.conf --config.test_and_exit

调试:首次运行时,保留
stdout { codec => rubydebug }
输出,在控制台查看获取到的数据,确认SQL查询和值是否正确。

时区问题:建议将数据库、Logstash 和状态文件中的时间都统一为 UTC 时区,避免时区转换带来的复杂性和错误。

总结:对于初次使用,如果需要全量数据,什么都不用做,直接运行 Logstash 即可。如果你只需要从今往后的新数据,那么就在运行前,手动创建状态文件并写入当前时间。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容