Canal集成指南

1. 引言与Canal基础概述

本文详细介绍Canal在Spring Boot环境下与MySQL数据同步的完整集成流程,包括环境准备、Canal部署配置以及Spring Boot客户端开发。

1.1 Canal简介与核心原理

Canal是阿里巴巴开源的一款基于MySQL数据库增量日志解析的高性能数据同步系统,其英文含义为"管道"或"沟渠",形象地表达了其在数据传输中的作用。

核心工作原理

  1. Canal模拟MySQL slave的身份向master发送dump请求
  2. MySQL master收到请求后开始推送binary log给Canal
  3. Canal解析接收到的二进制日志对象,转换为结构化的数据变更事件

支持的MySQL版本

5.1.x 5.5.x 5.6.x 5.7.x 8.0.x

典型应用场景

数据库镜像
数据库实时备份
索引构建和实时维护
业务缓存刷新
增量数据处理

1.2 Canal架构组件与工作机制

Canal的整体架构采用Server-Instance模式设计,主要由Canal Server和Canal Instance两个核心组件构成。

Canal Server

代表一个Canal运行实例,对应一个JVM进程。负责管理和运行整个Canal服务,处理数据源的接入、数据解析、过滤和存储等任务。支持集群化部署,通过Zookeeper管理集群状态。

Canal Instance

对应一个数据队列,负责从特定数据源读取变更数据并推送到下游消费者。每个Instance独立运行,管理特定的数据库连接和数据流。

Canal Instance核心组件

组件名称 主要功能
EventParser 数据源接入,模拟主从协议交互,解析变更数据
EventSink 数据过滤、加工和分发,连接Parser和Store
EventStore 暂存"尚未消费"的events,默认基于内存的阻塞队列
MetaManager 管理增量订阅和消费信息,维护Instance状态

1.3 技术选型考量与优势分析

数据同步工具对比

Canal核心优势

实时性高

基于binlog的增量同步,毫秒级同步,延迟控制在1秒内,不侵入业务代码

性能卓越

1.1.x版本性能提升150%,原生支持Prometheus监控、Kafka消息投递

架构灵活

支持单机、集群、分布式多种部署模式,支持动态扩展与故障自动转移

生态完善

支持Java、C#、Go、PHP、Python等多种编程语言客户端实现

使用便捷

与Spring Boot集成简单,引入依赖即可快速实现数据同步功能

2. 部署环境准备与MySQL配置

本节介绍Canal部署所需的系统环境要求、MySQL配置调整以及网络安全设置。

2.1 系统环境要求与软件依赖

基础环境要求

  • 操作系统:Windows、Linux、MacOS(推荐Linux)
  • Java环境:JDK 1.8或以上版本
  • MySQL版本:5.1.x、5.5.x、5.6.x、5.7.x、8.0.x

推荐硬件配置

  • CPU:至少4核,推荐8核或以上
  • 内存:至少8GB,推荐16GB或以上
  • 磁盘:至少100GB可用空间,推荐SSD

网络环境要求

  • Canal服务器与MySQL服务器网络连接稳定,延迟建议小于1ms
  • 带宽满足数据传输需求,根据实际数据量调整
  • 建议启用NTP时间同步,确保服务器时间一致性

2.2 MySQL配置调整与优化

2.2.1 开启binlog日志功能

编辑MySQL配置文件(my.cnf或my.ini),在[mysqld]区块下添加以下配置:

[mysqld]
log-bin=mysql-bin  # 开启binlog功能
binlog-format=ROW  # 设置binlog格式为ROW模式
server_id=1        # 配置MySQL replication需要的server ID,不要与Canal的slaveId重复

重启MySQL服务后,使用以下命令检查binlog是否已正确开启:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'server_id';

2.2.2 设置binlog相关参数

添加以下配置以优化性能和安全性:

[mysqld]
expire_logs_days=7          # 设置binlog过期时间为7天
binlog_row_image=FULL        # 记录完整的行数据
binlog-ignore-db=mysql      # 忽略mysql系统库的binlog记录
lower_case_table_names=1     # 表名存储为小写,比较时不区分大小写
default_authentication_plugin=mysql_native_password  # 使用传统的密码认证方式

2.2.3 创建Canal专用用户并授权

