社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  DATABASE

聊聊canal的MysqlConnection

go4it • 5 年前 • 288 次点击  
阅读 3

聊聊canal的MysqlConnection

本文主要研究一下canal的MysqlConnection

ErosaConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/ErosaConnection.java

public interface ErosaConnection {

    public void connect() throws IOException;

    public void reconnect() throws IOException;

    public void disconnect() throws IOException;

    /**
     * 用于快速数据查找,和dump的区别在于,seek会只给出部分的数据
     */
    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException;

    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException;

    public void dump(long timestamp, SinkFunction func) throws IOException;

    /**
     * 通过GTID同步binlog
     */
    public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException;

    // -------------

    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException;

    public void dump(long timestamp, MultiStageCoprocessor coprocessor) throws IOException;

    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException;

    ErosaConnection fork();

    public long queryServerId() throws IOException;
}
复制代码
  • ErosaConnection接口定义了connect、reconnect、disconnect、seek、dump、fork、queryServerId方法

MysqlConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

    private static final Logger logger         = LoggerFactory.getLogger(MysqlConnection.class);

    private MysqlConnector      connector;
    private long                slaveId;
    private Charset             charset        = Charset.forName("UTF-8");
    private BinlogFormat        binlogFormat;
    private BinlogImage         binlogImage;

    // tsdb releated
    private AuthenticationInfo  authInfo;
    protected int               connTimeout    = 5 * 1000;                                      // 5秒
    protected int               soTimeout      = 60 * 60 * 1000;                                // 1小时
    private int                 binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
    // dump binlog bytes, 暂不包括meta与TSDB
    private AtomicLong          receivedBinlogBytes;

    public MysqlConnection(){
    }

    public MysqlConnection(InetSocketAddress address, String username, String password){
        authInfo = new AuthenticationInfo();
        authInfo.setAddress(address);
        authInfo.setUsername(username);
        authInfo.setPassword(password);
        connector = new MysqlConnector(address, username, password);
        // 将connection里面的参数透传下
        connector.setSoTimeout(soTimeout);
        connector.setConnTimeout(connTimeout);
    }

    public MysqlConnection(InetSocketAddress address, String username, String password, byte charsetNumber,
                           String defaultSchema){
        authInfo = new AuthenticationInfo();
        authInfo.setAddress(address);
        authInfo.setUsername(username);
        authInfo.setPassword(password);
        authInfo.setDefaultDatabaseName(defaultSchema);
        connector = new MysqlConnector(address, username, password, charsetNumber, defaultSchema);
        // 将connection里面的参数透传下
        connector.setSoTimeout(soTimeout);
        connector.setConnTimeout(connTimeout);
    }

    public void connect() throws IOException {
        connector.connect();
    }

    public void reconnect() throws IOException {
        connector.reconnect();
    }

    public void disconnect() throws IOException {
        connector.disconnect();
    }

    public boolean isConnected() {
        return connector.isConnected();
    }

    public MysqlConnection fork() {
        MysqlConnection connection = new MysqlConnection();
        connection.setCharset(getCharset());
        connection.setSlaveId(getSlaveId());
        connection.setConnector(connector.fork());
        // set authInfo
        connection.setAuthInfo(authInfo);
        return connection;
    }

    @Override
    public long queryServerId() throws IOException {
        ResultSetPacket resultSetPacket = query("show variables like 'server_id'");
        List<String> fieldValues = resultSetPacket.getFieldValues();
        if (fieldValues == null || fieldValues.size() != 2) {
            return 0;
        }
        return NumberUtils.toLong(fieldValues.get(1));
    }

    public ResultSetPacket query(String cmd) throws IOException {
        MysqlQueryExecutor exector = new MysqlQueryExecutor(connector);
        return exector.query(cmd);
    }

    //......

}
复制代码
  • MysqlConnection实现了ErosaConnection接口,其构造器会构建AuthenticationInfo及MysqlConnector;其connect、reconnect、disconnect方法均直接委托给MysqlConnector;其fork方法会使用connector.fork()重新创建一个MysqlConnection;其queryServerId方法则使用show variables like 'server_id'查询

seek

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

	//......

    public void seek(String binlogfilename, Long binlogPosition, String gtid, SinkFunction func) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder();
        decoder.handle(LogEvent.ROTATE_EVENT);
        decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
        decoder.handle(LogEvent.QUERY_EVENT);
        decoder.handle(LogEvent.XID_EVENT);
        LogContext context = new LogContext();
        // 若entry position存在gtid,则使用传入的gtid作为gtidSet
        // 拼接的标准,否则同时开启gtid和tsdb时,会导致丢失gtid
        // 而当源端数据库gtid 有purged时会有如下类似报错
        // 


    
