springboot 使用 canal 监听 mysql binlog 实现数据同步

望舒的头像
望舒 最后修改于
标签:
canal数据同步mysqlbinlogspringboot

canal简介:

Canal 是一个开源的数据库改变数据捕获(CDC)解决方案,它可以用于捕获关系型数据库的变更事件并将其传输到目标位置。它常用于实时数据复制、数据集成、数据仓库加载等场景。

Canal 支持主流的关系型数据库,如 MySQL、Oracle、PostgreSQL 等,并通过解析数据库的日志实现数据捕获。它提供了简单易用的客户端接口、可靠的数据传输和高效的数据解析,使得用户可以方便地实时获取数据库的变更事件。

工作原理:

Canal Server:作为 Canal 的服务端组件,负责与数据库进行连接,并监听数据库的日志文件。

Canal Client:作为 Canal 的客户端组件,部署在需要捕获数据变更的机器上,通过与 Canal Server 建立连接获取数据库的变更事件。

数据解析:Canal Server 解析数据库的日志文件,将解析后的数据转换成可以理解的格式,如 JSON、Avro 等。

数据传输:Canal Server 将解析后的数据发送给对应的 Canal Client。

数据消费:Canal Client 接收数据并进行相应的处理,例如写入其他存储系统、进行实时计算、触发相应的业务逻辑等。

特点:

高性能:Canal 使用了高效的日志解析算法和异步网络传输,具有较低的延迟和较高的吞吐量。

多数据源支持:Canal 支持多种关系型数据库,并能够同时捕获多个数据源的变更事件。

灵活的部署模式:Canal 支持集中式部署和分布式部署两种模式,可以根据实际需求选择合适的部署方式。

数据过滤:Canal 支持通过配置过滤规则,选择性地捕获指定的表或字段的变更事件。

基于 Kafka 的消息传输:Canal 支持将解析后的数据使用 Kafka 进行消息传输,提供了更大的灵活性和扩展性。

canal安装和配置

这里使用的是canal.deployer-1.1.7.tar.gz,根据需求自行下载。github canal 官方下载

alt
Preview

canal.properties 配置

\canal.deployer-1.1.7\conf\canal.properties 配置可以不用动

这里的 example 是官方的一个示例,可以自己复制添加(注意改动配置文件)

\canal.deployer-1.1.7\conf\example\instance.properties 配置如下:

instance.properties 配置

instance.properties 基本上只用改动 dbUsername 和 dbPassword就可以了,其他的根据自身需求改动(由于这里使用的是1.1.17,所以 canal.instance.mysql.slaveId 也没有指定)

复制
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################

启动

启动 \logs\example\example.log 出现以下日志基本就是成功了,正常都不会有啥问题的

复制
2023-10-13 15:48:09.649 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-10-13 15:48:10.327 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2023-10-13 15:48:10.328 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2023-10-13 15:48:10.332 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

mysql 需要配置的地方

my.ini

需要改动 binlog-format 为 row 格式,其他的 statement 格式解析不了

复制
[mysqld]
binlog-format="row"

一些排查问题命令

复制
# 查看 bin_log 是否开启,ON 开启
show variables like 'log_bin';
# 查看 binlog 日志
mysqlbinlog DEMO-bin.000001 --base64-output=decode-rows -v
# 查看 master 节点 binlog 文件
show master status
# 重置
RESET MASTER;

springboot 相关配置

pom.xml 依赖

复制

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.7</version>
</dependency>

application.yml

复制
canal:
  db-user: root
  db-password: root
  host: 127.0.0.1
  port: 11111

CanalProperties

复制
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@Data
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {

    private String dbUser;
    private String dbPassword;
    private String host;
    private Integer port = 11111;
    private String destination;

}

Demo 连接和使用示例

复制
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.stream.Collectors;

@Component
@Slf4j
public class Demo implements CommandLineRunner {

    @Resource
    CanalProperties canalProperties;

