社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  DATABASE

Canal 实现 Mysql数据库实时数据同步

马哥Linux运维 • 4 年前 • 768 次点击  

简介

1.1 canal介绍

Canal是一个基于MySQL二进制日志的高性能数据同步系统。Canal广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数据管道,github地址:https://github.com/alibaba/canal

Canal Server能够解析MySQL binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。

它具有以下功能:

  • 支持所有平台。
  • 支持由Prometheus提供支持的细粒度系统监控。
  • 支持通过不同方式解析和订阅MySQL binlog,例如通过GTID。
  • 支持高性能,实时数据同步。(详见Performance)
  • Canal Server和Canal Client都支持HA / Scalability,由Apache ZooKeeper提供支持
  • Docker支持。

缺点:

不支持全量更新,只支持增量更新。

完整wiki地址:https://github.com/alibaba/canal/wiki

1.2 运作原理

原理很简单:

  1. Canal模拟MySQL的slave的交互协议,伪装成mysql slave,并将转发协议发送到MySQL Master服务器。
  2. MySQL Master接收到转储请求并开始将二进制日志推送到slave(即canal)。
  3. Canal将二进制日志对象解析为自己的数据类型(原始字节流)

如图所示:

准备工作

2.1 下载解压canal-server

通过 github 下载 canal-server release 版本(本次安装文档使用v1.1.4)

root@locahost:/# wget  https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压

tar -zxvf canal.deployer-1.1.4.tar.gz

2.2 下载解压 canal-adapter

通过 github 下载 canal-adapter release 版本(本次安装文档使用v1.1.4)

root@locahost:/# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

解压

tar -zxvf canal.adapter-1.1.4.tar.gz

配置 canal-server

3.1 canal-server 配置

解压之后进入 conf文件夹中,修改 canal.properties 根据实际需要来修改(如果不使用kafka或MQ 默认tcp即可)




    
canal.destinations = prod # 指定instance的名字多个使用逗号分隔

保存之后在conf目录创建 prod 文件夹并将 example文件夹中的 nstance.properties copy 到and_prod中

mkdir ant_prod  #创建文件夹
cp example/nstance.properties  prod/ # copy 文件

修改 nstance.properties 配置如下:

canal.instance.master.address=127.0.0.1:3306      # 源Mysql地址
canal.instance.dbUsername=canal                   # 源Mysql账号
canal.instance.dbPassword=canal                   # 源Mysql密码
canal.instance.connectionCharset=UTF-8            # 与源数据库编码格式一致 
canal.instance.defaultDatabaseName=test_database  # 默认监听源数据库

3.2 canal-server 启动

进入 canal-server bin 目录 启动

cd canal-server/bin # 进入目录
./startup.sh & # 后台启动

查看日志,是否启动成功

cd canal-server/logs/ant_prod #进入日志目录

启动成功:

2020-06-09 17:13:04.956 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]
2020-06-09 17:13:04.990 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-06-09 17:13:04.990 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [ant_prod/instance.properties]
2020-06-09 17:13:05.305 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-ant_prod 
2020-06-09 17:13:05.311 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-06-09 17:13:05.311 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
2020-06-09 17:13:05.315 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-06-09 17:13:05.422 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-06-09 17:13:05.423 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-06-09 17:13:06.483 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000234,position=6676924,serverId=184376678,gtid=,timestamp=1591693973000] cost : 1051ms , the next step is binlog dump

配置 canal-adapter

4.1 canal-adapter 配置

由于Mysql 是8.0 这里需要下载 mysql-connector-java-8.0.20.jar,并将其放入lib中

cp mysql-connector-java-8.0.20.jar /canal-adapter/lib/

解压之后进入 conf文件夹中,修改 application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
# 源Mysql 地址账号密码等
  srcDataSources: 
    defaultDS:
      url: jdbc:mysql://localhost:3306/test_database?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
      username: canal
      password: canal
# 需要实时同步数据库,如果多个实例进行区分即可
  canalAdapters:
  - instance: prod # canal instance,在canal-server中指定instance的名称
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1 # 唯一标示
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://localhost:3306/test_database_01?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
          jdbc.username: canal
          jdbc.password: canal

编辑rdb目录下面表的映射文件,数据库/表 (多个表创建多个映射文件,文件名对应表名)以此类推

dataSourceKey: defaultDS
destination: prod
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: test_database_01
  table: test
  targetTable: test_database_01.test
  targetPk:
    id: id
  mapAll: true

4.1 canal-adapter 启动

进入 canal-adapter/bin 目录 启动

cd canal-adapter/bin # 进入目录
./startup.sh & # 后台启动

查看日志,是否启动成功

cd canal-adapter/adapter/logs/ #进入日志目录
tail -f adapter.log # 查看日志是否启动成功

测试数据库同步

 更新/删除/批量插入/批量更新/批量删除

原文链接:https://www.jianshu.com/p/d4c177f0d831
作者:qingwenLi


设备多,负荷大,企业运维如何实现高效又智能?9月22日(周二)晚20:30,十年运维老鸟在线揭秘上市公司基础架构建设!带你玩转运维自动化!



马哥教育Linux、Python、Go系列课程火热报名中





往期推荐

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/73561
 
768 次点击