当前位置:首页 > 编程笔记 > 正文
已解决

canal 消费进度

来自网友在路上 148848提问 提问时间:2023-10-11 23:45:59阅读次数: 48

最佳答案 问答题库488位专家为你答疑解惑

如果使用SimpleCanalConnector/ClusterCanalConnector消费canal消息,可以使用多服务并发消费吗?

SimpleCanalConnector/ClusterCanalConnector的消费进度是canal-client级别的,同一个instance可以被不同的canal-client消费,并且具有独立的消费进度,而canal-client是使用clinetId标识的,在canal1.1.6中是写死的1001:

    public SimpleCanalConnector(SocketAddress address, String username, String password, String destination,int soTimeout, int idleTimeout){this.address = address;this.username = username;this.password = password;this.soTimeout = soTimeout;this.idleTimeout = idleTimeout;this.clientIdentity = new ClientIdentity(destination, (short) 1001);}

ClusterCanalConnector将消费位置信息存放在zookeeper中了(1001即是客户端id):

get /otter/canal/destinations/{destination}/1001/cursor

将{destination}换成自己的目标,数据内容如下:

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.1.22","port":13000}},
"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000091","position":42731278,"serverId":1,"timestamp":1696929020000}}

journalName是bin log名称, 可以在mysql 使用show binary logs命令查看当前的bin log名称;
position是具体bin log中的位移,可以使用mysqlbinlog或者show binlog events命令查询位置数据。

在消费事件完成后,需要执行ack动作,消费位置会往前移动,下次取数据时从新的位置开始取。
canal server处理客户端ack的代码:

case CLIENTACK:ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());MDC.put("destination", ack.getDestination());if (StringUtils.isNotEmpty(ack.getDestination()) && StringUtils.isNotEmpty(ack.getClientId())) {if (ack.getBatchId() == 0L) {报错("batchId should assign value", ack.toString()).getMessage());} else if (ack.getBatchId() == -1L) { // -1代表上一次get没有数据,直接忽略之// donothing} else {clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));embeddedServer.ack(clientIdentity, ack.getBatchId());new ChannelFutureAggregator(ack.getDestination(),ack,packet.getType(),0,System.nanoTime() - start).operationComplete(null);}} else {报错("destination or clientId is null", ack.toString()).getMessage());}break;
   // 更新cursorif (positionRanges.getAck() != null) {canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());if (logger.isInfoEnabled()) {logger.info("ack successfully, clientId:{} batchId:{} position:{}",clientIdentity.getClientId(),batchId,positionRanges);}}

在ZooKeeperMetaManager中

public void updateCursor(ClientIdentity clientIdentity, Position position) throws CanalMetaManagerException {String path = ZookeeperPathUtils.getCursorPath(clientIdentity.getDestination(), clientIdentity.getClientId());byte[] data = JsonUtils.marshalToByte(position, JSONWriter.Feature.WriteClassName);try {zkClientx.writeData(path, data);} catch (ZkNoNodeException e) {zkClientx.createPersistent(path, data, true);// 第一次节点不存在,则尝试重建}}

试想一下在这种情况下,如果有多个服务同时消费(都使用默认的1001客户端Id),就会可能会造成消费混乱,多个服务会读取相同的数据,跟据数据处理的快慢,ACK后位置更新会有问题,比如A服务ACK(80)后,B服务又ACK(50), 消费进度就会回退了。
看到源码中有检查跳跃更新的代码,但是目前是注释状态,所以这块目前是没有处理的,所以不要试图多服务使用同一个clientId去消费同一个canal instance。

查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"canal 消费进度":http://eshow365.cn/6-19022-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!