'errno = 1236, sqlstate = HY000 errmsg = The slave is connecting
        // using CHANGE MASTER TO MASTER_AUTO_POSITION = 1 ...
        if (StringUtils.isNotEmpty(gtid)) {
            decoder.handle(LogEvent.GTID_LOG_EVENT);
            context.setGtidSet(MysqlGTIDSet.parse(gtid));
        }
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }
        }
    }

    private void updateSettings() throws IOException {
        try {
            update("set wait_timeout=9999999");
        } catch (Exception e) {
            logger.warn("update wait_timeout failed", e);
        }
        try {
            update("set net_write_timeout=1800");
        } catch (Exception e) {
            logger.warn("update net_write_timeout failed", e);
        }

        try {
            update("set net_read_timeout=1800");
        } catch (Exception e) {
            logger.warn("update net_read_timeout failed", e);
        }

        try {
            // 设置服务端返回结果时不做编码转化,直接按照数据库的二进制编码进行发送,由客户端自己根据需求进行编码转化
            update("set names 'binary'");
        } catch (Exception e) {
            logger.warn("update names failed", e);
        }

        try {
            // mysql5.6针对checksum支持需要设置session变量
            // 如果不设置会出现错误: Slave can not handle replication events with the
            // checksum that master is configured to log
            // 但也不能乱设置,需要和mysql server的checksum配置一致,不然RotateLogEvent会出现乱码
            // '@@global.binlog_checksum'需要去掉单引号,在mysql 5.6.29下导致master退出
            update("set @master_binlog_checksum= @@global.binlog_checksum");
        } catch (Exception e) {
            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                logger.warn("update master_binlog_checksum failed", e);
            }
        }

        try {
            // 参考:https://github.com/alibaba/canal/issues/284
            // mysql5.6需要设置slave_uuid避免被server kill链接
            update("set @slave_uuid=uuid()");
        } catch (Exception e) {
            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                logger.warn("update slave_uuid failed", e);
            }
        }

        try {
            // mariadb针对特殊的类型,需要设置session变量
            update("SET @mariadb_slave_capability='" + LogEvent.MARIA_SLAVE_CAPABILITY_MINE + "'");
        } catch (Exception e) {
            if (!StringUtils.contains(e.getMessage(), "Unknown system variable")) {
                logger.warn("update mariadb_slave_capability failed", e);
            }
        }

        /**
         * MASTER_HEARTBEAT_PERIOD sets the interval in seconds between
         * replication heartbeats. Whenever the master's binary log is updated
         * with an event, the waiting period for the next heartbeat is reset.
         * interval is a decimal value having the range 0 to 4294967 seconds and
         * a resolution in milliseconds; the smallest nonzero value is 0.001.
         * Heartbeats are sent by the master only if there are no unsent events
         * in the binary log file for a period longer than interval.
         */
        try {
            long periodNano = TimeUnit.SECONDS.toNanos(MASTER_HEARTBEAT_PERIOD_SECONDS);
            update("SET @master_heartbeat_period=" + periodNano);
        } catch (Exception e) {
            logger.warn("update master_heartbeat_period failed", e);
        }
    }

    private void loadBinlogChecksum() {
        ResultSetPacket rs = null;
        try {
            rs = query("select @@global.binlog_checksum");
            List<String> columnValues = rs.getFieldValues();
            if (columnValues != null && columnValues.size() >= 1 && columnValues.get(0) != null
                && columnValues.get(0).toUpperCase().equals("CRC32")) {
                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_CRC32;
            } else {
                binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
            }
        } catch (Throwable e) {
            // logger.error("", e);
            binlogChecksum = LogEvent.BINLOG_CHECKSUM_ALG_OFF;
        }
    }

    private void sendBinlogDump(String binlogfilename, Long binlogPosition) throws IOException {
        BinlogDumpCommandPacket binlogDumpCmd = new BinlogDumpCommandPacket();
        binlogDumpCmd.binlogFileName = binlogfilename;
        binlogDumpCmd.binlogPosition = binlogPosition;
        binlogDumpCmd.slaveServerId = this.slaveId;
        byte[] cmdBody = binlogDumpCmd.toBytes();

        logger.info("COM_BINLOG_DUMP with position:{}", binlogDumpCmd);
        HeaderPacket binlogDumpHeader = new HeaderPacket();
        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
        connector.setDumping(true);
    }

    //......
}
复制代码
  • seek方法先执行updateSettings、loadBinlogChecksum、sendBinlogDump方法,然后创建DirectLogFetcher来fetch数据;updateSettings方法会设置wait_timeout、net_write_timeout、net_read_timeout、master_heartbeat_period等参数;loadBinlogChecksum方法会校验master的binlogChecksum;sendBinlogDump方法则是发送BinlogDumpCommandPacket

dump

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java

public class MysqlConnection implements ErosaConnection {