为Canal创建专用的数据库用户,并授予必要的权限:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal123';  # 创建canal用户,密码为canal123
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  # 授予必要的权限
FLUSH PRIVILEGES;  # 刷新权限
注意事项

如果MySQL启用了密码策略,密码设置不能过于简单,否则会出现密码策略错误。此时需要设置更复杂的密码或调整MySQL的密码策略配置。

2.3 网络环境与安全配置

端口开放要求

服务/功能 默认端口 用途说明
MySQL数据库 3306 Canal服务器访问MySQL的端口
Canal Server 11111 Canal Server提供socket服务的端口
Canal Admin 11110 Canal管理控制台端口(如果启用)
Canal metrics 11112 Prometheus监控指标拉取端口

Linux防火墙配置示例

# 使用firewalld开放端口
firewall-cmd --zone=public --add-port=11111/tcp --permanent
firewall-cmd --zone=public --add-port=11110/tcp --permanent
firewall-cmd --zone=public --add-port=11112/tcp --permanent
firewall-cmd --reload

# 查看已开放的端口
firewall-cmd --list-ports

安全加固建议

  • 将Canal部署在独立的服务器上,与数据库服务器分离
  • 使用专用的数据库用户,避免使用root等超级用户
  • 限制Canal服务器的网络访问权限,只允许必要的连接
  • 定期审查数据库访问日志,监控异常访问行为
  • 启用NTP时间同步,确保服务器时间一致性

3. Canal Server部署与配置

本节详细介绍Canal Server的下载安装、配置文件详解、多instance配置以及集群部署方案。

3.1 下载安装与目录结构

3.1.1 版本选择与下载

Canal的最新稳定版本是1.1.8(发布于2025年1月16日),推荐从官方GitHub发布页面下载:

使用wget命令下载最新版本(以Linux为例):

wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz

3.1.2 解压安装与目录说明

Linux系统安装步骤
# 创建安装目录
mkdir /opt/canal
cd /opt/canal

# 解压安装包
tar zxvf /path/to/canal.deployer-1.1.8.tar.gz

# 进入Canal目录
cd canal.deployer-1.1.8
Windows系统安装步骤
  1. 创建安装目录(例如D:\canal)
  2. 解压下载的ZIP文件到该目录
  3. 通过命令提示符或PowerShell进入安装目录
Canal目录结构
canal.deployer-1.1.8/
├── bin/                  # 启动脚本目录
├── conf/                 # 配置文件目录
│   ├── canal.properties   # 全局配置文件
│   ├── example/          # 示例instance配置目录
│   │   └── instance.properties
│   └── logback.xml       # 日志配置文件
├── lib/                  # 依赖库目录
├── logs/                 # 日志文件目录
└── plugin/               # 插件目录

3.2 核心配置文件详解

3.2.1 canal.properties全局配置

canal.properties是Canal的全局配置文件,包含了Canal Server的基本配置信息。以下是主要配置参数的说明:

# Canal基本配置
canal.id=1                   # 每个Canal Server实例的唯一标识
canal.ip=                   # Canal Server绑定的本地IP地址,如果不配置,默认选择一个本机IP
canal.port=11111            # Canal Server提供socket服务的端口
canal.metrics.pull.port=11112 # 监控指标拉取端口,用于Prometheus监控
canal.zkServers=            # Canal Server连接Zookeeper集群的地址(集群模式需要)

# Canal运行模式配置
canal.serverMode=tcp         # 可选值:tcp, kafka, RocketMQ
canal.withoutNetty=false     # 是否禁用Netty,一般不需要修改

# Canal Admin配置(如果启用Admin功能)
canal.admin.port=11110       # Canal Admin端口
canal.admin.user=admin       # 管理员用户名
canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441  # 管理员密码(SHA-1加密)

# 实例配置
canal.destinations=example   # 指定实例名,多个实例用逗号分隔
canal.instance.global.spring.xml=classpath:spring/default-instance.xml

# 内存配置
canal.instance.memory.buffer.size=16384    # 内存缓冲区大小,必须是2的幂
canal.instance.memory.buffer.memunit=1024  # 内存单元大小,默认1KB
canal.instance.memory.batch.mode=MEMSIZE   # 内存批次模式:MEMSIZE或ITEMSIZE
重要提示

