springboot 使用 canal 监听 mysql binlog 实现数据同步
canal简介:
Canal 是一个开源的数据库改变数据捕获(CDC)解决方案,它可以用于捕获关系型数据库的变更事件并将其传输到目标位置。它常用于实时数据复制、数据集成、数据仓库加载等场景。
Canal 支持主流的关系型数据库,如 MySQL、Oracle、PostgreSQL 等,并通过解析数据库的日志实现数据捕获。它提供了简单易用的客户端接口、可靠的数据传输和高效的数据解析,使得用户可以方便地实时获取数据库的变更事件。
工作原理:
Canal Server:作为 Canal 的服务端组件,负责与数据库进行连接,并监听数据库的日志文件。
Canal Client:作为 Canal 的客户端组件,部署在需要捕获数据变更的机器上,通过与 Canal Server 建立连接获取数据库的变更事件。
数据解析:Canal Server 解析数据库的日志文件,将解析后的数据转换成可以理解的格式,如 JSON、Avro 等。
数据传输:Canal Server 将解析后的数据发送给对应的 Canal Client。
数据消费:Canal Client 接收数据并进行相应的处理,例如写入其他存储系统、进行实时计算、触发相应的业务逻辑等。
特点:
高性能:Canal 使用了高效的日志解析算法和异步网络传输,具有较低的延迟和较高的吞吐量。
多数据源支持:Canal 支持多种关系型数据库,并能够同时捕获多个数据源的变更事件。
灵活的部署模式:Canal 支持集中式部署和分布式部署两种模式,可以根据实际需求选择合适的部署方式。
数据过滤:Canal 支持通过配置过滤规则,选择性地捕获指定的表或字段的变更事件。
基于 Kafka 的消息传输:Canal 支持将解析后的数据使用 Kafka 进行消息传输,提供了更大的灵活性和扩展性。
canal安装和配置
这里使用的是canal.deployer-1.1.7.tar.gz,根据需求自行下载。github canal 官方下载

canal.properties 配置
\canal.deployer-1.1.7\conf\canal.properties 配置可以不用动
这里的 example 是官方的一个示例,可以自己复制添加(注意改动配置文件)
\canal.deployer-1.1.7\conf\example\instance.properties 配置如下:
instance.properties 配置
instance.properties 基本上只用改动 dbUsername 和 dbPassword就可以了,其他的根据自身需求改动(由于这里使用的是1.1.17,所以 canal.instance.mysql.slaveId 也没有指定)
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
启动
启动 \logs\example\example.log 出现以下日志基本就是成功了,正常都不会有啥问题的
2023-10-13 15:48:09.649 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2023-10-13 15:48:10.327 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-10-13 15:48:10.328 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-10-13 15:48:10.332 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
mysql 需要配置的地方
my.ini
需要改动 binlog-format 为 row 格式,其他的 statement 格式解析不了
[mysqld]
binlog-format="row"
一些排查问题命令
# 查看 bin_log 是否开启,ON 开启
show variables like 'log_bin';
# 查看 binlog 日志
mysqlbinlog DEMO-bin.000001 --base64-output=decode-rows -v
# 查看 master 节点 binlog 文件
show master status
# 重置
RESET MASTER;
springboot 相关配置
pom.xml 依赖
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.7</version>
</dependency>
application.yml
canal:
db-user: root
db-password: root
host: 127.0.0.1
port: 11111
CanalProperties
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@Data
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {
private String dbUser;
private String dbPassword;
private String host;
private Integer port = 11111;
private String destination;
}
Demo 连接和使用示例
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.stream.Collectors;
@Component
@Slf4j
public class Demo implements CommandLineRunner {
@Resource
CanalProperties canalProperties;
@Override
public void run(String... args) throws Exception {
Thread canalThread = new Thread(() -> {
log.info("连接canal并开始同步: test.user");
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(
canalProperties.getHost(),
canalProperties.getPort()
),
"example",
canalProperties.getDbUser(),
canalProperties.getDbPassword()
);
try {
connector.connect();
// 这里订阅的参数会覆盖掉 canal 服务 example 中的 instance.properties 的 canal.instance.filter.regex 配置
// 这里要注意官方的文档对于这项配置的描述貌似有一些歧义,很容易配错
connector.subscribe("test.user");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId != -1 && size > 0) {
try {
for (CanalEntry.Entry entry : message.getEntries()) {
this.handle(entry);
}
connector.ack(batchId);
} catch (Exception e) {
connector.rollback(batchId);
// 处理异常
log.error("异常,回滚...", e);
}
}
}
} catch (Exception e) {
log.error("连接canal出现异常", e);
} finally {
connector.disconnect();
}
});
canalThread.setName("canal-thread-0");
canalThread.start();
}
public void handle(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
String tableName = entry.getHeader().getTableName();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
CanalEntry.EventType eventType = rowChange.getEventType();
switch (eventType) {
case INSERT -> {
Map<String, Object> afterParam = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
// 处理业务
log.info("新增: {}", afterParam);
}
case DELETE -> {
Map<String, Object> beforeParam = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
// 处理业务
log.info("删除: {}", beforeParam);
}
case UPDATE -> {
Map<String, Object> beforeParam = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
Map<String, Object> afterParam = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
// 处理业务
log.info("更新,更新前: {},更新后: {}", beforeParam, afterParam);
}
default -> {
log.info("未匹配的类型,忽略: {}", eventType.name());
}
}
}
}
}
}
其他
我这里是原生的使用方法,其他可以参考 canal-spring-boot-starter 相关实现(好像不是官方的,github 上显示最后更新于两年前)
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>