
A Look at canal's MysqlDetectingTimeTask

go4it • 5 years ago


This article takes a look at canal's MysqlDetectingTimeTask.

MysqlDetectingTimeTask

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

    class MysqlDetectingTimeTask extends TimerTask {

        private boolean         reconnect = false;
        private MysqlConnection mysqlConnection;

        public MysqlDetectingTimeTask(MysqlConnection mysqlConnection){
            this.mysqlConnection = mysqlConnection;
        }

        public void run() {
            try {
                if (reconnect) {
                    reconnect = false;
                    mysqlConnection.reconnect();
                } else if (!mysqlConnection.isConnected()) {
                    mysqlConnection.connect();
                }
                Long startTime = System.currentTimeMillis();

                // the heartbeat SQL may be a plain query such as select 1
                if (StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "select")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "show")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "explain")
                    || StringUtils.startsWithIgnoreCase(detectingSQL.trim(), "desc")) {
                    mysqlConnection.query(detectingSQL);
                } else {
                    mysqlConnection.update(detectingSQL);
                }

                Long costTime = System.currentTimeMillis() - startTime;
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onSuccess(costTime);
                }
            } catch (SocketTimeoutException e) {
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            } catch (IOException e) {
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            } catch (Throwable e) {
                if (haController != null && haController instanceof HeartBeatCallback) {
                    ((HeartBeatCallback) haController).onFailed(e);
                }
                reconnect = true;
                logger.warn("connect failed by ", e);
            }

        }

        public MysqlConnection getMysqlConnection() {
            return mysqlConnection;
        }
    }
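  • MysqlDetectingTimeTask extends TimerTask. In run(), if reconnect is true it resets the flag to false and calls mysqlConnection.reconnect(); otherwise, if mysqlConnection.isConnected() returns false, it calls mysqlConnection.connect(). It then executes detectingSQL (through query() for statements beginning with select, show, explain, or desc, and through update() for everything else), measures costTime, and, when haController implements HeartBeatCallback, calls onSuccess(costTime). SocketTimeoutException, IOException, and Throwable are caught in turn, and all three handlers do the same thing: call onFailed(e) on the callback, set reconnect to true, and log a warning.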
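The reconnect flag is easier to follow with a small stand-alone example. The sketch below is not canal code: FakeConnection and HeartbeatSketch are hypothetical stand-ins that only record which calls were made, and the HA callback is left out. It shows that a failed heartbeat does not reconnect immediately; the flag is set and the reconnect happens on the next run.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Simplified heartbeat with the same control flow as run(), minus the HA callback.
// All names here are illustrative; none of this is canal code.
class HeartbeatSketch implements Runnable {

    private final FakeConnection connection;
    private final String detectingSQL;
    private boolean reconnect = false;

    HeartbeatSketch(FakeConnection connection, String detectingSQL) {
        this.connection = connection;
        this.detectingSQL = detectingSQL;
    }

    // Mirrors the prefix check: read-style statements go through query().
    static boolean isReadOnly(String sql) {
        String s = sql.trim().toLowerCase(Locale.ROOT);
        return s.startsWith("select") || s.startsWith("show")
            || s.startsWith("explain") || s.startsWith("desc");
    }

    @Override
    public void run() {
        try {
            if (reconnect) {
                reconnect = false;
                connection.reconnect();
            } else if (!connection.isConnected()) {
                connection.connect();
            }
            if (isReadOnly(detectingSQL)) {
                connection.query(detectingSQL);
            } else {
                connection.update(detectingSQL);
            }
        } catch (Throwable e) {
            // Any failure schedules a reconnect for the next run.
            reconnect = true;
        }
    }

    public static void main(String[] args) {
        FakeConnection c = new FakeConnection();
        HeartbeatSketch task = new HeartbeatSketch(c, "select 1");
        task.run();          // first run: connect, then query
        c.failNext = true;
        task.run();          // statement fails, reconnect flag is set
        task.run();          // flag is consumed: reconnect, then query
        System.out.println(c.calls); // [connect, query, reconnect, query]
    }
}

// Hypothetical stand-in for MysqlConnection: records calls instead of talking to MySQL.
class FakeConnection {
    final List<String> calls = new ArrayList<>();
    boolean connected = false;
    boolean failNext = false; // when true, the next statement throws once

    void connect() { connected = true; calls.add("connect"); }
    void reconnect() { connected = true; calls.add("reconnect"); }
    boolean isConnected() { return connected; }

    void query(String sql) throws IOException { execute("query"); }
    void update(String sql) throws IOException { execute("update"); }

    private void execute(String kind) throws IOException {
        if (failNext) {
            failNext = false;
            throw new IOException("simulated network failure");
        }
        calls.add(kind);
    }
}
```

In the real task the period between runs is whatever the Timer was scheduled with, so a broken connection is retried once per heartbeat interval rather than in a tight loop.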

HeartBeatCallback

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/HeartBeatCallback.java

public interface HeartBeatCallback {

    /**
     * Heartbeat sent successfully
     */
    public void onSuccess(long costTime);

    /**
     * Heartbeat failed to send
     */
    public void onFailed(Throwable e);

}
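  • The HeartBeatCallback interface defines two methods: onSuccess, which receives the elapsed time of the heartbeat, and onFailed, which receives the exception that caused the failure.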

HeartBeatHAController

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/ha/HeartBeatHAController.java

public class HeartBeatHAController extends AbstractCanalLifeCycle implements CanalHAController, HeartBeatCallback {

    private static final Logger logger              = LoggerFactory.getLogger(HeartBeatHAController.class);
    // default 3 times
    private int                 detectingRetryTimes = 3;
    private int                 failedTimes         = 0;
    private boolean             switchEnable        = false;
    private CanalHASwitchable   eventParser;

    public HeartBeatHAController(){

    }

    public void onSuccess(long costTime) {
        failedTimes = 0;
    }

    public void onFailed(Throwable e) {
        failedTimes++;
        // check whether the failure count has exceeded the threshold
        synchronized (this) {
            if (failedTimes > detectingRetryTimes) {
                if (switchEnable) {
                    eventParser.doSwitch();// request a single switch
                    failedTimes = 0;
                } else {
                    logger.warn("HeartBeat failed Times:{} , should auto switch ?", failedTimes);
                }
            }
        }
    }

    // ============================= setter / getter
    // ============================

    public void setCanalHASwitchable(CanalHASwitchable canalHASwitchable) {
        this.eventParser = canalHASwitchable;
    }

    public void setDetectingRetryTimes(int detectingRetryTimes) {
        this.detectingRetryTimes = detectingRetryTimes;
    }

    public void setSwitchEnable(boolean switchEnable) {
        this.switchEnable = switchEnable;
    }

}
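  • HeartBeatHAController implements HeartBeatCallback. onSuccess resets failedTimes to 0. onFailed increments failedTimes and, once failedTimes exceeds detectingRetryTimes (default 3), either calls eventParser.doSwitch() and resets failedTimes to 0 when switchEnable is true, or only logs a warning when switchEnable is false.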
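Because the comparison is a strict failedTimes > detectingRetryTimes, the default of 3 means the switch is requested on the fourth consecutive failure, not the third. The sketch below reproduces just that counting logic under simplifying assumptions: FailureCounterSketch is a hypothetical class, the switches counter stands in for eventParser.doSwitch(), and the synchronization and logging are omitted.

```java
// Hypothetical, simplified re-creation of the counting logic; not canal's class.
class FailureCounterSketch {

    private final int detectingRetryTimes;
    private final boolean switchEnable;
    private int failedTimes = 0;
    int switches = 0; // how many times a switch would have been requested

    FailureCounterSketch(int detectingRetryTimes, boolean switchEnable) {
        this.detectingRetryTimes = detectingRetryTimes;
        this.switchEnable = switchEnable;
    }

    void onSuccess() {
        failedTimes = 0; // any successful heartbeat clears the streak
    }

    void onFailed() {
        failedTimes++;
        if (failedTimes > detectingRetryTimes && switchEnable) {
            switches++;      // stands in for eventParser.doSwitch()
            failedTimes = 0; // start counting again against the new node
        }
    }

    public static void main(String[] args) {
        FailureCounterSketch counter = new FailureCounterSketch(3, true);
        for (int i = 1; i <= 4; i++) {
            counter.onFailed();
            System.out.println("failure " + i + " -> switches=" + counter.switches);
        }
        // failures 1 to 3 print switches=0, failure 4 prints switches=1
    }
}
```

A single success in the middle of a run of failures resets the streak, so only consecutive failures count toward the threshold.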

doSwitch

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

public class MysqlEventParser extends AbstractMysqlEventParser implements CanalEventParser, CanalHASwitchable {

	//......

    // master/standby switch logic
    public void doSwitch() {
        AuthenticationInfo newRunningInfo = (runningInfo.equals(masterInfo) ? standbyInfo : masterInfo);
        this.doSwitch(newRunningInfo);
    }

    public void doSwitch(AuthenticationInfo newRunningInfo) {
        // 1. stop the replication that is currently running
        // 2. find the new position
        // 3. re-establish the connection and resume replication
        // switch the ip
        String alarmMessage = null;

        if (this.runningInfo.equals(newRunningInfo)) {
            alarmMessage = "same runingInfo switch again : " + runningInfo.getAddress().toString();
            logger.warn(alarmMessage);
            return;
        }

        if (newRunningInfo == null) {
            alarmMessage = "no standby config, just do nothing, will continue try:"
                           + runningInfo.getAddress().toString();
            logger.warn(alarmMessage);
            sendAlarm(destination, alarmMessage);
            return;
        } else {
            stop();
            alarmMessage = "try to ha switch, old:" + runningInfo.getAddress().toString() + ", new:"
                           + newRunningInfo.getAddress().toString();
            logger.warn(alarmMessage);
            sendAlarm(destination, alarmMessage);
            runningInfo = newRunningInfo;
            start();
        }
    }

    //......

}    
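  • MysqlEventParser's no-argument doSwitch picks the other node (standbyInfo if the parser is currently running against masterInfo, otherwise masterInfo) and delegates to doSwitch(AuthenticationInfo). That method returns early with a warning if the target equals the current runningInfo, or with a warning and an alarm if the target is null (no standby configured); otherwise it calls stop(), updates runningInfo, and calls start().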
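The toggling between master and standby can be illustrated in isolation. In this sketch, which is only an approximation of the logic above, plain address strings replace AuthenticationInfo, the stop()/start() calls are reduced to comments, and the class name, return values, and addresses are all made up for the example.

```java
// Hypothetical sketch of the master/standby toggle; not canal's MysqlEventParser.
class SwitchSketch {

    private final String masterInfo;
    private final String standbyInfo; // null when no standby is configured
    private String runningInfo;

    SwitchSketch(String masterInfo, String standbyInfo) {
        this.masterInfo = masterInfo;
        this.standbyInfo = standbyInfo;
        this.runningInfo = masterInfo; // assumption: start on the master
    }

    String getRunningInfo() {
        return runningInfo;
    }

    // Picks "the other node" and delegates, like the no-argument doSwitch().
    boolean doSwitch() {
        String target = runningInfo.equals(masterInfo) ? standbyInfo : masterInfo;
        return doSwitch(target);
    }

    // Returns true only when the running node actually changed.
    boolean doSwitch(String newRunningInfo) {
        if (runningInfo.equals(newRunningInfo)) {
            return false; // already on that node, nothing to do
        }
        if (newRunningInfo == null) {
            return false; // no standby configured, keep trying the current node
        }
        // the real method calls stop() here
        runningInfo = newRunningInfo;
        // ... and start() here
        return true;
    }

    public static void main(String[] args) {
        SwitchSketch ha = new SwitchSketch("10.0.0.1:3306", "10.0.0.2:3306");
        ha.doSwitch();
        System.out.println(ha.getRunningInfo()); // 10.0.0.2:3306
        ha.doSwitch();
        System.out.println(ha.getRunningInfo()); // 10.0.0.1:3306

        SwitchSketch single = new SwitchSketch("10.0.0.1:3306", null);
        System.out.println(single.doSwitch());   // false
    }
}
```

Each call flips to the other node, so repeated heartbeat failures on both nodes would make the parser alternate between them.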

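Summary

MysqlDetectingTimeTask is a TimerTask that keeps the MySQL connection alive and probes it with detectingSQL: it reconnects when the previous run failed, connects when the connection is down, runs the statement, and reports the elapsed time through HeartBeatCallback.onSuccess. Any SocketTimeoutException, IOException, or other Throwable is reported through onFailed and sets reconnect to true. HeartBeatHAController counts these failures and, once the count exceeds detectingRetryTimes with switchEnable set, calls MysqlEventParser.doSwitch(), which stops the parser, points runningInfo at the other node, and starts the parser again.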