canal.destinations参数指定了要启动的instance名称,多个instance用逗号分隔。例如,如果要监控两个数据库,可以配置为canal.destinations=db1,db2,然后在conf目录下创建db1和db2两个目录,并分别配置各自的instance.properties文件。

3.2.2 instance.properties实例配置

instance.properties是每个instance的具体配置文件,包含了连接MySQL数据库和数据解析的相关配置:

# 基本配置
canal.instance.mysql.slaveId=1234            # 必须配置,不能与MySQL的server_id重复
canal.instance.master.address=127.0.0.1:3306  # MySQL主库地址和端口
canal.instance.dbUsername=canal              # 数据库用户名
canal.instance.dbPassword=canal              # 数据库密码
canal.instance.defaultDatabaseName=          # 默认数据库名
canal.instance.connectionCharset=UTF-8       # 连接字符集

# 过滤配置
canal.instance.filter.regex=.*\\..*          # 表过滤正则表达式
canal.instance.filter.black.regex=           # 黑名单过滤正则表达式

# 解析配置
canal.instance.parser.parallel=false         # 是否开启并行解析
canal.instance.parser.parallelThreadSize=16  # 并行解析线程数(仅当parallel=true时有效)
canal.instance.parser.direct=false           # 是否直接解析(不建议修改)
表过滤规则说明
过滤规则 说明
.*\\..* 匹配所有库的所有表
test\\..* 匹配test库的所有表
test\\.user 匹配test库的user表
test\\..*,demo\\..* 匹配test库和demo库的所有表
test\\.user,test\\.order 匹配test库的user表和order表
注意事项

如果在配置文件中配置了过滤规则,就不应该在代码中调用CanalConnector.subscribe()方法,否则配置文件中的过滤配置会被覆盖。

3.3 多instance配置与集群部署

3.3.1 配置多个数据源

如果需要监控多个MySQL数据库或同一个数据库的不同部分,可以通过配置多个instance来实现:

  1. 修改全局配置文件

    编辑canal.properties,在canal.destinations参数中指定多个instance名称,用逗号分隔:

    canal.destinations=db1,db2,db3
  2. 创建instance目录

    在conf目录下创建与instance名称对应的目录,例如db1、db2、db3。

  3. 配置instance文件

    在每个instance目录下创建instance.properties文件,并配置相应的数据库连接信息和过滤规则。

db1实例配置(监控test数据库的user表):

canal.instance.mysql.slaveId=1001
canal.instance.master.address=192.168.1.100:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test
canal.instance.filter.regex=test\\.user

db2实例配置(监控demo数据库的所有表):

canal.instance.mysql.slaveId=1002
canal.instance.master.address=192.168.1.101:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=demo
canal.instance.filter.regex=demo\\..*

3.3.2 基于Zookeeper的高可用集群

Canal支持基于Zookeeper的高可用集群部署,通过Zookeeper实现主备切换和负载均衡。

集群部署架构
Canal A (Active)
Canal B (Standby)
Zookeeper Cluster (3 nodes)
集群部署关键步骤
  1. 部署Zookeeper集群

    • 部署至少3个Zookeeper节点
    • 确保Zookeeper版本在3.4.6以上
    • 配置Zookeeper集群间的通信
  2. 配置Canal Server

    在每个Canal Server的canal.properties中添加Zookeeper配置:

    canal.zkServers=zk1:2181,zk2:2181,zk3:2181
    canal.serverMode=kafka  # 建议使用消息队列模式
  3. 配置instance

    在instance配置文件中添加集群相关配置:

    canal.instance.global.spring.xml=classpath:spring/zk-instance.xml
  4. 启动集群

    • 依次启动所有Canal Server节点
    • 通过Zookeeper选举机制,只有一个节点会成为Active状态
    • 其他节点处于Standby状态,等待Active节点故障时接管

3.3.3 负载均衡与故障转移

负载均衡策略
轮询策略
canal.instance.loadbalance.strategy=round_robin

均匀分配instance到各个节点,适合数据量分布均匀的场景

权重策略
canal.instance.loadbalance.strategy=weighted
canal.instance.loadbalance.weight=10

