zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

微服务轮子项目(36) -Canal数据库日志解析消费

数据库项目日志服务 解析 消费 36 轮子
2023-09-11 14:15:40 时间

1. Canal概述

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

目前内部版本已经支持mysql和oracle部分版本的日志解析
当前的canal开源版本支持5.7及以下的版本(阿里内部mysql
5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

  • 数据库镜像
  • 数据库实时备份
  • 多级索引 (卖家和买家各自分库索引)
  • search build
  • 业务cache刷新
  • 价格变化等重要业务消息

1.1 工作原理

在这里插入图片描述

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

1.2 架构

在这里插入图片描述

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

1.3 HA机制设计

canal的HA分为两部分,canal servercanal client分别有对应的HA实现

  • canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。

Canal Server:
在这里插入图片描述
大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制。

1.4 相关资料

2. 安装部署

2.1 创建数据库用户canal

目标数据库先要创建好canal用的用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.2 远程拉取

  1. 访问docker hub获取最新的版本 访问:https://hub.docker.com/r/canal/canal-server/tags/

  2. 下载对应的版本,比如最新版为1.1.3

docker pull canal/canal-server:v1.1.3

2.3 启动Docker

docker目录下自带了一个run.sh脚本

下载脚本:

wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 

创建启动脚本:

vim start.sh
docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
              -e canal.destinations=zlt-test \
              -e canal.instance.master.address=192.168.28.130:3306  \
              -e canal.instance.dbUsername=canal  \
              -e canal.instance.dbPassword=canal  \
              -e canal.instance.connectionCharset=UTF-8 \
              -e canal.instance.tsdb.enable=true \
              -e canal.instance.gtidon=false  \

destinations:目标数据库名
instance.master.address:目标数据库地址
instance.dbUsername:目标数据库用户名
instance.dbPassword:目标数据库密码
具体其他配置可参考AdminGuide

docker模式下,单docker实例只能运行一个instance,主要为配置问题。如果需要运行多instance时,可以自行制作一份docker镜像即可

2.4 运行

sh start.sh

运行效果:看到successful之后,就代表canal-server启动成功,可以启动canal-client连接上来进行binlog订阅了
在这里插入图片描述

2.5 MQ消息投递

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:RocketMQ和Kafka

将上面的启动脚本改为以下,增加MQ相关参数:

docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
                  -e canal.destinations=zlt-test \
                  -e canal.instance.master.address=192.168.28.130:3306  \
                  -e canal.instance.dbUsername=canal  \
                  -e canal.instance.dbPassword=canal  \
                  -e canal.instance.connectionCharset=UTF-8 \
                  -e canal.instance.tsdb.enable=true \
                  -e canal.instance.gtidon=false  \
                  -e canal.mq.topic=canal-test \
                  -e canal.serverMode=RocketMQ \
                  -e canal.mq.servers=192.168.28.130:9876 \

canal.mq.topic:配置mq的topic
canal.serverMode:tcp(默认), kafka, RocketMQ
canal.mq.servers:mq地址

投递MQ后的消息如下图:
在这里插入图片描述

2.6 如果要订阅的是mysql的从库该怎么做?

生产环境中的主库是不能随便重启的,所以订阅的话必须订阅mysql主从的从库,而从库中是默认下只将主库的操作写进中继日志,并写到自己的二进制日志的,所以需要让其成为canal的主库,必须让其将日志也写到自己的二进制日志里面。处理方法:修改my.cnf,增加一行log_slave_updates=1,重启数据库后就可以了。
在这里插入图片描述

3. 实时同步数据到ElasticSearch

如今大型的IT系统中,都会使用分布式的方式,作为使用最广泛的数据库,如何将mysql的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?下面介绍如何实现数据修改后,需要及时的将mysql中的数据更新到elasticsearch中。

3.1 数据同步方案选择

  • 代码实现(双写):针对代码中进行数据库的增删改操作时,同时进行elasticsearch的增删改操作。

  • mybatis实现:通过mybatis plugin进行实现,截取sql语句进行分析, 针对insert、update、delete的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。

  • Aop实现:不管是通过哪种Aop方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。

  • logstash:logstash类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将mysql数据同步到elasticsearch,但是logstash的原理是每分钟进行一次增量数据查询,将结果同步到elasticsearch,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。

实现方式优缺点
代码实现技术难度低,侵入性强,实时性高
基于mybatis有一定的技术难度,但是无法覆盖所有的场景
Aop实现技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景
logstash技术难度低,无侵入性,无需开发,但会造成资源浪费,实时性也不高

那么是否有什么更好的方式进行处理吗?

  • mysql binlog同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费。Canal安装部署通过数据库binlog实时抓取数据更新信息推送到消息队列MQ里,然后就可以通过消费MQ消息把数据实时同步到不同的异构数据源里了

3.2 增量同步ES

1.创建索引:在同步之前需要先创建号索引,下面是创建sys_user索引的例子

curl -X PUT "http://192.168.28.130:9200/sys_user/" -H 'Content-Type: application/json' -d'
{
    "settings" : {
    	"number_of_shards" : 1,
        "number_of_replicas" : 0
    },
    "mappings":{
	    "properties":{
                "id": {
                    "type": "long"
                },
                "username": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "nickname": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "mobile": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "sex": {
                    "type": "keyword"
                },
                "type": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "createTime": {
                    "type": "date"
                },
                "updateTime": {
                    "type": "date"
                },
                "company": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "openId": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    }
                },
                "isDel": {
                    "type": "keyword"
                }
	    }
    }
}'

