《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》

摘要: 原创出处 http://www.tianshouzhi.com/api/tutorials/canal/382 「田守枝」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

server模块的核心接口是CanalServer,其有2个实现类CanalServerWithNettyCanalServerWithEmbeded。关于CanalServer,官方文档中有有以下描述:

4068E89A-F5FA-47DD-AA48-A39761487DE6.png

下图是笔者对官方文档的进一步描述:

AD919783-7B87-4A19-93CF-FF542ED9E242.png

左边的图

表示的是Canal独立部署。不同的应用通过canal client与canal server进行通信,所有的canal client的请求统一由CanalServerWithNetty接受,之后CanalServerWithNetty会将客户端请求派给CanalServerWithEmbeded 进行真正的处理。CannalServerWithEmbeded内部维护了多个canal instance,每个canal instance伪装成不同的mysql实例的slave,而CanalServerWithEmbeded会根据客户端请求携带的destination参数确定要由哪一个canal instance为其提供服务。

右边的图

是直接在应用中嵌入CanalServerWithEmbeded,不需要独立部署canal。很明显,网络通信环节少了,同步binlog信息的效率肯定更高。但是对于使用者的技术要求比较高。在应用中,我们可以通过CanalServerWithEmbeded.instance()方法来获得CanalServerWithEmbeded实例,这一个单例。

整个server模块源码目录结构如下所示:

BE7FA9CC-0301-4B52-888A-BCD523BA60A1.png

其中上面的红色框就是嵌入式实现,而下面的绿色框是基于Netty的实现。

看起来基于netty的实现代码虽然多一点,这其实只是幻觉,CanalServerWithNetty会将所有的请求委派给CanalServerWithEmbedded处理。

而内嵌的方式只有CanalServerWithEmbedded一个类, 是因为CanalServerWithEmbedded又要根据destination选择某个具体的CanalInstance来处理客户端请求,而CanalInstance的实现位于instance模块,我们将在之后分析。因此从canal server的角度来说,CanalServerWithEmbedded才是server模块真正的核心。

​ CanalServerWithNetty和CanalServerWithEmbedded都是单例的,提供了一个静态方法instance()获取对应的实例。回顾前一节分析CanalController源码时,在CanalController构造方法中准备CanalServer的相关代码,就是通过这两个静态方法获取对应的实例的。