根据权重分配instance,可根据节点性能设置不同权重

一致性哈希策略
canal.instance.loadbalance.strategy=consistent_hashing

根据表名哈希分配到固定节点,适合相同表数据在同一节点处理

故障转移机制
  1. 当Active节点故障时,Zookeeper会检测到节点消失
  2. Zookeeper触发选举机制,从Standby节点中选出新的Active节点
  3. 新的Active节点接管故障节点的所有instance
  4. 客户端会自动重连到新的Active节点
  5. 整个故障转移过程通常在几秒内完成

3.4 启动运行与状态检查

3.4.1 启动命令与参数

Linux系统命令
# 启动Canal服务
sh bin/startup.sh

# 停止Canal服务
sh bin/stop.sh

# 重启Canal服务
sh bin/restart.sh

# 查看服务状态
sh bin/status.sh
Windows系统命令
# 启动Canal服务
bin\startup.bat

# 停止Canal服务
bin\stop.bat

# 查看服务状态
bin\status.bat

3.4.2 日志文件与监控

Canal的日志文件存放在logs目录下,主要包括:

日志文件 用途说明
canal.log Canal Server的主要日志文件
example.log example实例的日志文件(根据instance名称命名)
canal_gc.log JVM垃圾回收日志
canal_stderr.log 标准错误输出日志

查看实时日志:

tail -f logs/canal.log
tail -f logs/example.log
内存配置调整

如果启动过程中出现内存不足的错误,可以通过修改bin/startup.sh文件中的JVM参数来调整内存配置:

JAVA_OPTS="-server -Xms512m -Xmx512m -Xmn256m -XX:SurvivorRatio=2"

3.4.3 常见启动问题处理

端口占用问题

如果启动时报错端口已被占用,可以通过以下命令查看占用端口的进程:

lsof -i :11111

然后杀死对应的进程或修改Canal的端口配置。

连接数据库失败
  • 检查MySQL服务是否正常运行
  • 检查instance.properties中的数据库连接信息是否正确
  • 检查网络连接是否正常
  • 检查MySQL用户权限是否正确
binlog解析错误
  • 确保MySQL的binlog格式为ROW模式
  • 检查MySQL的server_id与Canal的slaveId是否重复
  • 检查binlog是否已开启

4. Spring Boot集成开发指南

本节详细介绍如何在Spring Boot项目中集成Canal客户端,实现MySQL数据同步功能。

4.1 项目依赖配置

4.1.1 Maven依赖配置

Spring Boot 2.x版本依赖
<dependencies>
    <!-- Spring Boot Web Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Canal客户端依赖 -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.7</version>
        <exclusions>
            <!-- 排除冲突的日志依赖 -->
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <!-- Canal协议依赖(可选) -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.7</version>
    </dependency>
</dependencies>
Spring Boot 3.x版本依赖
<dependencies>
    <!-- Spring Boot Web Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Canal客户端依赖 -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.7</version>
        <exclusions>
            <!-- 排除冲突的日志依赖 -->
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    
    <!-- Spring Boot 3.x额外依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
</dependencies>
依赖冲突注意事项

Canal客户端的日志依赖可能与Spring Boot的默认日志框架产生冲突,因此需要通过<exclusions>标签排除冲突的日志依赖,避免启动时出现日志相关的异常。

4.1.2 版本兼容性说明

组件 版本要求 备注
Spring Boot 2.x Canal 1.1.7+ 推荐2.3.x以上版本
Spring Boot 3.x Canal 1.1.7+ 需要额外添加log4j2依赖
Canal 最低1.1.4,推荐1.1.7+ 1.1.4+支持Admin动态运维
JDK 2.x需JDK 8+,3.x需JDK 17+ 遵循Spring Boot官方JDK要求

4.2 Canal客户端配置与连接

4.2.1 配置文件设置

在Spring Boot项目中,推荐使用application.yml文件配置Canal连接信息:

