1. 引言与Canal基础概述
本文详细介绍Canal在Spring Boot环境下与MySQL数据同步的完整集成流程,包括环境准备、Canal部署配置以及Spring Boot客户端开发。
1.1 Canal简介与核心原理
Canal是阿里巴巴开源的一款基于MySQL数据库增量日志解析的高性能数据同步系统,其英文含义为"管道"或"沟渠",形象地表达了其在数据传输中的作用。
核心工作原理
- Canal模拟MySQL slave的身份向master发送dump请求
- MySQL master收到请求后开始推送binary log给Canal
- Canal解析接收到的二进制日志对象,转换为结构化的数据变更事件
支持的MySQL版本
典型应用场景
1.2 Canal架构组件与工作机制
Canal的整体架构采用Server-Instance模式设计,主要由Canal Server和Canal Instance两个核心组件构成。
Canal Server
代表一个Canal运行实例,对应一个JVM进程。负责管理和运行整个Canal服务,处理数据源的接入、数据解析、过滤和存储等任务。支持集群化部署,通过Zookeeper管理集群状态。
Canal Instance
对应一个数据队列,负责从特定数据源读取变更数据并推送到下游消费者。每个Instance独立运行,管理特定的数据库连接和数据流。
Canal Instance核心组件
组件名称 | 主要功能 |
---|---|
EventParser | 数据源接入,模拟主从协议交互,解析变更数据 |
EventSink | 数据过滤、加工和分发,连接Parser和Store |
EventStore | 暂存"尚未消费"的events,默认基于内存的阻塞队列 |
MetaManager | 管理增量订阅和消费信息,维护Instance状态 |
1.3 技术选型考量与优势分析
数据同步工具对比
Canal核心优势
实时性高
基于binlog的增量同步,毫秒级同步,延迟控制在1秒内,不侵入业务代码
性能卓越
1.1.x版本性能提升150%,原生支持Prometheus监控、Kafka消息投递
架构灵活
支持单机、集群、分布式多种部署模式,支持动态扩展与故障自动转移
生态完善
支持Java、C#、Go、PHP、Python等多种编程语言客户端实现
使用便捷
与Spring Boot集成简单,引入依赖即可快速实现数据同步功能
2. 部署环境准备与MySQL配置
本节介绍Canal部署所需的系统环境要求、MySQL配置调整以及网络安全设置。
2.1 系统环境要求与软件依赖
基础环境要求
- 操作系统:Windows、Linux、MacOS(推荐Linux)
- Java环境:JDK 1.8或以上版本
- MySQL版本:5.1.x、5.5.x、5.6.x、5.7.x、8.0.x
推荐硬件配置
- CPU:至少4核,推荐8核或以上
- 内存:至少8GB,推荐16GB或以上
- 磁盘:至少100GB可用空间,推荐SSD
网络环境要求
- Canal服务器与MySQL服务器网络连接稳定,延迟建议小于1ms
- 带宽满足数据传输需求,根据实际数据量调整
- 建议启用NTP时间同步,确保服务器时间一致性
2.2 MySQL配置调整与优化
2.2.1 开启binlog日志功能
编辑MySQL配置文件(my.cnf或my.ini),在[mysqld]区块下添加以下配置:
[mysqld]
log-bin=mysql-bin # 开启binlog功能
binlog-format=ROW # 设置binlog格式为ROW模式
server_id=1 # 配置MySQL replication需要的server ID,不要与Canal的slaveId重复
重启MySQL服务后,使用以下命令检查binlog是否已正确开启:
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'server_id';
2.2.2 设置binlog相关参数
添加以下配置以优化性能和安全性:
[mysqld]
expire_logs_days=7 # 设置binlog过期时间为7天
binlog_row_image=FULL # 记录完整的行数据
binlog-ignore-db=mysql # 忽略mysql系统库的binlog记录
lower_case_table_names=1 # 表名存储为小写,比较时不区分大小写
default_authentication_plugin=mysql_native_password # 使用传统的密码认证方式
2.2.3 创建Canal专用用户并授权
为Canal创建专用的数据库用户,并授予必要的权限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal123'; # 创建canal用户,密码为canal123
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; # 授予必要的权限
FLUSH PRIVILEGES; # 刷新权限
注意事项
如果MySQL启用了密码策略,密码设置不能过于简单,否则会出现密码策略错误。此时需要设置更复杂的密码或调整MySQL的密码策略配置。
2.3 网络环境与安全配置
端口开放要求
服务/功能 | 默认端口 | 用途说明 |
---|---|---|
MySQL数据库 | 3306 | Canal服务器访问MySQL的端口 |
Canal Server | 11111 | Canal Server提供socket服务的端口 |
Canal Admin | 11110 | Canal管理控制台端口(如果启用) |
Canal metrics | 11112 | Prometheus监控指标拉取端口 |
Linux防火墙配置示例
# 使用firewalld开放端口
firewall-cmd --zone=public --add-port=11111/tcp --permanent
firewall-cmd --zone=public --add-port=11110/tcp --permanent
firewall-cmd --zone=public --add-port=11112/tcp --permanent
firewall-cmd --reload
# 查看已开放的端口
firewall-cmd --list-ports
安全加固建议
- 将Canal部署在独立的服务器上,与数据库服务器分离
- 使用专用的数据库用户,避免使用root等超级用户
- 限制Canal服务器的网络访问权限,只允许必要的连接
- 定期审查数据库访问日志,监控异常访问行为
- 启用NTP时间同步,确保服务器时间一致性
3. Canal Server部署与配置
本节详细介绍Canal Server的下载安装、配置文件详解、多instance配置以及集群部署方案。
3.1 下载安装与目录结构
3.1.1 版本选择与下载
Canal的最新稳定版本是1.1.8(发布于2025年1月16日),推荐从官方GitHub发布页面下载:
使用wget命令下载最新版本(以Linux为例):
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
3.1.2 解压安装与目录说明
Linux系统安装步骤
# 创建安装目录
mkdir /opt/canal
cd /opt/canal
# 解压安装包
tar zxvf /path/to/canal.deployer-1.1.8.tar.gz
# 进入Canal目录
cd canal.deployer-1.1.8
Windows系统安装步骤
- 创建安装目录(例如D:\canal)
- 解压下载的ZIP文件到该目录
- 通过命令提示符或PowerShell进入安装目录
Canal目录结构
canal.deployer-1.1.8/
├── bin/ # 启动脚本目录
├── conf/ # 配置文件目录
│ ├── canal.properties # 全局配置文件
│ ├── example/ # 示例instance配置目录
│ │ └── instance.properties
│ └── logback.xml # 日志配置文件
├── lib/ # 依赖库目录
├── logs/ # 日志文件目录
└── plugin/ # 插件目录
3.2 核心配置文件详解
3.2.1 canal.properties全局配置
canal.properties是Canal的全局配置文件,包含了Canal Server的基本配置信息。以下是主要配置参数的说明:
# Canal基本配置
canal.id=1 # 每个Canal Server实例的唯一标识
canal.ip= # Canal Server绑定的本地IP地址,如果不配置,默认选择一个本机IP
canal.port=11111 # Canal Server提供socket服务的端口
canal.metrics.pull.port=11112 # 监控指标拉取端口,用于Prometheus监控
canal.zkServers= # Canal Server连接Zookeeper集群的地址(集群模式需要)
# Canal运行模式配置
canal.serverMode=tcp # 可选值:tcp, kafka, RocketMQ
canal.withoutNetty=false # 是否禁用Netty,一般不需要修改
# Canal Admin配置(如果启用Admin功能)
canal.admin.port=11110 # Canal Admin端口
canal.admin.user=admin # 管理员用户名
canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 # 管理员密码(SHA-1加密)
# 实例配置
canal.destinations=example # 指定实例名,多个实例用逗号分隔
canal.instance.global.spring.xml=classpath:spring/default-instance.xml
# 内存配置
canal.instance.memory.buffer.size=16384 # 内存缓冲区大小,必须是2的幂
canal.instance.memory.buffer.memunit=1024 # 内存单元大小,默认1KB
canal.instance.memory.batch.mode=MEMSIZE # 内存批次模式:MEMSIZE或ITEMSIZE
重要提示
canal.destinations参数指定了要启动的instance名称,多个instance用逗号分隔。例如,如果要监控两个数据库,可以配置为canal.destinations=db1,db2,然后在conf目录下创建db1和db2两个目录,并分别配置各自的instance.properties文件。
3.2.2 instance.properties实例配置
instance.properties是每个instance的具体配置文件,包含了连接MySQL数据库和数据解析的相关配置:
# 基本配置
canal.instance.mysql.slaveId=1234 # 必须配置,不能与MySQL的server_id重复
canal.instance.master.address=127.0.0.1:3306 # MySQL主库地址和端口
canal.instance.dbUsername=canal # 数据库用户名
canal.instance.dbPassword=canal # 数据库密码
canal.instance.defaultDatabaseName= # 默认数据库名
canal.instance.connectionCharset=UTF-8 # 连接字符集
# 过滤配置
canal.instance.filter.regex=.*\\..* # 表过滤正则表达式
canal.instance.filter.black.regex= # 黑名单过滤正则表达式
# 解析配置
canal.instance.parser.parallel=false # 是否开启并行解析
canal.instance.parser.parallelThreadSize=16 # 并行解析线程数(仅当parallel=true时有效)
canal.instance.parser.direct=false # 是否直接解析(不建议修改)
表过滤规则说明
过滤规则 | 说明 |
---|---|
.*\\..* | 匹配所有库的所有表 |
test\\..* | 匹配test库的所有表 |
test\\.user | 匹配test库的user表 |
test\\..*,demo\\..* | 匹配test库和demo库的所有表 |
test\\.user,test\\.order | 匹配test库的user表和order表 |
注意事项
如果在配置文件中配置了过滤规则,就不应该在代码中调用CanalConnector.subscribe()方法,否则配置文件中的过滤配置会被覆盖。
3.3 多instance配置与集群部署
3.3.1 配置多个数据源
如果需要监控多个MySQL数据库或同一个数据库的不同部分,可以通过配置多个instance来实现:
-
修改全局配置文件
编辑canal.properties,在canal.destinations参数中指定多个instance名称,用逗号分隔:
canal.destinations=db1,db2,db3
-
创建instance目录
在conf目录下创建与instance名称对应的目录,例如db1、db2、db3。
-
配置instance文件
在每个instance目录下创建instance.properties文件,并配置相应的数据库连接信息和过滤规则。
db1实例配置(监控test数据库的user表):
canal.instance.mysql.slaveId=1001
canal.instance.master.address=192.168.1.100:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test
canal.instance.filter.regex=test\\.user
db2实例配置(监控demo数据库的所有表):
canal.instance.mysql.slaveId=1002
canal.instance.master.address=192.168.1.101:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=demo
canal.instance.filter.regex=demo\\..*
3.3.2 基于Zookeeper的高可用集群
Canal支持基于Zookeeper的高可用集群部署,通过Zookeeper实现主备切换和负载均衡。
集群部署架构
集群部署关键步骤
-
部署Zookeeper集群
- 部署至少3个Zookeeper节点
- 确保Zookeeper版本在3.4.6以上
- 配置Zookeeper集群间的通信
-
配置Canal Server
在每个Canal Server的canal.properties中添加Zookeeper配置:
canal.zkServers=zk1:2181,zk2:2181,zk3:2181 canal.serverMode=kafka # 建议使用消息队列模式
-
配置instance
在instance配置文件中添加集群相关配置:
canal.instance.global.spring.xml=classpath:spring/zk-instance.xml
-
启动集群
- 依次启动所有Canal Server节点
- 通过Zookeeper选举机制,只有一个节点会成为Active状态
- 其他节点处于Standby状态,等待Active节点故障时接管
3.3.3 负载均衡与故障转移
负载均衡策略
轮询策略
canal.instance.loadbalance.strategy=round_robin
均匀分配instance到各个节点,适合数据量分布均匀的场景
权重策略
canal.instance.loadbalance.strategy=weighted
canal.instance.loadbalance.weight=10
根据权重分配instance,可根据节点性能设置不同权重
一致性哈希策略
canal.instance.loadbalance.strategy=consistent_hashing
根据表名哈希分配到固定节点,适合相同表数据在同一节点处理
故障转移机制
- 当Active节点故障时,Zookeeper会检测到节点消失
- Zookeeper触发选举机制,从Standby节点中选出新的Active节点
- 新的Active节点接管故障节点的所有instance
- 客户端会自动重连到新的Active节点
- 整个故障转移过程通常在几秒内完成
3.4 启动运行与状态检查
3.4.1 启动命令与参数
Linux系统命令
# 启动Canal服务
sh bin/startup.sh
# 停止Canal服务
sh bin/stop.sh
# 重启Canal服务
sh bin/restart.sh
# 查看服务状态
sh bin/status.sh
Windows系统命令
# 启动Canal服务
bin\startup.bat
# 停止Canal服务
bin\stop.bat
# 查看服务状态
bin\status.bat
3.4.2 日志文件与监控
Canal的日志文件存放在logs目录下,主要包括:
日志文件 | 用途说明 |
---|---|
canal.log | Canal Server的主要日志文件 |
example.log | example实例的日志文件(根据instance名称命名) |
canal_gc.log | JVM垃圾回收日志 |
canal_stderr.log | 标准错误输出日志 |
查看实时日志:
tail -f logs/canal.log
tail -f logs/example.log
内存配置调整
如果启动过程中出现内存不足的错误,可以通过修改bin/startup.sh文件中的JVM参数来调整内存配置:
JAVA_OPTS="-server -Xms512m -Xmx512m -Xmn256m -XX:SurvivorRatio=2"
3.4.3 常见启动问题处理
端口占用问题
如果启动时报错端口已被占用,可以通过以下命令查看占用端口的进程:
lsof -i :11111
然后杀死对应的进程或修改Canal的端口配置。
连接数据库失败
- 检查MySQL服务是否正常运行
- 检查instance.properties中的数据库连接信息是否正确
- 检查网络连接是否正常
- 检查MySQL用户权限是否正确
binlog解析错误
- 确保MySQL的binlog格式为ROW模式
- 检查MySQL的server_id与Canal的slaveId是否重复
- 检查binlog是否已开启
4. Spring Boot集成开发指南
本节详细介绍如何在Spring Boot项目中集成Canal客户端,实现MySQL数据同步功能。
4.1 项目依赖配置
4.1.1 Maven依赖配置
Spring Boot 2.x版本依赖
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Canal客户端依赖 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
<exclusions>
<!-- 排除冲突的日志依赖 -->
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Canal协议依赖(可选) -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>
</dependencies>
Spring Boot 3.x版本依赖
<dependencies>
<!-- Spring Boot Web Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Canal客户端依赖 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
<exclusions>
<!-- 排除冲突的日志依赖 -->
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring Boot 3.x额外依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
</dependencies>
依赖冲突注意事项
Canal客户端的日志依赖可能与Spring Boot的默认日志框架产生冲突,因此需要通过<exclusions>标签排除冲突的日志依赖,避免启动时出现日志相关的异常。
4.1.2 版本兼容性说明
组件 | 版本要求 | 备注 |
---|---|---|
Spring Boot 2.x | Canal 1.1.7+ | 推荐2.3.x以上版本 |
Spring Boot 3.x | Canal 1.1.7+ | 需要额外添加log4j2依赖 |
Canal | 最低1.1.4,推荐1.1.7+ | 1.1.4+支持Admin动态运维 |
JDK | 2.x需JDK 8+,3.x需JDK 17+ | 遵循Spring Boot官方JDK要求 |
4.2 Canal客户端配置与连接
4.2.1 配置文件设置
在Spring Boot项目中,推荐使用application.yml文件配置Canal连接信息:
canal:
# Canal Server连接配置
server:
host: 127.0.0.1 # Canal Server主机地址
port: 11111 # Canal Server端口
destination: example # instance名称
# 认证信息(如果启用了认证)
username: # 用户名(默认为空)
password: # 密码(默认为空)
# 连接池配置
pool:
max-active: 10 # 最大活跃连接数
max-idle: 5 # 最大空闲连接数
min-idle: 1 # 最小空闲连接数
timeout: 30000 # 连接超时时间(毫秒)
# 订阅配置
subscribe: ".*\\..*" # 订阅所有表的变更事件
filter-regex: # 表过滤正则表达式(可选)
black-regex: # 黑名单正则表达式(可选)
集群模式配置(多个Canal Server):
canal:
server:
cluster:
- host: 192.168.1.100
port: 11111
- host: 192.168.1.101
port: 11111
- host: 192.168.1.102
port: 11111
destination: example
# 其他配置同上...
4.2.2 连接工厂与配置类
创建Canal连接工厂和配置类,用于管理Canal客户端连接:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
import java.util.List;
@Configuration
@EnableConfigurationProperties(CanalProperties.class)
public class CanalConfig {
@Autowired
private CanalProperties canalProperties;
@Bean
@ConditionalOnMissingBean
public CanalConnector canalConnector() {
CanalConnector connector;
// 判断是否为集群配置
if (canalProperties.getServer().getCluster() != null &&
!canalProperties.getServer().getCluster().isEmpty()) {
List<InetSocketAddress> addresses = canalProperties.getServer().getCluster().stream()
.map(server -> new InetSocketAddress(server.getHost(), server.getPort()))
.toList();
connector = CanalConnectors.newClusterConnector(addresses,
canalProperties.getServer().getDestination(),
canalProperties.getUsername(),
canalProperties.getPassword());
} else {
connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalProperties.getServer().getHost(),
canalProperties.getServer().getPort()),
canalProperties.getServer().getDestination(),
canalProperties.getUsername(),
canalProperties.getPassword());
}
// 配置连接池参数
connector.setConnectTimeout(canalProperties.getPool().getTimeout());
connector.setSoTimeout(canalProperties.getPool().getTimeout());
return connector;
}
}
创建CanalProperties配置类,用于读取配置文件:
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
private String username;
private String password;
private CanalServerProperties server;
private CanalPoolProperties pool;
private String subscribe;
private String filterRegex;
private String blackRegex;
// Getters and Setters
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }
public CanalServerProperties getServer() { return server; }
public void setServer(CanalServerProperties server) { this.server = server; }
public CanalPoolProperties getPool() { return pool; }
public void setPool(CanalPoolProperties pool) { this.pool = pool; }
public String getSubscribe() { return subscribe; }
public void setSubscribe(String subscribe) { this.subscribe = subscribe; }
public String getFilterRegex() { return filterRegex; }
public void setFilterRegex(String filterRegex) { this.filterRegex = filterRegex; }
public String getBlackRegex() { return blackRegex; }
public void setBlackRegex(String blackRegex) { this.blackRegex = blackRegex; }
}
class CanalServerProperties {
private String host;
private int port;
private String destination;
private List<CanalClusterServerProperties> cluster;
// Getters and Setters
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
public String getDestination() { return destination; }
public void setDestination(String destination) { this.destination = destination; }
public List<CanalClusterServerProperties> getCluster() { return cluster; }
public void setCluster(List<CanalClusterServerProperties> cluster) { this.cluster = cluster; }
}
class CanalClusterServerProperties {
private String host;
private int port;
// Getters and Setters
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
}
class CanalPoolProperties {
private int maxActive;
private int maxIdle;
private int minIdle;
private int timeout;
// Getters and Setters
public int getMaxActive() { return maxActive; }
public void setMaxActive(int maxActive) { this.maxActive = maxActive; }
public int getMaxIdle() { return maxIdle; }
public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; }
public int getMinIdle() { return minIdle; }
public void setMinIdle(int minIdle) { this.minIdle = minIdle; }
public int getTimeout() { return timeout; }
public void setTimeout(int timeout) { this.timeout = timeout; }
}