Appearance
Canal 实现缓存同步完整教程
1. Canal 介绍
1.1. 什么是 Canal
是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析的数据同步工具,主要用途是提供增量数据订阅和消费。
1.2. 工作原理
Canal 的核心原理是模拟 MySQL Slave 的交互协议:
- 伪装成 MySQL Slave:Canal 将自己伪装成 MySQL 的一个 Slave 节点;
- 订阅 Binlog:向 MySQL Master 发送 dump 协议,请求推送 Binlog;
- 解析 Binlog:MySQL Master 收到请求后,开始推送 Binary Log 给 Canal;
- 数据分发:Canal 解析 Binlog 对象(原始为字节流),转换为结构化数据并提供给客户端。
1.3. 典型应用场景
- 缓存同步:数据库变更后自动更新 Redis 缓存;
- 数据异构:将 MySQL 数据同步到 ES、HBase 等;
- 业务解耦:通过订阅数据变更实现系统解耦。
2. 修改 MySQL 的配置
2.1. 检查 Binlog 状态
Canal 依赖 MySQL 的 Binlog,需要确保 MySQL 开启了 Binlog 且格式为 ROW 模式。
SQL
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';2.2. 修改 MySQL 配置
如果 Binlog 未开启或格式不正确,需要修改配置:
INI
[mysqld]
# 设置 server-id(Canal 需要)
server-id=1
# 开启 binlog
log-bin=mysql-bin
# 设置 binlog 格式为 ROW
binlog-format=ROW
# binlog 过期时间(可选,默认 0 表示不自动删除)
expire_logs_days=72.3. 创建 Canal 用户并授权
SQL
-- 创建 Canal 用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;2.4. 验证配置
SQL
-- 检查 binlog 状态
SHOW MASTER STATUS;
-- 查看 binlog 列表
SHOW BINARY LOGS;
-- 验证用户权限
SHOW GRANTS FOR 'canal'@'%';3. Canal Docker 部署
3.1. Docker 运行命令
Bash
docker run -d \
-h <HOST_NAME> \
--name canal-server \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.gtidon=true \
-e canal.instance.filter.regex=.*\\..* \
-p 11110:11110 \
-p 11111:11111 \
-p 11112:11112 \
-p 9100:9100 \
-v canal-logs:/home/admin/canal-server/logs \
-m 4096m \
canal/canal-server信息
-e 参数可以指定 canal.properties / instance.properties 里所有的配置。
3.2. Canal 配置说明
instance.properties 的部分配置说明如下:
| 参数名字 | 参数说明 | 默认值 |
|---|---|---|
canal.instance.mysql.slaveId | MySQL 集群配置中的 serverId 概念,需要保证和当前 MySQL 集群中 Id 唯一(v1.1.x 版本之后 Canal 会自动生成,不需要手工指定) | 无 |
canal.instance.master.address | MySQL 主库链接地址 | 127.0.0.1:3306 |
canal.instance.master.journal.name | MySQL 主库链接时起始的 binlog 文件 | 无 |
canal.instance.master.position | MySQL 主库链接时起始的 binlog 偏移量 | 无 |
canal.instance.master.timestamp | MySQL 主库链接时起始的 binlog 的时间戳 | 无 |
canal.instance.gtidon | 是否启用 MySQL GTID 的订阅模式 | false |
canal.instance.master.gtid | MySQL 主库链接时对应的 GTID 位点 | 无 |
canal.instance.dbUsername | MySQL 数据库帐号 | canal |
canal.instance.dbPassword | MySQL 数据库密码 | canal |
canal.instance.defaultDatabaseName | MySQL 链接时默认 Schema | |
canal.instance.connectionCharset | MySQL 数据解析编码 | UTF-8 |
canal.instance.filter.regex | MySQL 数据解析关注的表,Perl 正则表达式。多个正则之间以逗号 , 分隔,转义符需要双斜杠 \\。常见例子:1. 所有表: .* or .*\\..*2. canal Schema 下所有表:canal\\..*3. canal 下的以 canal 打头的表:canal\\.canal.*4. canal Schema 下的一张表:canal\\.test15. 多个规则组合使用: canal\\..*,mysql.test1,mysql.test2(逗号分隔) | .*\\..* |
canal.instance.filter.black.regex | MySQL 数据解析表的黑名单,表达式规则见白名单的规则 | 无 |
canal.instance.rds.instanceId | Aliyun RDS 对应的实例 Id 信息(如果不需要在本地 binlog 超过 18 小时被清理后自动下载 OSS 上的 binlog,可以忽略该值) | 无 |
特殊说明:
MySQL 链接时的起始位置
canal.instance.master.journal.name+canal.instance.master.position:精确指定一个 binlog 位点,进行启动;canal.instance.master.timestamp:指定一个时间戳,Canal 会自动遍历 MySQL binlog,找到对应时间戳的 binlog 位点后,进行启动;不指定任何信息:默认从当前数据库的位点(
show master status,进行启动)。
MySQL 解析关注表定义
标准的 Perl 正则,注意转义时需要双斜杠:
\\。MySQL 链接的编码
目前 Canal 版本仅支持一个数据库只有一种编码,如果一个库存在多个编码,需要通过
filter.regex配置,将其拆分为多个 Canal Instance,为每个 Instance 指定不同的编码。
信息
可以通过 canal.instance.standby.xxx 指定备用 MySQL 配置。更多关于 Canal 的配置详情参考:https://github.com/alibaba/canal/wiki/AdminGuide。
4. Canal Java 客户端使用教程
4.1. Maven 依赖
XML
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>4.2. 简单客户端示例
Java
public class SimpleCanalClientExample {
public static void main(String args[]) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId);
// connector.rollback(batchId);
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + ": " + column.getValue() + " update = " + column.getUpdated());
}
}
}