canal 消费进度
最佳答案 问答题库488位专家为你答疑解惑
如果使用SimpleCanalConnector/ClusterCanalConnector消费canal消息,可以使用多服务并发消费吗?
SimpleCanalConnector/ClusterCanalConnector的消费进度是canal-client级别的,同一个instance可以被不同的canal-client消费,并且具有独立的消费进度,而canal-client是使用clinetId标识的,在canal1.1.6中是写死的1001:
public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,int soTimeout, int idleTimeout){this.address = address;this.username = username;this.password = password;this.soTimeout = soTimeout;this.idleTimeout = idleTimeout;this.clientIdentity = new ClientIdentity(destination, (short) 1001);}
ClusterCanalConnector将消费位置信息存放在zookeeper中了(1001即是客户端id):
get /otter/canal/destinations/{destination}/1001/cursor
将{destination}换成自己的目标,数据内容如下:
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.1.22","port":13000}},
"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000091","position":42731278,"serverId":1,"timestamp":1696929020000}}
journalName是bin log名称, 可以在mysql 使用show binary logs命令查看当前的bin log名称;
position是具体bin log中的位移,可以使用mysqlbinlog或者show binlog events命令查询位置数据。
在消费事件完成后,需要执行ack动作,消费位置会往前移动,下次取数据时从新的位置开始取。
canal server处理客户端ack的代码:
case CLIENTACK:ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());MDC.put("destination", ack.getDestination());if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {if (ack.getBatchId() == 0L) {报错("batchId should assign value", ack.toString()).getMessage());} else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之// donothing} else {clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));embeddedServer.ack(clientIdentity, ack.getBatchId());new ChannelFutureAggregator(ack.getDestination(),ack,packet.getType(),0,System.nanoTime() - start).operationComplete(null);}} else {报错("destination or clientId is null", ack.toString()).getMessage());}break;
// 更新cursorif (positionRanges.getAck() != null) {canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());if (logger.isInfoEnabled()) {logger.info("ack successfully, clientId:{} batchId:{} position:{}",clientIdentity.getClientId(),batchId,positionRanges);}}
在ZooKeeperMetaManager中
public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());byte[] data = JsonUtils.marshalToByte(position, JSONWriter.Feature.WriteClassName);try {zkClientx.writeData(path, data);} catch (ZkNoNodeException e) {zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建}}
试想一下在这种情况下,如果有多个服务同时消费(都使用默认的1001客户端Id),就会可能会造成消费混乱,多个服务会读取相同的数据,跟据数据处理的快慢,ACK后位置更新会有问题,比如A服务ACK(80)后,B服务又ACK(50), 消费进度就会回退了。
看到源码中有检查跳跃更新的代码,但是目前是注释状态,所以这块目前是没有处理的,所以不要试图多服务使用同一个clientId去消费同一个canal instance。
99%的人还看了
相似问题
- vsto word 获取目录起始页和结束页,如目录起始位置为2、结束位置为3,返回2和3
- IP地理位置定位技术:保护网络安全的新利器
- WSL2安装ubuntu及修改安装位置,设置Ubuntu开机启动链接ssh服务
- ROS navigation栅格地图原点位置如何确定?
- 35. 搜索插入位置 --力扣 --JAVA
- 【实用技巧】更改ArduinoIDE默认库文件位置,解放系统盘,将Arduino15中的库文件移动到其他磁盘
- 76基于matlab的免疫算法求解配送中心选址问题,根据配送地址确定最佳配送中心地址位置。
- 小程序判断是否授权位置信息和手动授权
- 计算机毕业设计 基于SpringBoot的车辆网位置信息管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解
- 基于DOTween插件实现金币飞行到指定位置功能
猜你感兴趣
版权申明
本文"canal 消费进度":http://eshow365.cn/6-19022-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!