1、介绍说明
名称:canal [kə'næl]
译意: 水道/管道/沟渠
语言: 纯java开发
定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
2、工作原理
- 2.1、MySQL主备复制实现

从上图来看,复制分成三步:
Master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
Slave将Master的binary log events拷贝到它的中继日志(relay log);
Slave重做中继日志中的事件,将改变反映它自己的数据。
- 2.2、Canal的工作原理

图1
原理相对比较简单:
Canal模拟MySQL Slave的交互协议,伪装自己为MySQL Slave,向MySQL Master发送dump协议
MySQL Master收到dump请求,开始推送binary log给Slave(也就是Canal)
Canal解析binary log对象(原始为byte流)
3、服务器列表
|
主机IP |
主机名称 |
内存需求 |
作用说明 |
|
xxx.xxx.xxx.xxx |
canal01 |
8G |
Master |
|
xxx.xxx.xxx.xxx |
canal02 |
8G |
Standby |
|
xxx.xxx.xxx.xxx |
canal03 |
8G |
Standby |
4. 环境要求
- OS: Linux only, CentOS 6.5+
- JDK: 1.8+ ,参见:Java环境快速搭建
- Zokeerper: 3.4.5
- Kafka: 2.11-1.0.1
5. 准备工作
Canal-1.1.3下载地址:
https://github.com/alibaba/canal/releases/tag/canal-1.1.3
Canal GitHub:https://github.com/alibaba/canal
a) Canal的原理是基于MySQL binlog技术,所以这里必定需要开启Mysql的binlog写入功能,提议配置binlog模式为row.
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置MySQL replaction需要定义,不能和Canal的SlaveId重复
b) Canal的原理是模拟自己为MySQL Slave,所以这里必定需要做为MySQL Slave的相关权限.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
6、HA配置
6.1、文件解压
mkdir canal-1.1.3
tar -xzf canal.deployer-1.1.3-SNAPSHOT.tar.gz -C canal-1.1.3
6.2、canal.properties配置
vim $CANAL_HOME/conf/canal.properties
# zk集群配置
canal.zkServers = zk1:2181,zk2:2181,zk3:2181
# 数据模式:tcp, kafka, RocketMQ
canal.serverMode = kafka
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = true
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
#监控实例,根据产品划分,多个以逗号分割
canal.destinations = test
# Canal HA 配置
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# Kafka集群配置
canal.mq.servers = kafka1:9092,kafka2:9092,kafka3:9092
6.3、instance.properties配置
vim $CANAL_HOME/conf/产品ID/instance.properties
# 全局唯一标识不能重复即可
canal.instance.mysql.slaveId=11
# MysQL的Binlog服务器地址
canal.instance.master.address=[host|ip]:port
canal.instance.dbUsername=用户名
canal.instance.dbPassword=密码
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = 默认库名
# Perl正则表达式过滤
canal.instance.filter.regex=库名1..*,库名2..*
# 数据装载指定topic
canal.mq.topic=test
7、启动Canal
# 启动两台服务器上的Canal
cd canal-1.1.3
./bin/startup.sh
# 启动后,可以查看logs/产品ID/产品ID.log,只会看到一台机器上出现了启动成功的日志。
8、停止Canal
# 停止两台服务器上的Canal
cd canal-1.1.3
./bin/startup.sh
9、HA测试
zookeeper-client
# 查看Canal HA注册主机节点
[zk: localhost:2181(CONNECTED) 15] ls /otter/canal/destinations/产品ID/cluster
# 查看正在运行中的Canal节点
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/产品ID/running
# 测试Kafka是否正常显示消费Topic数据
kafka-console-consumer.sh --zookeeper zkHost:2181 --topic 产品ID --from-beginning
10、参考资料
Canal Wiki:https://github.com/alibaba/canal/wiki
总结:对于MySQL实时biglog采集Canal还是不错的选择,目前公司binlog采用则使用Canal进行数据收集。
















暂无评论内容