A look at nacos's MysqlHealthCheckProcessor

go4it • 6 years ago • 302 views

This article takes a look at nacos's MysqlHealthCheckProcessor.

MysqlHealthCheckProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java

@Component
public class MysqlHealthCheckProcessor implements HealthCheckProcessor {

    @Autowired
    private HealthCheckCommon healthCheckCommon;

    @Autowired
    private SwitchDomain switchDomain;

    public static final int CONNECT_TIMEOUT_MS = 500;

    private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
    private static final String MYSQL_SLAVE_READONLY = "ON";

    private static ConcurrentMap<String, Connection> CONNECTION_POOL
            = new ConcurrentHashMap<String, Connection>();

    private static ExecutorService EXECUTOR;

    static {

        int processorCount = Runtime.getRuntime().availableProcessors();
        EXECUTOR
                = Executors.newFixedThreadPool(processorCount <= 1 ? 1 : processorCount / 2,
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.nacos.mysql.checker");
                        return thread;
                    }
                }
        );
    }

    public MysqlHealthCheckProcessor() {
    }

    @Override
    public String getType() {
        return "MYSQL";
    }

    @Override
    public void process(HealthCheckTask task) {
        List<Instance> ips = task.getCluster().allIPs(false);

        SRV_LOG.debug("mysql check, ips:" + ips);
        if (CollectionUtils.isEmpty(ips)) {
            return;
        }

        for (Instance ip : ips) {
            try {

                if (ip.isMarked()) {
                    if (SRV_LOG.isDebugEnabled()) {
                        SRV_LOG.debug("mysql check, ip is marked as to skip health check, ip: {}", ip.getIp());
                    }
                    continue;
                }

                if (!ip.markChecking()) {
                    SRV_LOG.warn("mysql check started before last one finished, service: {}:{}:{}",
                        task.getCluster().getService().getName(), task.getCluster().getName(), ip.getIp());

                    healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getMysqlHealthParams());
                    continue;
                }

                EXECUTOR.execute(new MysqlCheckTask(ip, task));
                MetricsMonitor.getMysqlHealthCheckMonitor().incrementAndGet();
            } catch (Exception e) {
                ip.setCheckRT(switchDomain.getMysqlHealthParams().getMax());
                healthCheckCommon.checkFail(ip, task, "mysql:error:" + e.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task, switchDomain.getMysqlHealthParams());
            }
        }
    }

    //......
}
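  • MysqlHealthCheckProcessor implements the HealthCheckProcessor interface, and its getType method returns MYSQL. Its process method iterates over the cluster's instances: an instance that is marked is skipped; an instance for which markChecking() returns false (the previous check has not finished yet) gets a warning log and a call to healthCheckCommon.reEvaluateCheckRT with twice the normalized check RT; for every other instance a MysqlCheckTask is created and submitted to EXECUTOR. If submitting throws, the instance's check RT is set to the configured maximum and healthCheckCommon.checkFail is called.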
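The per-instance branching in process can be tried out in isolation. The following is a minimal sketch, not nacos code: FakeInstance and dispatch are hypothetical names, and modelling markChecking() as a compare-and-set on an AtomicBoolean is an assumption about how Instance guards against overlapping checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckDispatchSketch {

    /** Hypothetical stand-in for a nacos Instance; it only carries what the loop needs. */
    static class FakeInstance {
        final String ip;
        final boolean marked;
        // Assumption: an "in progress" flag flipped with compare-and-set.
        private final AtomicBoolean beingChecked = new AtomicBoolean(false);

        FakeInstance(String ip, boolean marked) {
            this.ip = ip;
            this.marked = marked;
        }

        /** Returns true only for the caller that moves the flag from false to true. */
        boolean markChecking() {
            return beingChecked.compareAndSet(false, true);
        }

        /** A finished check would clear the flag so the next round can run. */
        void finishChecking() {
            beingChecked.set(false);
        }
    }

    /** Returns one decision per instance: "skip", "backoff" or "submit". */
    public static List<String> dispatch(List<FakeInstance> instances) {
        List<String> decisions = new ArrayList<>();
        for (FakeInstance instance : instances) {
            if (instance.marked) {
                // Marked instances are excluded from health checking.
                decisions.add("skip:" + instance.ip);
                continue;
            }
            if (!instance.markChecking()) {
                // Previous check still running: the real code re-evaluates the check RT here.
                decisions.add("backoff:" + instance.ip);
                continue;
            }
            // The real code submits a MysqlCheckTask to the executor at this point.
            decisions.add("submit:" + instance.ip);
        }
        return decisions;
    }

    public static void main(String[] args) {
        List<FakeInstance> instances = Arrays.asList(
                new FakeInstance("10.0.0.1", true),
                new FakeInstance("10.0.0.2", false));
        System.out.println(dispatch(instances)); // [skip:10.0.0.1, submit:10.0.0.2]
        // No check has finished in between, so the second round backs off.
        System.out.println(dispatch(instances)); // [skip:10.0.0.1, backoff:10.0.0.2]
    }
}
```

Calling finishChecking() on an instance between the two rounds would make the second round submit again, which is the effect a completed check is expected to have.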

MysqlCheckTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/MysqlHealthCheckProcessor.java

    private class MysqlCheckTask implements Runnable {
        private Instance ip;
        private HealthCheckTask task;
        private long startTime = System.currentTimeMillis();

        public MysqlCheckTask(Instance ip, HealthCheckTask task) {
            this.ip = ip;
            this.task = task;
        }

        @Override
        public void run() {

            Statement statement = null;
            ResultSet resultSet = null;

            try {

                Cluster cluster = task.getCluster();
                String key = cluster.getService().getName() + ":" + cluster.getName() + ":" + ip.getIp() + ":" + ip.getPort();
                Connection connection = CONNECTION_POOL.get(key);
                AbstractHealthChecker.Mysql config = (AbstractHealthChecker.Mysql) cluster.getHealthChecker();

                if (connection == null || connection.isClosed()) {
                    MysqlDataSource dataSource = new MysqlDataSource();
                    dataSource.setConnectTimeout(CONNECT_TIMEOUT_MS);
                    dataSource.setSocketTimeout(CONNECT_TIMEOUT_MS);
                    dataSource.setUser(config.getUser());
                    dataSource.setPassword(config.getPwd());
                    dataSource.setLoginTimeout(1);

                    dataSource.setServerName(ip.getIp());
                    dataSource.setPort(ip.getPort());

                    connection = dataSource.getConnection();
                    CONNECTION_POOL.put(key, connection);
                }

                statement = connection.createStatement();
                statement.setQueryTimeout(1);

                resultSet = statement.executeQuery(config.getCmd());
                int resultColumnIndex = 2;

                if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
                    resultSet.next();
                    if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
                        throw new IllegalStateException("current node is slave!");
                    }
                }

                healthCheckCommon.checkOK(ip, task, "mysql:+ok");
                healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task, switchDomain.getMysqlHealthParams());
            } catch (SQLException e) {
                // fail immediately
                healthCheckCommon.checkFailNow(ip, task, "mysql:" + e.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task, switchDomain.getMysqlHealthParams());
            } catch (Throwable t) {
                Throwable cause = t;
                int maxStackDepth = 50;
                for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
                    if (cause instanceof SocketTimeoutException
                            || cause instanceof ConnectTimeoutException
                            || cause instanceof TimeoutException
                            || cause.getCause() instanceof TimeoutException) {

                        healthCheckCommon.checkFail(ip, task, "mysql:timeout:" + cause.getMessage());
                        healthCheckCommon.reEvaluateCheckRT(task.getCheckRTNormalized() * 2, task, switchDomain.getMysqlHealthParams());
                        return;
                    }

                    cause = cause.getCause();
                }

                // connection error, probably not reachable
                healthCheckCommon.checkFail(ip, task, "mysql:error:" + t.getMessage());
                healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task, switchDomain.getMysqlHealthParams());
            } finally {
                ip.setCheckRT(System.currentTimeMillis() - startTime);
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close statement:" + statement, e);
                    }
                }
                if (resultSet != null) {
                    try {
                        resultSet.close();
                    } catch (SQLException e) {
                        Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close resultSet:" + resultSet, e);
                    }
                }
            }
        }
    }
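  • MysqlCheckTask implements the Runnable interface. Its run method looks up the connection for the instance in CONNECTION_POOL, keyed by service name, cluster name, IP and port; if the connection is null or closed, it builds a new MysqlDataSource, calls getConnection(), and puts the result into CONNECTION_POOL. It then runs the query given by config.getCmd(). On success it calls healthCheckCommon.checkOK; a SQLException leads to healthCheckCommon.checkFailNow; any other Throwable leads to healthCheckCommon.checkFail, reported as a timeout if a timeout exception is found within the first 50 levels of the cause chain and as a generic error otherwise.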
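The cause-chain walk in the catch (Throwable t) branch can be isolated as a small helper. This is a minimal sketch under stated assumptions: TimeoutCauseSketch and isTimeout are hypothetical names, and the sketch checks only the two JDK timeout types, leaving out ConnectTimeoutException, which comes from a third-party library in the original.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeoutException;

public class TimeoutCauseSketch {

    // Same bound as maxStackDepth in the original listing.
    static final int MAX_DEPTH = 50;

    /**
     * Walks the cause chain, at most MAX_DEPTH levels deep, and reports
     * whether any link is one of the JDK timeout exceptions.
     */
    public static boolean isTimeout(Throwable t) {
        Throwable cause = t;
        for (int depth = 0; depth < MAX_DEPTH && cause != null; depth++) {
            if (cause instanceof SocketTimeoutException
                    || cause instanceof TimeoutException) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("query failed",
                new IllegalStateException("io error",
                        new SocketTimeoutException("read timed out")));
        System.out.println(isTimeout(wrapped)); // true
        System.out.println(isTimeout(new IllegalStateException("current node is slave!"))); // false
    }
}
```

The depth bound keeps the loop finite even if a cause chain were cyclic. In the original, the IllegalStateException thrown for a read-only node carries no timeout in its chain, so it ends up in the generic "mysql:error:" path.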

Summary

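MysqlHealthCheckProcessor implements the HealthCheckProcessor interface, and its getType method returns MYSQL. Its process method iterates over the instances: marked instances are skipped, instances for which markChecking() returns false trigger healthCheckCommon.reEvaluateCheckRT, and for the remaining instances a MysqlCheckTask is created and submitted to EXECUTOR.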

Article URL: http://www.python88.com/topic/45195