zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

SpringBoot系列之集成阿里canal监听MySQL Binlog

mysqlSpringBoot集成阿里 系列 监听 Binlog Canal
2023-09-27 14:29:10 时间

1、什么是阿里canal?
canal是阿里开源的,对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向master发送dump 协议,获取到数据后,解析 binary log 对象数据。

 

 

2、canal环境搭建
本文基于Window系统。

使用canal需要确保数据库开启了binlog:

show variables like'log_%';

如果没开启,在mysql my.ini配置文件添加配置,注意文件内存为的时候,注意编码格式必须为ANSI,不然会编译报错

[mysqld]
# 开启 binlog
log-bin=mysql-bin 
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1

 

配置文件修改是否正确,使用命令,查看日志

mysqld --console

 

重启MySQL实例

net stop mysql
net start mysql

 

binlog开启后,创建一个canal用户并授权,官网配置是@%,表示所有服务器,因为本地测试的,所以改为localhost就可以

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' identified by 'canal';
FLUSH PRIVILEGES;

 

下载canal服务端,到官网releases下载对应资料,canal.deployer-1.1.5.tar.gz是服务端,解压后在conf文件夹里找到\example\instance.properties,修改数据库配置信息,dbUsername,dbPassword数据库账号密码

# position info(master数据库配置)
canal.instance.master.address=gz-cdb-l5ixwzm1.sql.tencentcdb.com:59039
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password(用户名密码)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal1uo#A#9R
# 加上默认数据库
canal.instance.defaultDatabaseName=tajiax_canal 
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

 

查看canal实例名称,文件在D:\dev\canal.deployer-1.1.5\conf\canal.properties:

canal.destination=example

到canal服务器安装目录D:\dev\canal.deployer-1.1.5\bin,找到startup.bat执行。

在码云有示例->\ly\canal-application

3、Canal客户端测试
JDK 1.8
SpringBoot2.2.1
Maven 3.2+
开发工具
IntelliJ IDEA
smartGit

3.1、引入依赖

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

 

3.2、配置canal
# canal实例名称,要跟canal-server运行时设置的destination一致

# D:\dev\canal.deployer-1.1.5\conf\canal.properties
canal.destination=example
# canal默认监听端口
canal.server=127.0.0.1:11111
logging.level.top.javatool.canal.client=warn

 

3.3、编写监听器,监听Canal消息

package com.lynch.entity;

import org.springframework.stereotype.Component;

import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("tb_user")
@Component
public class UserHandler implements EntryHandler<UserEntity> {

//    @Autowired
//    private RedisHandler redisHandler;
//    @Autowired
//    private Cache<Long, Item> itemCache;

    @Override
    public void insert(UserEntity item) {
        System.out.println("insert," + item);
        // 写数据到JVM进程缓存
        //itemCache.put(item.getId(), item);
        // 写数据到redis
        //redisHandler.saveItem(item);
    }

    @Override
    public void update(UserEntity before, UserEntity after) {
        System.out.println("update before," + before);
        System.out.println("update after," + after);
        // 写数据到JVM进程缓存
        //itemCache.put(after.getId(), after);
        // 写数据到redis
        //redisHandler.saveItem(after);
    }

    @Override
    public void delete(UserEntity item) {
        System.out.println("delete," + item);
        // 删除数据到JVM进程缓存
        //itemCache.invalidate(item.getId());
        // 删除数据到redis
        //redisHandler.deleteItemById(item.getId());
    }
}

当表tb_user进行insert、update、delete操作时,会发现监控已生效。

注意:更改canal实例名称为canaltest
1、在D:\dev\canal.deployer-1.1.5\conf目录下,把example文件夹改成canaltest
2、在D:\dev\canal.deployer-1.1.5\conf\canal.properties文件中,找到canal.destination配置项改为canaltest
3、修改canal配置
canal.destination=canaltest