    @Override
    public void run(String... args) throws Exception {
        Thread canalThread = new Thread(() -> {
            log.info("连接canal并开始同步: test.user");
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(
                            canalProperties.getHost(),
                            canalProperties.getPort()
                    ),
                    "example",
                    canalProperties.getDbUser(),
                    canalProperties.getDbPassword()
            );
            try {
                connector.connect();
//                这里订阅的参数会覆盖掉 canal 服务 example 中的 instance.properties 的 canal.instance.filter.regex 配置
//                这里要注意官方的文档对于这项配置的描述貌似有一些歧义,很容易配错
                connector.subscribe("test.user");
                connector.rollback();
                while (true) {
                    Message message = connector.getWithoutAck(100);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId != -1 && size > 0) {
                        try {
                            for (CanalEntry.Entry entry : message.getEntries()) {
                                this.handle(entry);
                            }
                            connector.ack(batchId);
                        } catch (Exception e) {
                            connector.rollback(batchId);
//                处理异常
                            log.error("异常,回滚...", e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("连接canal出现异常", e);
            } finally {
                connector.disconnect();
            }
        });
        canalThread.setName("canal-thread-0");
        canalThread.start();
    }

    public void handle(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
        if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
            String tableName = entry.getHeader().getTableName();
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT -> {
                        Map<String, Object> afterParam = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
//                处理业务
                        log.info("新增: {}", afterParam);
                    }
                    case DELETE -> {
                        Map<String, Object> beforeParam = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
//                处理业务
                        log.info("删除: {}", beforeParam);
                    }
                    case UPDATE -> {
                        Map<String, Object> beforeParam = rowData.getBeforeColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
                        Map<String, Object> afterParam = rowData.getAfterColumnsList().stream().collect(Collectors.toMap(CanalEntry.Column::getName, CanalEntry.Column::getValue));
//                处理业务
                        log.info("更新,更新前: {},更新后: {}", beforeParam, afterParam);
                    }
                    default -> {
                        log.info("未匹配的类型,忽略: {}", eventType.name());
                    }
                }
            }
        }
    }

}

其他

我这里是原生的使用方法,其他可以参考 canal-spring-boot-starter 相关实现(好像不是官方的,github 上显示最后更新于两年前)

复制
<dependency>
     <groupId>top.javatool</groupId>
     <artifactId>canal-spring-boot-starter</artifactId>
     <version>1.2.1-RELEASE</version>
