社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  DATABASE

详解canal同步MySQL增量数据到ES

DBAplus社群 • 1 年前 • 253 次点击  

canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。


这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到ES



一、集群模式



图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。


server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务


instance 包含如下模块 :


  • eventParser:数据源接入,模拟 slave 协议和 master 进行交互,协议解析。

  • eventSink:Parser 和 Store 链接器,进行数据过滤,加工,分发的工作。

  • eventStore:数据存储。

  • metaManager:增量订阅 & 消费信息管理器。




真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式MQ 模式


实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。


顺序消费:


对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。



二、MySQL配置


1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf  中配置如下:


[mysqld]log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复


注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。


2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant :


CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;FLUSH PRIVILEGES;


3、创建数据库商品表 t_product :





    
CREATE TABLE `t_product` ( `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT, `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL, `price` DECIMAL ( 10, 2 ) NOT NULL, `status` TINYINT ( 4 ) NOT NULL, `create_time` datetime NOT NULL, `update_time` datetime NOT NULL,   PRIMARY KEY ( `id` ) ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

三、Elasticsearch配置


使用 Kibana 创建商品索引 。


PUT /t_product{    "settings": {        "number_of_shards": 2,        "number_of_replicas": 1    },    "mappings": {            "properties": {               "id": {                    "type":"keyword"                },                "name": {                    "type":"text"                },                "price": {                    "type":"double"                },                "status": {                    "type":"integer"                },                "createTime": {                    "type": "date",                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"                },                "updateTime": {                    "type": "date",                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"                }        }    }}


执行完成,如图所示 :



四、RocketMQ 配置


创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。




五、canal 配置


我们选取 canal 版本 1.1.6 ,进入 conf 目录。


1、配置 canal.properties


#集群模式 zk地址canal.zkServers = localhost:2181#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQcanal.serverMode = rocketMQ#instance 列表canal.destinations = product-syn#conf root dircanal.conf.dir = ../conf#全局的spring配置方式的组件文件 生产环境,集群化部署canal.instance.global.spring.xml = classpath:spring/default-instance.xml
###### 以下部分是默认值 展示出来 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)canal.mq.canalBatchSize = 50# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时canal.mq.canalGetTimeout = 100# 是否为 flat json格式对象canal.mq.flatMessage = true

2、instance 配置文件


在 conf 目录下创建实例目录 product-syn  , 在 product-syn 目录创建配置文件 :instance.properties。


#  按需修改成自己的数据库信息#################################################...canal.instance.master.address=192.168.1.20:3306# username/password,数据库的用户名和密码...canal.instance.dbUsername = canalcanal.instance.dbPassword = canal...
# table regex canal.instance.filter.regex=mytest.t_product
# mq configcanal.mq.topic=product-syn-topic# 针对库名或者表名发送动态topic#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.partitionsNum=3#库名.表名: 唯一主键,多个表之间用逗号分隔#canal.mq.partitionHash=mytest.person:id,mytest.role:id#################################################


3、服务启动


启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。



修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。



六、消费者


1、产品索引操作服务



2、消费监听器



消费者逻辑重点有两点:


  • 顺序消费监听器

  • 将消息数据转换成  JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。


写到最后


canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。


推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。


这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。


https://github.com/makemyownlife/rocketmq4-learning



作者丨勇哥

来源丨公众号:勇哥java实战分享(ID:gh_639fe71cc2db)

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn


Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/162187
 
253 次点击