Canal

一、简介

项目背景:早期,阿里巴巴 B2B 公司因为存在杭州和美国双机房部署,存在跨机房同步的业务 需求,从 2010 年开始,阿里开始逐步尝试基于数据库的日志解析,获取增量变更进行同步, 由此衍生出了 Canal 项目。

主要用途:用于 MySQL 数据库增量日志数据的解析、订阅和消费 ,采用 Java 语言开发并开源。

Githubhttps://github.com/alibaba/canal

二、Canal 工作流程

1. MySQL 主从复制工作流程

图片[1]-Canal-深吸氧

MySQL 主从复制步骤:

  1. 当 master 主服务器上的数据发生改变时,则将其改变写入二进制日志事件文件(binary log events,可以通过 show binlog events 进行查看) 中
  2. salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志事件进行探测,探 测其是否发生过改变,如果探测到 master 主服务器的二进制日志事件发生了改变,则开始一 个 I/O Thread 请求 master 二进制事件日志
  3. 同时 master 主服务器为每个 I/O Thread 启动一个 dump Thread,用于向其发送二进制日志事件
  4. slave 从服务器将接收到的二进制日志事件保存至自己本地的中继日志文件中(relay log)
  5. salve 从服务器将启动 SQL Thread 从中继日志(relay log)中读取二进制日志,在本地重 放,使得其数据和主服务器保持一致
  6. 最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒

2. Canal 工作流程

图片[2]-Canal-深吸氧
  1. Canal 模拟 MySQL Slave 的交互协议,把自己伪装为 MySQL Slave,向 MySQL Master 发 送 Dump 协议
  2. MySQL Master 收到 Dump 请求,开始推送 binary log 给 Slave(即 Canal )
  3. Canal 解析 binary log 对象 (原始数据为 byte 流)

三、Canal 使用场景

Canal 定位是基于 MySQL 数据库增量日志解析,提供增量数据订阅&消费 , 目前主要支持了 MySQL (也支持 Maria DB) ,版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x ,可以使用在如下一些应用场景:

  • 数据库镜像
  • 数据库实时备份
  • 业务 cache 刷新
  • 价格变化等重要业务消息
  • 带业务逻辑的增量数据处理
  • 跨数据库的数据备份(异构数据同步),例如 mysql => oracle,mysql=>mongo,mysql =>redis,mysql => elasticsearch, mysql => Kafka/RocketMQ/RabbitMQ 等
图片[3]-Canal-深吸氧

四、Canal 运行架构

Canal 是一种 client-server 模式 ,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

Protobuf 即 Google Protocol Buffer,是 Google 团队开发的用于高效存储和读取结构化数据的工 具,xml 、json 也可以用来存储此类结构化数据,但是使用 protobuf 表示的数据能更加高效, 并且将数据压缩得更小,大约是json 格式的 1/ 10 ,xml 格式的 1/20。

图片[4]-Canal-深吸氧

另外 Canal 可以直接将数据库变更记录投递到 MQ 系统中 ,比如 Kafka/RocketMQ

五、Canal 环境部署

1. MySQL配置

1. 开启 MySQL 的 binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 配置如下:

[mysqld] 
server_id=3306 #配置 MySQL replaction 需要定义 要跟
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式 
binlog-do-db=bmp-pms#指定记录哪个 db 的 binlog 日志,如果不配置,则表示开启所有库的 binlog

2. Canal 的 binlog 日志格式建议为 row 格式,Binlog 的三种基本类型分别为:

  1. STATEMENT 模式,只记录了 SQL 语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况
  2. ROW 模式,除了记录 SQL 语句之外,还会记录每个字段的变化情况,能够清楚的记录 每行数据的变化历史,但是会占用较多的空间,需要使用 mysqlbinlog 工具进行查看
  3. MIX 模式,比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为 statement 模式,当遇到了数据更新或者删除情况下就会变为 row 模式

目前 canal 支持所有模式的增量订阅(但配合同步时,因为 statement 只有 SQL,没有数据, 无法获取原始的变更日志,所以一般建议为 ROW 模式)

3. Binlog 相关命令:

#是否启用了日志 
show variables like 'log_bin'; 
#怎样知道当前的日志 
show master status; 
#查看 mysql binlog 模式 
show variables like 'binlog_format'; 
#获取 binlog 文件列表 
show binary logs; 
#查看指定 binlog 文件的内容 
show binlog events in 'mysql-bin.000001'; 

4. 授权 canal 连接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接授权:

CREATE USER canal IDENTIFIED BY 'canal'; 
alter user 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 
FLUSH PRIVILEGES;