</dependency>
‌‌‌‌‌​‌‌‌‌​‌‌‌‌‌‌​​‌​‌‌‌‌​‌​​‌​‌‌‌‌​‌​​‌‌‌‌​‌​‌‌​‌​​​‌‌​‌​​‌​‌​‌‌​‌‌‌​‌‌​‌‌‌‌‌‌‌‌​‌‌‌‌​‌​‌‌​‌‌‌‌‌​‌‌‌‌​‌​‌‌​‌‌‌‌‌​‌‌‌‌​‌​‌‌‌‌‌‌‌‌​‌‌‌‌​‌​‌​‌‌​‌‌‌‌‌‌‌‌‌‌​‌​‌‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​​‌​‌​‌‌‌‌‌‌​‌‌‌​‌‌​‌‌​‌‌‌‌‌​‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​​‌​‌​​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌​‌‌‌‌‌‌​‌‌‌‌‌‌‌‌​‌‌‌​‌‌​‌​​‌‌‌‌‌‌‌‌‌​‌‌​‌‌​‌​‌‌‌‌‌‌‌​‌‌​‌‌​‌​‌‌‌​‌‌‌‌​‌​‌​‌‌‌‌‌‌​‌‌‌‌​‌​‌‌‌‌‌‌‌‌​‌‌‌‌​‌​‌​​‌‌‌‌‌‌‌‌‌‌​‌​‌​‌‌​‌‌‌‌‌‌‌‌​‌​‌​‌‌​‌‌‌‌‌‌‌‌​‌​‌‌‌‌​‌‌‌​‌‌‌​​‌​‌‌​‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌​‌‌‌​​‌​‌​‌‌​‌‌‌​‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​‌‌​‌‌​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌‌​‌​‌‌​‌‌​‌​​​‌‌‌​​‌‌​‌‌‌​‌‌‌‌‌‌‌‌‌‌​‌‌‌‌​‌‌‌‌‌​​‌​​‌‌‌​‌​​‌‌‌​‌‌​‌‌​​​​‌‌​​​​​​‌​​​‌‌​‌​​‌​‌​‌‌​‌‌‌​‌‌​‌‌‌‌‌‌‌‌​‌‌‌‌​‌​‌‌​‌‌‌‌‌​‌‌‌‌​‌​‌‌​‌‌‌‌‌​‌‌‌‌​‌​‌‌‌‌‌‌‌‌​‌‌‌‌​‌​‌​‌‌​‌‌‌‌‌‌‌‌‌‌​‌​‌‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​​‌​‌​‌‌‌‌‌‌​‌‌‌​‌‌​‌‌​‌‌‌‌‌​‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​​‌​‌​​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌​‌‌‌‌‌‌​‌‌‌‌‌‌‌‌​‌‌‌​‌‌​‌​​‌‌‌‌‌‌‌‌‌​‌‌​‌‌​‌​‌‌‌‌‌‌‌​‌‌​‌‌​‌​‌‌‌​‌‌‌‌​‌​‌​‌‌‌‌‌‌​‌‌‌‌​‌​‌‌‌‌‌‌‌‌​‌‌‌‌​‌​‌​​‌‌‌‌‌‌‌‌‌‌​‌​‌​‌‌​‌‌‌‌‌‌‌‌​‌​‌​‌‌​‌‌‌‌‌‌‌‌​‌​‌‌‌‌​‌‌‌​‌‌‌​​‌​‌‌​‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌​‌‌‌​​‌​‌​‌‌​‌‌‌​‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​‌‌​‌‌​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌‌​‌​‌‌​‌‌​‌​​​‌‌‌​​‌‌​‌‌‌​‌‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​​‌​‌​‌‌‌‌‌‌​‌‌‌​‌‌​‌‌​‌‌‌‌‌​‌‌‌​‌‌​‌​​‌​‌‌‌​‌‌‌​​‌​‌​​‌​‌‌‌‌‌‌‌​‌‌​‌​​‌​‌‌‌‌‌‌‌‌​‌​‌​‌‌‌‌‌‌‌‌‌‌‌​‌​‌‌​‌‌‌‌‌‌‌‌‌​‌‌‌‌​‌‌‌‌‌‌​​​​​‌‌​​‌​​‌‌‌‌​​​​‌‌‌​​‌​​​‌‌‌​‌‌​​‌​​​‌‌‌‌​​‌‌‌‌‌​‌​‌‌​​​​​‌‌​​‌​‌‌‌‌​‌​​​​​​​‌‌​‌‌​‌‌‌‌​​​‌‌‌‌‌​​​​‌‌​​​​​​​​​‌‌​​​​​‌‌‌‌​​​​‌‌‌‌‌‌​‌​‌​‌​​​‌‌‌‌​​‌‌​‌​​‌​‌​‌​​​​‌​‌​‌​​‌​‌​‌‌‌​​‌‌‌​​‌‌‌​‌​​‌​‌​‌‌‌‌‌​​​‌​‌​​‌​‌‌‌‌​‌​​‌​‌‌‌‌​‌​​‌‌‌‌​‌​‌‌​‌​‌‌‌‌‌‌‌​​​​​​​‌​‌‌‌‌‌‌‌​‌‌​​‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌‌‌‌‌​‌‌​‌​​‌‌‌‌‌‌‌‌‌​‌‌‌‌​‌‌‌‌‌
1
1
251
No data