public CanalController(final Properties properties){        ....     // 准备canal server        ip = getProperty(properties, CanalConstants.CANAL_IP);        port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));        embededCanalServer = CanalServerWithEmbedded.instance();        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator        canalServer = CanalServerWithNetty.instance();        canalServer.setIp(ip);        canalServer.setPort(port);       ....   }

CanalServer接口

CanalServer接口继承了CanalLifeCycle接口,主要是为了重新定义startstop方法,抛出CanalServerException

public interface CanalServer extends CanalLifeCycle {     void start() throws CanalServerException;     void stop() throws CanalServerException;}

CanalServerWithNetty

CanalServerWithNetty主要用于接受客户端的请求,然后将其委派给CanalServerWithEmbeded处理。下面的源码显示了CanalServerWithNetty种定义的字段和构造方法

public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {    //监听的所有客户端请求都会为派给CanalServerWithEmbedded处理     private CanalServerWithEmbedded embeddedServer;      // 嵌入式server     //监听的ip和port,client通过此ip和port与服务端通信    private String                  ip;    private int                     port;     //netty组件    private Channel                 serverChannel = null;    private ServerBootstrap         bootstrap     = null;    //....单例模式实现    private CanalServerWithNetty(){        //给embeddedServer赋值        this.embeddedServer = CanalServerWithEmbedded.instance();    }    //... start and stop method    //...setters and getters...}

字段说明:

  • **embeddedServer:**因为CanalServerWithNetty需要将请求委派给CanalServerWithEmbeded处理,因此其维护了embeddedServer对象。
  • **ip、port:**这是netty监听的网络ip和端口,client通过这个ip和端口与server通信
  • **serverChannel、bootstrap:**这是netty的API。其中ServerBootstrap用于启动服务端,通过调用其bind方法,返回一个类型为Channel的serverChannel对象,代表服务端通道。关于netty知识不是本教程重点,如果读者不熟悉,可以参考笔者的netty教程

start方法

start方法中包含了netty启动的核心逻辑,如下所示:

com.alibaba.otter.canal.server.netty.CanalServerWithNetty#start

public void start() {        super.start();        //优先启动内嵌的canal server,因为基于netty的实现需要将请求委派给其处理        if (!embeddedServer.isStart()) {            embeddedServer.start();        }                 /* 创建bootstrap实例,参数NioServerSocketChannelFactory也是Netty的API,其接受2个线程池参数         其中第一个线程池是Accept线程池,第二个线程池是woker线程池,         Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。         这里属于netty的知识,不熟悉的用户暂时不必深究,简单认为netty使用线程来处理客户端的高并发请求即可。*/        this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),            Executors.newCachedThreadPool()));                    /*pipeline实际上就是netty对客户端请求的处理器链,        可以类比JAVA EE编程中Filter的责任链模式,上一个filter处理完成之后交给下一个filter处理,        只不过在netty中,不再是filter,而是ChannelHandler。*/        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipelines = Channels.pipeline();               //主要是处理编码、解码。因为网路传输的传入的都是二进制流,FixedHeaderFrameDecoder的作用就是对其进行解析                pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());               //处理client与server握手                pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler());               //client身份验证               pipelines.addLast(ClientAuthenticationHandler.class.getName(),                    new ClientAuthenticationHandler(embeddedServer));                //SessionHandler用于真正的处理客户端请求,是本文分析的重点               SessionHandler sessionHandler = new SessionHandler(embeddedServer);                pipelines.addLast(SessionHandler.class.getName(), sessionHandler);                return pipelines;            }        });                // 启动,当bind方法被调用时,netty开始真正的监控某个端口,此时客户端对这个端口的请求可以被接受到        if (StringUtils.isNotEmpty(ip)) {            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));        } else {            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));        }    }

关于stop方法无非是一些关闭操作,代码很简单,这里不做介绍。

SessionHandler

​ 很明显的,canal处理client请求的核心逻辑都在SessionHandler这个处理器中。注意其在实例化时,传入了embeddedServer对象,前面我们提过,CanalServerWithNetty要将请求委派给CanalServerWithEmbedded处理,显然SessionHandler也要维护embeddedServer实例。

​ 这里我们主要分析SessionHandler的 messageReceived方法,这个方法表示接受到了一个客户端请求,我们主要看的是SessionHandler如何对客户端请求进行解析,然后委派给CanalServerWithEmbedded处理的。为了体现其转发请求处理的核心逻辑,以下代码省去了大量源码片段,如下

SessionHandler#messageReceived

public class SessionHandler extends SimpleChannelHandler {....//messageReceived方法表示收到客户端请求public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {    ....      //根据客户端发送的网路通信包请求类型type,将请求委派embeddedServer处理        switch (packet.getType()) {            case SUBSCRIPTION://订阅请求                ...                embeddedServer.subscribe(clientIdentity);                         ...                break;            case UNSUBSCRIPTION://取消订阅请求                ...                embeddedServer.unsubscribe(clientIdentity);                ...                break;            case GET://获取binlog请求                ....                    if (get.getTimeout() == -1) {// 根据客户端是否指定了请求超时时间调用embeddedServer不同方法获取binlog                        message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());                    } else {                        ...                        message = embeddedServer.getWithoutAck(clientIdentity,                            get.getFetchSize(),                            get.getTimeout(),                            unit);                    }                ...                   break;            case CLIENTACK://客户端消费成功ack请求               ...                  embeddedServer.ack(clientIdentity, ack.getBatchId());               ...                break;            case CLIENTROLLBACK://客户端消费失败回滚请求                ...                    if (rollback.getBatchId() == 0L) {                        embeddedServer.rollback(clientIdentity);// 回滚所有批次                    } else {                        embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次                    }                ...                break;            default://无法判断请求类型                NettyUtils.error(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType())                    .getMessage(), ctx.getChannel(), null);                break;        }    ...}...}

​ 可以看到,SessionHandler对client请求进行解析后,根据请求类型,委派给CanalServerWithEmbedded的相应方法进行处理。因此核心逻辑都在CanalServerWithEmbedded中。

CannalServerWithEmbeded

CanalServerWithEmbedded实现了CanalServer和CanalServiceCan两个接口。其内部维护了一个Map,key为destination,value为对应的CanalInstance,根据客户端请求携带的destination参数将其转发到对应的CanalInstance上去处理。

public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {    ...    //key为destination,value为对应的CanalInstance。    private Map<String, CanalInstance> canalInstances;    ...}

对于CanalServer接口中定义的start和stop这两个方法实现比较简单,这里不再赘述。

​ 在上面的SessionHandler源码分析中,我们已经看到,会根据请求报文的类型,会调用CanalServerWithEmbedded的相应方法,这些方法都定义在CanalService接口中,如下:

public interface CanalService {   //订阅    void subscribe(ClientIdentity clientIdentity) throws CanalServerException;   //取消订阅    void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;   //比例获取数据,并自动自行ack    Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;   //超时时间内批量获取数据,并自动进行ack    Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;    //批量获取数据,不进行ack    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;   //超时时间内批量获取数据,不进行ack    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)                                                                                               throws CanalServerException;   //ack某个批次的数据    void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;   //回滚所有没有ack的批次的数据    void rollback(ClientIdentity clientIdentity) throws CanalServerException;   //回滚某个批次的数据    void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;}

细心地的读者会发现,每个方法中都包含了一个ClientIdentity类型参数,这就是客户端身份的标识。

public class ClientIdentity implements Serializable {    private String destination;    private short  clientId;    private String filter; ...}

CanalServerWithEmbedded就是根据ClientIdentity中的destination参数确定这个请求要交给哪个CanalInstance处理的。

下面一次分析每一个方法的作用:

subscribe方法:

subscribe主要用于处理客户端的订阅请求,目前情况下,一个CanalInstance只能由一个客户端订阅,不过可以重复订阅。订阅主要的处理步骤如下:

1、根据客户端要订阅的destination,找到对应的CanalInstance

2、通过这个CanalInstance的CanalMetaManager组件记录下有客户端订阅。

3、获取客户端当前订阅位置(Position)。首先尝试从CanalMetaManager中获取,CanalMetaManager 中记录了某个client当前订阅binlog的位置信息。如果是第一次订阅,肯定无法获取到这个位置,则尝试从CanalEventStore中获取第一个binlog的位置。从CanalEventStore中获取binlog位置信息的逻辑是:CanalInstance一旦启动,就会立刻去拉取binlog,存储到CanalEventStore中,在第一次订阅的情况下,CanalEventStore中的第一条binlog的位置,就是当前客户端当前消费的开始位置。

4、通知CanalInstance订阅关系变化

/** * 客户端订阅,重复订阅时会更新对应的filter信息 */@Overridepublic void subscribe(ClientIdentity clientIdentity) throws CanalServerException {    checkStart(clientIdentity.getDestination());    //1、根据客户端要订阅的destination,找到对应的CanalInstance     CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    if (!canalInstance.getMetaManager().isStart()) {        canalInstance.getMetaManager().start();    }  //2、通过CanalInstance的CanalMetaManager组件进行元数据管理,记录一下当前这个CanalInstance有客户端在订阅    canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅  //3、获取客户端当前订阅的binlog位置(Position),首先尝试从CanalMetaManager中获取    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);    if (position == null) {  //3.1 如果是第一次订阅,尝试从CanalEventStore中获取第一个binlog的位置,作为客户端订阅开始的位置。        position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条        if (position != null) {            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor        }        logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);    } else {        logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);    }    //4 通知下订阅关系变化    canalInstance.subscribeChange(clientIdentity);}

unsubscribe方法:

unsubscribe方法主要用于取消订阅关系。在下面的代码中,我们可以看到,其实就是找到CanalInstance对应的CanalMetaManager,调用其unsubscribe取消这个订阅记录。需要注意的是,取消订阅并不意味着停止CanalInstance。当某个客户端取消了订阅,还会有新的client来订阅这个CanalInstance,所以不能停。

/** * 取消订阅 */@Overridepublic void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅    logger.info("unsubscribe successfully, {}", clientIdentity);}

listAllSubscribe方法:

​ 这一个管理方法,其作用是列出订阅某个destination的所有client。这里返回的是一个List<ClientIdentity>,不过我们已经多次提到,目前一个destination只能由一个client订阅。这里之所以返回一个list,是canal原先计划要支持多个client订阅同一个destination。不过,这个功能一直没有实现。所以List中,实际上只会包含一个ClientIdentity。

/** * 查询所有的订阅信息 */public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {    CanalInstance canalInstance = canalInstances.get(destination);    return canalInstance.getMetaManager().listAllSubscribeInfo(destination);}

listBatchIds方法:

/** * 查询当前未被ack的batch列表,batchId会按照从小到大进行返回 */public List<Long> listBatchIds(ClientIdentity clientIdentity) throws CanalServerException {    checkStart(clientIdentity.getDestination());    checkSubscribe(clientIdentity);    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    Map<Long, PositionRange> batchs = canalInstance.getMetaManager().listAllBatchs(clientIdentity);    List<Long> result = new ArrayList<Long>(batchs.keySet());    Collections.sort(result);    return result;}

getWithoutAck方法:

getWithoutAck方法用于客户端获取binlog消息 ,一个获取一批(batch)的binlog,canal会为这批binlog生成一个唯一的batchId。客户端如果消费成功,则调用ack方法对这个批次进行确认。如果失败的话,可以调用rollback方法进行回滚。客户端可以连续多次调用getWithoutAck方法来获取binlog,在ack的时候,需要按照获取到binlog的先后顺序进行ack。如果后面获取的binlog被ack了,那么之前没有ack的binlog消息也会自动被ack。

getWithoutAck方法大致工作步骤如下所示:

  1. 根据destination找到要从哪一个CanalInstance中获取binlog消息。
  2. 确定从哪一个位置(Position)开始继续消费binlog。通常情况下,这个信息是存储在CanalMetaManager中。特别的,在第一次获取的时候,CanalMetaManager 中还没有存储任何binlog位置信息。此时CanalEventStore中存储的第一条binlog位置,则应该client开始消费的位置。
  3. 根据Position从CanalEventStore中获取binlog。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。 如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。
  4. 在CanalMetaManager中记录这个批次的binlog消息。CanalMetaManager会为获取到的这个批次的binlog生成一个唯一的batchId,batchId是递增的。如果binlog信息为空,则直接把batchId设置为-1。

@Overridepublic Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {    return getWithoutAck(clientIdentity, batchSize, null, null);}/** * <pre> * 几种case: * a. 如果timeout为null,则采用tryGet方式,即时获取 * b. 如果timeout不为null *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回 *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少 * </pre> */@Overridepublic Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)                                                                                                       throws CanalServerException {    checkStart(clientIdentity.getDestination());    checkSubscribe(clientIdentity);      // 1、根据destination找到要从哪一个CanalInstance中获取binlog消息    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    synchronized (canalInstance) {        //2、从CanalMetaManager中获取最后一个没有ack的binlog批次的位置信息。        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);   //3 从CanalEventStore中获取binlog        Events<Event> events = null;        if (positionRanges != null) { // 3.1 如果从CanalMetaManager获取到了位置信息,从当前位置继续获取binlog            events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);        } else { //3.2 如果没有获取到binlog位置信息,从当前store中的第一条开始获取            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);            if (start == null) { // 第一次,还没有过ack记录,则获取当前store中的第一条                start = canalInstance.getEventStore().getFirstPosition();            }      // 从CanalEventStore中获取binlog消息            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);        }        //4 记录批次信息到CanalMetaManager中        if (CollectionUtils.isEmpty(events.getEvents())) {          //4.1 如果获取到的binlog消息为空,构造一个空的Message对象,将batchId设置为-1返回给客户端            logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", new Object[] {                    clientIdentity.getClientId(), batchSize });            return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能        } else {           //4.2 如果获取到了binlog消息,将这个批次的binlog消息记录到CanalMetaMaager中,并生成一个唯一的batchId            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());            //将Events转为Entry            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {                public Entry apply(Event input) {                    return input.getEntry();                }            });            logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]",                clientIdentity.getClientId(),                batchSize,                entrys.size(),                batchId,                events.getPositionRange());            //构造Message返回            return new Message(batchId, entrys);        }    }}/** * 根据不同的参数,选择不同的方式获取数据 */private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,                                TimeUnit unit) {    if (timeout == null) {        return eventStore.tryGet(start, batchSize);    } else {        try {            if (timeout <= 0) {                return eventStore.get(start, batchSize);            } else {                return eventStore.get(start, batchSize, timeout, unit);            }        } catch (Exception e) {            throw new CanalServerException(e);        }    }}

ack方法:

ack方法时客户端用户确认某个批次的binlog消费成功。进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)

ack时需要做以下几件事情:

  1. 从CanalMetaManager中,移除这个批次的信息。在getWithoutAck方法中,将批次的信息记录到了CanalMetaManager中,ack时移除。
  2. 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始,这是通过CanalMetaManager记录的。
  3. 从CanalEventStore中,将这个批次的binlog内容移除。因为已经消费成功,继续保存这些已经消费过的binlog没有任何意义,只会白白占用内存。

@Overridepublic void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {    checkStart(clientIdentity.getDestination());    checkSubscribe(clientIdentity);    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    PositionRange<LogPosition> positionRanges = null;   //1 从CanalMetaManager中,移除这个批次的信息    positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId); // 更新位置    if (positionRanges == null) { // 说明是重复的ack/rollback        throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check",            clientIdentity.getClientId(),            batchId));    }    //2、记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始,这是通过CanalMetaManager记录的    if (positionRanges.getAck() != null) {        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());        logger.info("ack successfully, clientId:{} batchId:{} position:{}",            clientIdentity.getClientId(),            batchId,            positionRanges);    }      //3、从CanalEventStore中,将这个批次的binlog内容移除    canalInstance.getEventStore().ack(positionRanges.getEnd());}

rollback方法:

/** * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿 */@Overridepublic void rollback(ClientIdentity clientIdentity) throws CanalServerException {    checkStart(clientIdentity.getDestination());    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);    if (!hasSubscribe) {        return;    }    synchronized (canalInstance) {        // 清除batch信息        canalInstance.getMetaManager().clearAllBatchs(clientIdentity);        // rollback eventStore中的状态信息        canalInstance.getEventStore().rollback();        logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });    }}/** * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿 */@Overridepublic void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {    checkStart(clientIdentity.getDestination());    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    // 因为存在第一次链接时自动rollback的情况,所以需要忽略未订阅    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);    if (!hasSubscribe) {        return;    }    synchronized (canalInstance) {        // 清除batch信息        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,            batchId);        if (positionRanges == null) { // 说明是重复的ack/rollback            throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",                clientIdentity.getClientId(),                batchId));        }        // lastRollbackPostions.put(clientIdentity,        // positionRanges.getEnd());// 记录一下最后rollback的位置        // TODO 后续rollback到指定的batchId位置        canalInstance.getEventStore().rollback();// rollback                                                 // eventStore中的状态信息        logger.info("rollback successfully, clientId:{} batchId:{} position:{}",            clientIdentity.getClientId(),            batchId,            positionRanges);    }}

get方法:

与getWithoutAck主要流程完全相同,唯一不同的是,在返回数据给用户前,直接进行了ack,而不管客户端消费是否成功。

@Overridepublic Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {    return get(clientIdentity, batchSize, null, null);} /* * 几种case: * a. 如果timeout为null,则采用tryGet方式,即时获取 * b. 如果timeout不为null *    1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回 *    2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少 */@Overridepublic Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)                                                                                             throws CanalServerException {    checkStart(clientIdentity.getDestination());    checkSubscribe(clientIdentity);    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());    synchronized (canalInstance) {        // 获取到流式数据中的最后一批获取的位置        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);        if (positionRanges != null) {            throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",                clientIdentity.getClientId(),                positionRanges));        }        Events<Event> events = null;        Position start = canalInstance.getMetaManager().getCursor(clientIdentity);        events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);        if (CollectionUtils.isEmpty(events.getEvents())) {            logger.debug("get successfully, clientId:{} batchSize:{} but result is null", new Object[] {                    clientIdentity.getClientId(), batchSize });            return new Message(-1, new ArrayList<Entry>()); // 返回空包,避免生成batchId,浪费性能        } else {            // 记录到流式信息            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {                public Entry apply(Event input) {                    return input.getEntry();                }            });            logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",                clientIdentity.getClientId(),                batchSize,                entrys.size(),                batchId,                events.getPositionRange());            // 直接提交ack            ack(clientIdentity, batchId);            return new Message(batchId, entrys);        }    }}

文章目录
  1. 1. CanalServer接口
  2. 2. CanalServerWithNetty
    1. 2.0.1. start方法
  • 3. SessionHandler
  • 4. CannalServerWithEmbeded