canal:
  # Canal Server连接配置
  server:
    host: 127.0.0.1      # Canal Server主机地址
    port: 11111          # Canal Server端口
    destination: example  # instance名称
    
  # 认证信息(如果启用了认证)
  username:             # 用户名(默认为空)
  password:             # 密码(默认为空)
  
  # 连接池配置
  pool:
    max-active: 10       # 最大活跃连接数
    max-idle: 5          # 最大空闲连接数
    min-idle: 1          # 最小空闲连接数
    timeout: 30000       # 连接超时时间(毫秒)
    
  # 订阅配置
  subscribe: ".*\\..*"   # 订阅所有表的变更事件
  filter-regex:          # 表过滤正则表达式(可选)
  black-regex:           # 黑名单正则表达式(可选)

集群模式配置(多个Canal Server):

canal:
  server:
    cluster:
      - host: 192.168.1.100
        port: 11111
      - host: 192.168.1.101
        port: 11111
      - host: 192.168.1.102
        port: 11111
    destination: example
  # 其他配置同上...

4.2.2 连接工厂与配置类

创建Canal连接工厂和配置类,用于管理Canal客户端连接:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.List;

@Configuration
@EnableConfigurationProperties(CanalProperties.class)
public class CanalConfig {

    @Autowired
    private CanalProperties canalProperties;

    @Bean
    @ConditionalOnMissingBean
    public CanalConnector canalConnector() {
        CanalConnector connector;
        
        // 判断是否为集群配置
        if (canalProperties.getServer().getCluster() != null && 
            !canalProperties.getServer().getCluster().isEmpty()) {
            List<InetSocketAddress> addresses = canalProperties.getServer().getCluster().stream()
                    .map(server -> new InetSocketAddress(server.getHost(), server.getPort()))
                    .toList();
            connector = CanalConnectors.newClusterConnector(addresses, 
                    canalProperties.getServer().getDestination(),
                    canalProperties.getUsername(),
                    canalProperties.getPassword());
        } else {
            connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(canalProperties.getServer().getHost(), 
                                         canalProperties.getServer().getPort()),
                    canalProperties.getServer().getDestination(),
                    canalProperties.getUsername(),
                    canalProperties.getPassword());
        }
        
        // 配置连接池参数
        connector.setConnectTimeout(canalProperties.getPool().getTimeout());
        connector.setSoTimeout(canalProperties.getPool().getTimeout());
        
        return connector;
    }
}

创建CanalProperties配置类,用于读取配置文件:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
    private String username;
    private String password;
    private CanalServerProperties server;
    private CanalPoolProperties pool;
    private String subscribe;
    private String filterRegex;
    private String blackRegex;

    // Getters and Setters
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public CanalServerProperties getServer() { return server; }
    public void setServer(CanalServerProperties server) { this.server = server; }
    public CanalPoolProperties getPool() { return pool; }
    public void setPool(CanalPoolProperties pool) { this.pool = pool; }
    public String getSubscribe() { return subscribe; }
    public void setSubscribe(String subscribe) { this.subscribe = subscribe; }
    public String getFilterRegex() { return filterRegex; }
    public void setFilterRegex(String filterRegex) { this.filterRegex = filterRegex; }
    public String getBlackRegex() { return blackRegex; }
    public void setBlackRegex(String blackRegex) { this.blackRegex = blackRegex; }
}

class CanalServerProperties {
    private String host;
    private int port;
    private String destination;
    private List<CanalClusterServerProperties> cluster;

    // Getters and Setters
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
    public String getDestination() { return destination; }
    public void setDestination(String destination) { this.destination = destination; }
    public List<CanalClusterServerProperties> getCluster() { return cluster; }
    public void setCluster(List<CanalClusterServerProperties> cluster) { this.cluster = cluster; }
}

class CanalClusterServerProperties {
    private String host;
    private int port;

    // Getters and Setters
    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public int getPort() { return port; }
    public void setPort(int port) { this.port = port; }
}

class CanalPoolProperties {
    private int maxActive;
    private int maxIdle;
    private int minIdle;
    private int timeout;

    // Getters and Setters
    public int getMaxActive() { return maxActive; }
    public void setMaxActive(int maxActive) { this.maxActive = maxActive; }
    public int getMaxIdle() { return maxIdle; }
    public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; }
    public int getMinIdle() { return minIdle; }
    public void setMinIdle(int minIdle) { this.minIdle = minIdle; }
    public int getTimeout() { return timeout; }
    public void setTimeout(int timeout) { this.timeout = timeout; }
}