地址192.168.28.130:9200为es的ip地址
sys_user为索引名

2.安装配置Adapter,下载adapter:https://github.com/alibaba/canal/releases
在这里插入图片描述
详细的配置说明请参考官方wiki:https://github.com/alibaba/canal/wiki/Sync-ES

3.同步表sys_user的配置样例:

canal.adapter-xxx\conf\application.yml:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ # kafka rocketMQ
  mqServers: 192.168.28.130:9876 #or rocketmq
  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.28.130:3306/user-center?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: canal-sys-user # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 192.168.28.130:9300
        properties:
          cluster.name: my-es

mode:消费的类型有3种选择tcp、kafka和rocketMQ
mqServers: mq的地址
defaultDS:配置源数据库的地址
instance:配置mq的topic名称
es:配置es的地址和集群名

canal.adapter-xxx\conf\es\sys_user.yml:

dataSourceKey: defaultDS
destination: canal-sys-user
groupId: g1
esMapping:
  _index: sys_user
  _type: search_data
  _id: id
  upsert: true
  sql: "select id, username, nickname, mobile
          , case when sex = 0 then '男' when sex = 1 then '女' end sex
          , case when type = 'app' then '移动用户' when type = 'BACKEND' then '后台用户' end type
          , create_time createTime, update_time updateTime, company, open_id openId
          , case when is_del = 0 then '否' when is_del = 1 then '是' end isDel
        from sys_user"
  etlCondition: "where update_time >= '{0}'"
  commitBatch: 3000

dataSourceKey:配置application.yml中源数据库的key
destination:配置mq的topic名称
_index:插入es中的索引名
_type:插入es中mappings的type属性
_id:配置id字段
upsert:配置插入数据正常时写入,主键冲突时更新
sql:配置具体要同步es的数据
etlCondition:条件判断,通过更新日期实现增量同步

3.3 历史数据全量同步ES

如果在搭建增量同步之前mysql数据库已经存在历史数据,就需要做初始化同步,全量同步可以使用Canal-Adapterrest-api来实现

全量同步初始化,例子如下:

curl -X POST http://192.168.28.130:8081/etl/es/sys_user.yml

ip为Canal-Adapter所在服务器ip
路径/es/sys_user.yml为conf目录下配置文件的路径,会自动忽略where条件进行全量同步