	//......

    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendRegisterSlave();
        sendBinlogDump(binlogfilename, binlogPosition);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        fetcher.start(connector.getChannel());
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
        while (fetcher.fetch()) {
            accumulateReceivedBytes(fetcher.limit());
            LogEvent event = null;
            event = decoder.decode(fetcher, context);

            if (event == null) {
                throw new CanalParseException("parse failed");
            }

            if (!func.sink(event)) {
                break;
            }

            if (event.getSemival() == 1) {
                sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
            }
        }
    }

    public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendBinlogDumpGTID(gtidSet);

        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        try {
            fetcher.start(connector.getChannel());
            LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
            LogContext context = new LogContext();
            context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
            // fix bug: #890 将gtid传输至context中,供decode使用
            context.setGtidSet(gtidSet);
            while (fetcher.fetch()) {
                accumulateReceivedBytes(fetcher.limit());
                LogEvent event = null;
                event = decoder.decode(fetcher, context);

                if (event == null) {
                    throw new CanalParseException("parse failed");
                }

                if (!func.sink(event)) {
                    break;
                }
            }
        } finally {
            fetcher.close();
        }
    }

    public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendRegisterSlave();
        sendBinlogDump(binlogfilename, binlogPosition);
        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        try {
            fetcher.start(connector.getChannel());
            while (fetcher.fetch()) {
                accumulateReceivedBytes(fetcher.limit());
                LogBuffer buffer = fetcher.duplicate();
                fetcher.consume(fetcher.limit());
                if (!coprocessor.publish(buffer)) {
                    break;
                }
            }
        } finally {
            fetcher.close();
        }
    }

    public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOException {
        updateSettings();
        loadBinlogChecksum();
        sendBinlogDumpGTID(gtidSet);
        ((MysqlMultiStageCoprocessor) coprocessor).setConnection(this);
        ((MysqlMultiStageCoprocessor) coprocessor).setBinlogChecksum(binlogChecksum);
        DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
        try {
            fetcher.start(connector.getChannel());
            while (fetcher.fetch()) {
                accumulateReceivedBytes(fetcher.limit());
                LogBuffer buffer = fetcher.duplicate();
                fetcher.consume(fetcher.limit());
                if


    
 (!coprocessor.publish(buffer)) {
                    break;
                }
            }
        } finally {
            fetcher.close();
        }
    }

    private void sendRegisterSlave() throws IOException {
        RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();
        SocketAddress socketAddress = connector.getChannel().getLocalSocketAddress();
        if (socketAddress == null || !(socketAddress instanceof InetSocketAddress)) {
            return;
        }

        InetSocketAddress address = (InetSocketAddress) socketAddress;
        String host = address.getHostString();
        int port = address.getPort();
        cmd.reportHost = host;
        cmd.reportPort = port;
        cmd.reportPasswd = authInfo.getPassword();
        cmd.reportUser = authInfo.getUsername();
        cmd.serverId = this.slaveId;
        byte[] cmdBody = cmd.toBytes();

        logger.info("Register slave {}", cmd);

        HeaderPacket header = new HeaderPacket();
        header.setPacketBodyLength(cmdBody.length);
        header.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);

        header = PacketManager.readHeader(connector.getChannel(), 4);
        byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());
        assert body != null;
        if (body[0] < 0) {
            if (body[0] == -1) {
                ErrorPacket err = new ErrorPacket();
                err.fromBytes(body);
                throw new IOException("Error When doing Register slave:" + err.toString());
            } else {
                throw new IOException("unpexpected packet with field_count=" + body[0]);
            }
        }
    }

    private void sendBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
        BinlogDumpGTIDCommandPacket binlogDumpCmd = new BinlogDumpGTIDCommandPacket();
        binlogDumpCmd.slaveServerId = this.slaveId;
        binlogDumpCmd.gtidSet = gtidSet;
        byte[] cmdBody = binlogDumpCmd.toBytes();

        logger.info("COM_BINLOG_DUMP_GTID:{}", binlogDumpCmd);
        HeaderPacket binlogDumpHeader = new HeaderPacket();
        binlogDumpHeader.setPacketBodyLength(cmdBody.length);
        binlogDumpHeader.setPacketSequenceNumber((byte) 0x00);
        PacketManager.writePkg(connector.getChannel(), binlogDumpHeader.toBytes(), cmdBody);
        connector.setDumping(true);
    }

	//......

}
复制代码
  • MysqlConnection提供了多个dump方法,主要分为根据binlog文件名的,根据gtidSet的,以及使用SinkFunction或者MultiStageCoprocessor的
  • 根据binlogfilename的dump方法先执行updateSettings、loadBinlogChecksum、sendRegisterSlave、sendBinlogDump,然后创建DirectLogFetcher拉取数据,触发SinkFunction或者MultiStageCoprocessor;根据gtidSet的dump方法先执行updateSettings、loadBinlogChecksum、sendBinlogDumpGTID,然后创建DirectLogFetcher拉取数据,触发SinkFunction或者MultiStageCoprocessor
  • sendRegisterSlave方法构造并发送RegisterSlaveCommandPacket;sendBinlogDumpGTID方法则构造并发送BinlogDumpGTIDCommandPacket

小结

MysqlConnection实现了ErosaConnection接口,其构造器会构建AuthenticationInfo及MysqlConnector;其connect、reconnect、disconnect方法均直接委托给MysqlConnector;其fork方法会使用connector.fork()重新创建一个MysqlConnection;其queryServerId方法则使用show variables like 'server_id'查询;它提供了多个dump方法,主要分为根据binlog文件名的,根据gtidSet的,以及使用SinkFunction或者MultiStageCoprocessor的

doc

?time=1588053738.64
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/62323
 
288 次点击