The microservice architecture looks appealing, but it comes with a few things you have to keep a firm grip on, and one of them is keeping data in sync for queries. Once a system is split into multiple business domains, cross-domain queries become awkward, and you need a different guiding idea, the CQRS architecture, to solve the problem.
In the diagram above, the query side is served by data synchronization plus a search engine: canal subscribes to the MySQL binlog and syncs the changes into elasticsearch. The advantage of this approach is that it is not coupled to the business code; there is no need to publish an event every time business data changes. The downside is the added system complexity: you need to run a separate canal sync service.
Below, let's walk through a slightly more complex scenario: order data with a one-to-many relationship. Since elasticsearch cannot join across indices the way a relational database can, the only way to model this is with a nested object field type.
Installed versions
- mysql 5.7
- canal-server v1.1.4
- canal-client 1.1.4
- elasticsearch 6.7.0
- kibana 6.7.0
- spring boot 2.2.0.RELEASE
- spring-data-elasticsearch 3.2.0.RELEASE
Installation steps
mysql
Docker run command; MYSQL_ROOT_PASSWORD sets the MySQL root password:
```sh
docker run --restart=always -p 3306:3306 \
  -v /Users/liyiye/docker/mysql/data:/var/lib/mysql \
  -v /Users/liyiye/docker/mysql/config/my.cnf:/etc/mysql/my.cnf:ro \
  --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
```
Add the following to the my.cnf file to enable the binlog and set it to ROW mode:
```ini
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must not clash with canal's slaveId
```
canal-server
Download the run.sh script from the quick-start page (https://github.com/alibaba/canal/wiki/Docker-QuickStart), then append the image version tag where the script launches docker at the end:
cmd="docker run -d -it -h $LOCALHOST $CONFIG --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.4"
Start it from the command line with the script; set the IP in the canal.instance.master.address variable to your local machine's IP:
```sh
sh run.sh -e canal.auto.scan=false \
  -e canal.destinations=test \
  -e canal.instance.master.address=10.11.41.191:3306 \
  -e canal.instance.dbUsername=root \
  -e canal.instance.dbPassword=123456 \
  -e canal.instance.connectionCharset=UTF-8 \
  -e canal.instance.tsdb.enable=true \
  -e canal.instance.gtidon=false
```
canal-client
Download the adapter from the sync-ES page (https://github.com/alibaba/canal/wiki/Sync-ES) and modify its application.yml file:
```yaml
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
  # zookeeperHosts: slave1:2181
  # mqServers: 127.0.0.1:9092 #or rocketmq
  # flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://localhost:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: test # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9200
        properties:
          mode: rest
          cluster.name: docker-cluster
```
Add a yml file under the es/conf directory with the following content (note that `order` is a MySQL reserved word, so the table name has to be back-quoted in the SQL):
```yaml
dataSourceKey: defaultDS
destination: test
groupId: g1
esMapping:
  _index: order
  _type: _doc
  _id: _id
  upsert: true
  # pk: orderId
  sql: "select concat(d.id,'') as orderId, d.id as _id, d.shop_code as shopCode,
        d.extend_props as extendProds, c.confirmPackage as confirmPackage
        from `order` d
        left join (select p.order_id as _id, concat(p.order_id,'') as order_id,
                          JSON_ARRAYAGG(JSON_OBJECT('waybillNo', p.waybill_no, 'packageId', p.id)) as confirmPackage
                   from order_package p group by p.order_id) c
        on c.order_id = d.id"
  objFields:
    extendProds: object
    # confirmPackage: array
    confirmPackage: object
  # etlCondition: "where a.c_time>={}"
  commitBatch: 3000
```
elasticsearch and kibana
Install them with docker-compose:
```yaml
version: '2.2'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0
    container_name: elasticsearch
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata1:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - esnet
  elasticsearch2:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.7.0
    container_name: elasticsearch2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - "discovery.zen.ping.unicast.hosts=elasticsearch"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - esdata2:/usr/share/elasticsearch/data
    networks:
      - esnet
  kibana:
    image: kibana:6.7.0
    container_name: kibana
    environment:
      - SERVER_NAME=kibana
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    ports:
      - 5601:5601
    networks:
      - esnet

volumes:
  esdata1:
    driver: local
    driver_opts:
      type: none
      device: /Users/liyiye/docker/es/esdata01
      o: bind
  esdata2:
    driver: local
    driver_opts:
      type: none
      device: /Users/liyiye/docker/es/esdata02
      o: bind

networks:
  esnet:
    driver: bridge
```
After installation, kibana is available at http://localhost:5601/. Before syncing any data, create the index:
```
PUT /order
{
  "mappings": {
    "_doc": {
      "properties": {
        "confirmPackage": {
          "type": "nested",
          "properties": {
            "packageId": { "type": "long" },
            "waybillNo": {
              "type": "text",
              "fields": {
                "keyword": { "type": "keyword", "ignore_above": 256 }
              }
            }
          }
        },
        "extendProds": {
          "properties": {
            "shopName": {
              "type": "text",
              "fields": {
                "keyword": { "type": "keyword", "ignore_above": 256 }
              }
            },
            "test": {
              "type": "text",
              "fields": {
                "keyword": { "type": "keyword", "ignore_above": 256 }
              }
            }
          }
        },
        "orderId": { "type": "text" },
        "shopCode": {
          "type": "text",
          "fields": {
            "keyword": { "type": "keyword", "ignore_above": 256 }
          }
        }
      }
    }
  }
}
```
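With this mapping, a document synced from one row in `order` plus its rows in order_package should look roughly like the following (the ids and values here are made up for illustration):

```json
{
  "orderId": "1001",
  "shopCode": "SHOP01",
  "extendProds": { "shopName": "demo" },
  "confirmPackage": [
    { "packageId": 2001, "waybillNo": "SF0001" },
    { "packageId": 2002, "waybillNo": "SF0002" }
  ]
}
```

Each package row becomes one entry in the nested confirmPackage array, which is what lets a single order document answer the one-to-many query.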
Querying from the elasticsearch client
When generating the project, select the spring-data-elasticsearch dependency and adjust it to the versions listed earlier. Point it at the cluster by adding the following to application.properties:

```properties
spring.elasticsearch.rest.uris=localhost:9200
```
Inject the following field in a unit test; all queries go through it, as the sketch after the snippet shows:
```java
@Autowired
ElasticsearchRestTemplate elasticsearchTemplate;
```
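Here is a minimal sketch of a nested query against the order index, assuming a hypothetical OrderDoc mapping class and test class (OrderQueryTest, waybill value "SF0001") that are not part of the original; field names follow the mapping created above:

```java
import java.util.List;
import java.util.Map;

import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.test.context.junit4.SpringRunner;

// hypothetical mapping class for the order index created earlier
@Document(indexName = "order", type = "_doc")
class OrderDoc {
    @Id
    private String orderId;
    private String shopCode;
    private Map<String, Object> extendProds;
    private List<Map<String, Object>> confirmPackage;
    // getters and setters omitted for brevity
}

@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderQueryTest {

    @Autowired
    ElasticsearchRestTemplate elasticsearchTemplate;

    @Test
    public void findOrdersByWaybillNo() {
        // confirmPackage is a nested field, so a plain term query will not
        // reach into it; a nested query matches orders that contain at least
        // one package with the given waybill number
        SearchQuery query = new NativeSearchQueryBuilder()
            .withQuery(QueryBuilders.nestedQuery(
                "confirmPackage",
                QueryBuilders.termQuery("confirmPackage.waybillNo.keyword", "SF0001"),
                ScoreMode.None))
            .build();
        List<OrderDoc> orders = elasticsearchTemplate.queryForList(query, OrderDoc.class);
    }
}
```

The ScoreMode.None argument tells elasticsearch not to fold the nested hits' scores into the parent document, which is usually what you want for this kind of exact lookup.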
Verification steps
Create the two tables in the database with columns matching the sync SQL configured earlier, then insert, update, and delete rows in the order and package tables and check in kibana whether the data is updated accordingly. Note that elasticsearch is near-real-time: newly inserted data cannot be searched immediately, there is roughly a 1s refresh delay, so this approach is not suitable for scenarios with strict real-time requirements.
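The manual check can also be scripted. Below is a rough sketch of an end-to-end test, reusing the hypothetical OrderDoc class from the previous sketch and assuming spring-boot-starter-jdbc is on the classpath with the datasource pointing at the `test` database; table and column names come from the sync SQL, while the ids and values are invented for illustration:

```java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.query.GetQuery;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderSyncTest {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @Autowired
    ElasticsearchRestTemplate elasticsearchTemplate;

    @Test
    public void insertOrderAndCheckSync() throws InterruptedException {
        // write to MySQL; canal picks the change up from the binlog
        jdbcTemplate.update(
            "insert into `order` (id, shop_code, extend_props) values (?, ?, ?)",
            1001L, "SHOP01", "{\"shopName\": \"demo\"}");
        jdbcTemplate.update(
            "insert into order_package (id, order_id, waybill_no) values (?, ?, ?)",
            2001L, 1001L, "SF0001");

        // wait for canal to sync and for the ES index to refresh (~1s)
        Thread.sleep(2000);

        // fetch by _id, which the sync SQL sets to the order's primary key
        GetQuery getQuery = new GetQuery();
        getQuery.setId("1001");
        OrderDoc doc = elasticsearchTemplate.queryForObject(getQuery, OrderDoc.class);
        // doc should now contain shopCode and the nested confirmPackage entry
    }
}
```

The Thread.sleep is crude but makes the point: because of the binlog hop and the index refresh, assertions against elasticsearch right after the database write will race the pipeline.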