1. 首页
  2. >
  3. 数据库技术
  4. >
  5. MySQL

基于canal实现mysql的数据同步

canal是什么?

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

基于canal实现mysql的数据同步

[]( https://github.com/alibaba/ca...

基于上面的讲解,我们在实现canal之前,先简单做一个主从复制。一主 一从

  • 首先下载mysql 镜像,并启动
docker pull mysql:latest docker run -itd --name mysql-1 -p 23306:3306 -e MYSQL_ROOT_PASSWORD=root  mysql docker run -itd --name mysql-2 -p 23307:3306 -e MYSQL_ROOT_PASSWORD=root  mysql
  • 相关命令再解释一下:name xxx :xxx为容器名p 111:222 其中111是宿主机端口,222是容器端口MYSQL_ROOT_PASSWORD=root 设置root账户密码为root

基于canal实现mysql的数据同步

  • 进入容器测试一下,一切正常

基于canal实现mysql的数据同步

  • 设置 mysql-1为主,mysql-2为从库
  • 修改一下 mysql的配置,安装vim编辑器
apt-get update apt-get install vim
  • 在主库 创建一个mysql账户给从库使用
CREATE USER 'slave'@'%' IDENTIFIED BY '123456'; GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'slave'@'%'; FLUSH PRIVILEGES

基于canal实现mysql的数据同步

  • 修改一下从服务器
[mysqld] pid-file        = /var/run/mysqld/mysqld.pid socket          = /var/run/mysqld/mysqld.sock datadir         = /var/lib/mysql secure-file-priv= NULL server_id=100 log-bin=mysql-slave-bin relay_log=edu-mysql-relay-bin
  • 退出重启从服务器docker
  • 进入从服务器 执行
mysql> change master to master_host='172.17.0.4', master_user='slave', master_password='123456', master_port=3306, master_log_file='edu-mysql-bin.000001', master_log_pos= 877, master_connect_retry=30;

相关命令解释

master_port:Master的端口号,指的是容器的端口号

master_user:用于数据同步的用户

master_password:用于同步的用户的密码

master_log_file:指定 Slave 从哪个日志文件开始复制数据,即上文中提到的 File 字段的值

master_log_pos:从哪个 Position 开始读,即上文中提到的 Position 字段的值

master_connect_retry:如果连接失败,重试的时间间隔,单位是秒,默认是60秒

在Slave 中的mysql终端执行show slave status \G;用于查看主从同步状态。

  • 出现一下信息说明配置成功

基于canal实现mysql的数据同步

  • 接下来再主库写数据,从库同步成功

基于canal实现mysql的数据同步

  • 简单的主从同步完成了,但是我们要想,怎么实现的主从同步,对吧;

其实就是 通过 同步二进制日志文件,从服务器 会起一个io进程,读取二进制文件同步到 从服务器

基于canal实现mysql的数据同步

  • 简单看一下二进制文件的内容;

基于canal实现mysql的数据同步

基于canal实现mysql的数据同步

为什么再 将canal 之前要先说主从复制呢,其实canal 就是把自己伪装成了从服务器,从而读取日志,拿到数据;

使用docker 部署canal

参考链接

docker pull canal/canal-server:latest # 下载脚本 wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh   # 构建一个destination name为test的队列, address 对应的数据库ip+端口 ,dbUsername对应数据库用户名,dbPassword对应数据库密码,注意修改为自己的 sh run.sh -e canal.auto.scan=false \ -e canal.destinations=test \ -e canal.instance.master.address=172.17.0.4:3306  \ -e canal.instance.dbUsername=canal  \ -e canal.instance.dbPassword=canal  \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false  \
  • 启动之后可以进入容器,看一下里面的 日志,如果出现了标红的信息,说明成功,否则就查看里面的报错信息吧!也不难

基于canal实现mysql的数据同步

基于canal实现mysql的数据同步

  • 配合php 查看数据变化(此处不限php,java,go,python等都有接口)
  • 多语言连接 https://github.com/alibaba/canal

php 监听数据变化

  • canal-php

canal-php 是阿里巴巴开源项目 Canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件 的 php 客户端。为 php 开发者提供一个更友好的使用 Canal 的方式。Canal 是mysql数据库binlog的增量订阅&消费组件。

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki

  • 应用场景

canal-php 作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子:

1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等

3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis

4.数据库异地备份、数据同步

5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。

6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

  • 工作原理

canal-php 是 Canal 的 php 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。

  • 工作流程

基于canal实现mysql的数据同步

  • 使用组件安装,此处我有一个laravel框架,直接在laravel里面安装使用了,相关代码贴出
# 安装组件canal-php  composer require xingwenge/canal_php # 编写脚本监听 <?php  namespace App\Console\Commands; use xingwenge\canal_php\CanalClient; use xingwenge\canal_php\CanalConnectorFactory; use xingwenge\canal_php\Fmt; use Illuminate\Console\Command;  ini_set('display_errors', 'On'); error_reporting(E_ALL); class CanalDemo extends Command {     /**      * The name and signature of the console command.      *      * @var string      */     protected $signature = 'CanalDemo';      /**      * The console command description.      *      * @var string      */     protected $description = '测试canal';      /**      * Create a new command instance.      *      * @return void      */     public function __construct()     {         parent::__construct();     }      /**      * Execute the console command.      *      * @return mixed      */     public function handle()     {         try {             $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);             # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);              $client->connect("172.17.0.5", 11111);//此处修改为自己的配置             $client->checkValid();             $client->subscribe("1001", "test", ".*\\..*");//对应启动容器时test的队列名             while (true) {                 $message = $client->get(100);                 if ($entries = $message->getEntries()) {                     foreach ($entries as $entry) {                         Fmt::println($entry);                     }                 }                 sleep(1);             }              $client->disConnect();         } catch (\Exception $e) {             echo $e->getMessage(), PHP_EOL;         }      }  }
  • 更改数据

基于canal实现mysql的数据同步

  • 监听结果

基于canal实现mysql的数据同步