2. Canal 部署配置

1. 下载 canal 部署程序

wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz 
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal.deployer-1.1.6 

2.配置修改

vim conf/example/instance.properties 
主要是修改配置文件中与自己的数据库配置相关的信息; 
canal.instance.master.address=127.0.0.1:3306 
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal

3.启动 Canal

./startup.sh

4.操作命令

//查看进程
ps -ef | grep canal 

//查看 server 日志 
cat logs/canal/canal.log 

//查看 instance 的日志 
vi logs/example/example.log 

//关闭 Canal 
./stop.sh 

Canal Server 的默认端口号为:11111,如果需要调整的话,可以到/conf 目录下的canal.properties 文件中进行修改

六、Canal Client 应用开发

1.开发步骤

对于 Java 开发来说,Canal 给我们提供了一个客户端 jar 包,我们使用该 jar 包中的 api 开发, 实现从 Canal Server 中接收数据。Maven 依赖:

<dependency> 
  <groupId>com.alibaba.otter</groupId> 
  <artifactId>canal.client</artifactId> 
  <version>1.1.6</version> 
</dependency> 
<dependency> 
  <groupId>com.alibaba.otter</groupId> 
  <artifactId>canal.protocol</artifactId> 
  <version>1.1.6</version> 
</dependency> 

Canal Client 程序开发 5 步:

  1. 创建 Canal 连接
  2. 建立 Canal 连接
  3. 订阅数据库
  4. 获取数据
  5. 解析数据

2. Canal Server认证访问

1.修改 canal.properties 配置文件

canal.destinations = bmp-pms
canal.user = canal 
canal.passwd = bd577d6db462dff84fa6de3f67d807daa7fe9c8c

加密通过 SecurityUtil.scrambleGenPass(…)进行加密

2.把 example 实例的文件夹改一下名字,改成 bmp-pms

3.把 conf 目录下除了 instance.properties 文件之外,其他文件都删除一下,否则会报错, 因为之前的实例名字不一样导致的,有历史数据

3. Canal 的数据解析

图片[5]-Canal-深吸氧

4. Canal 数据同步案例

比如一个系统的某一个业务功能需要依赖另一个系统的数据,当另一个系统数据发生变化时, 需要把变化的数据同步过来。

当然有很多方案可以实现,Canal 的好处是对于业务代码没有侵入,不需要在业务代码中加入 数据同步的逻辑,因为是基于 mysql binlog 日志去实现同步数据,同时也能够做到准实时同步,所以是很多企业一种比较常见的数据同步的方案。

图片[6]-Canal-深吸氧

七、Canal Server MQ 模式

Canal Server 支持的服务模式有: tcp(默认),kafka,RocketMQ,rabbitmq,pulsarmq;Rabbitmq是Canal 1.1.5开始支持的。

图片[7]-Canal-深吸氧

Canal 1.1.1 版本之后, 支持将 canal server 接收到的 binlog 数据直接投递到 MQ, 目前默认支持的 MQ 系统有Kafka,RocketMQ ,RabbitMQ,PulsarMQ。

图片[8]-Canal-深吸氧

通过安装目录下 conf/canal.properties 文件进行配置

//tcp(默认), kafka, RocketMQ, rabbitMQ, pulsarMQ 
canal.serverMode = rabbitMQ

1. 模式 RabbitMQ

图片[9]-Canal-深吸氧

1.修改 instance 配置文件 vim conf/bmp-pms/instance.properties

//从哪个 mysql 数据库获取 binlog 数据 
canal.instance.master.address=172.19.225.99:3306
canal.instance.dbUsername = canal 
canal.instance.dbPassword = canal 

//把数据投递到哪个 MQ 里面去 
# mq config 这里配置 rabbitmq的routingkey 
canal.mq.topic=canal_key

2.修改 Canal server 配置文件 vim conf/canal.properties

//服务模式修改: 
canal.serverMode = rabbitMQ 
//连接哪个 MQ 服务器,ip 和端口: 
rabbitmq.host = 47.98.204.243:5672
rabbitmq.virtual.host = / 
rabbitmq.exchange = canal_exchange 
rabbitmq.username = admin
rabbitmq.password = ..123456
rabbitmq.deliveryMode = 

3.通过 Java 代码,把 exchange、queue、routingkey 绑定一下

4.启动好各个服务:mysql、canal server、rabbitmq,然后测试。
注意这里canal server启动之前要保证canal_exchange,canal_key,canal_queue在RabbitMQ中是存在的不然会报错

© 版权声明
THE END
请撒泡尿证明你到此一游
点赞1 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容