A Look at Elasticsearch's NodesFaultDetection

go4it • 4 years ago

This article examines Elasticsearch's NodesFaultDetection.

NodesFaultDetection

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {

    public static interface Listener {

        void onNodeFailure(DiscoveryNode node, String reason);
    }

    private final ThreadPool threadPool;

    private final TransportService transportService;


    private final boolean connectOnNetworkDisconnect;

    private final TimeValue pingInterval;

    private final TimeValue pingRetryTimeout;

    private final int pingRetryCount;

    // used mainly for testing, should always be true
    private final boolean registerConnectionListener;


    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();

    private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();

    private final FDConnectionListener connectionListener;

    private volatile DiscoveryNodes latestNodes = EMPTY_NODES;

    private volatile boolean running = false;

    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;

        this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
        this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
        this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
        this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
        this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);

        logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);

        transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());

        this.connectionListener = new FDConnectionListener();
        if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
        }
    }

    public NodesFaultDetection start() {
        if (running) {
            return this;
        }
        running = true;
        return this;
    }

    public NodesFaultDetection stop() {
        if (!running) {
            return this;
        }
        running = false;
        return this;
    }

    public void close() {
        stop();
        transportService.removeHandler(PingRequestHandler.ACTION);
        transportService.removeConnectionListener(connectionListener);
    }

    //......
}
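
  • NodesFaultDetection extends AbstractComponent. It defines a CopyOnWriteArrayList of listeners, a ConcurrentMap named nodesFD that maps each monitored DiscoveryNode to its NodeFD state, and the connectionListener, latestNodes, and running fields.
  • Its constructor reads the settings connect_on_network_disconnect (default true), ping_interval (default 1s), ping_timeout (default 30s, stored in pingRetryTimeout), ping_retries (default 3), and register_connection_listener (default true). It then registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and, when registerConnectionListener is true, adds an FDConnectionListener.
  • start sets running to true and stop sets it to false. close calls stop first, then removes the PingRequestHandler.ACTION handler and the connectionListener from transportService.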
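
To get a feel for these defaults, the sketch below estimates how long it may take to declare a node failed when it stays connected but never answers pings. It assumes that every one of the ping_retries attempts waits for the full ping_timeout and that retries are sent back to back, as in the SendPingRequest code quoted in this article. The class and method names are hypothetical and not part of Elasticsearch.

```java
import java.time.Duration;

// Hypothetical helper, not part of Elasticsearch: estimates the worst-case
// time to detect a node that stays connected but never answers pings.
class FaultDetectionTiming {

    static Duration worstCaseDetection(Duration pingInterval, Duration pingTimeout, int pingRetries) {
        // One scheduling delay before the first ping, then pingRetries
        // attempts that each wait for the full timeout.
        return pingInterval.plus(pingTimeout.multipliedBy(pingRetries));
    }

    public static void main(String[] args) {
        // Defaults from the constructor: ping_interval 1s, ping_timeout 30s, ping_retries 3.
        Duration d = worstCaseDetection(Duration.ofSeconds(1), Duration.ofSeconds(30), 3);
        System.out.println("worst case: " + d.getSeconds() + "s");
    }
}
```

With the defaults this comes to 1s + 3 × 30s = 91s, so a hung node can stay in the cluster state for about a minute and a half unless the transport connection itself drops.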

PingRequestHandler

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {

        public static final String ACTION = "discovery/zen/fd/ping";

        @Override
        public PingRequest newInstance() {
            return new PingRequest();
        }

        @Override
        public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
            // if we are not the node we are supposed to be pinged, send an exception
            // this can happen when a kill -9 is sent, and another node is started using the same port
            if (!latestNodes.localNodeId().equals(request.nodeId)) {
                throw new ElasticSearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
            }
            channel.sendResponse(new PingResponse());
        }

        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    }

    static class PingRequest extends TransportRequest {

        // the (assumed) node id we are pinging
        private String nodeId;

        PingRequest() {
        }

        PingRequest(String nodeId) {
            this.nodeId = nodeId;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            nodeId = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(nodeId);
        }
    }

    private static class PingResponse extends TransportResponse {

        private PingResponse() {
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
        }
    }
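
  • PingRequestHandler's newInstance method creates a PingRequest, which carries a nodeId field identifying the node the sender believes it is pinging. messageReceived handles incoming PingRequests: it compares the requested nodeId with localNodeId and replies with a PingResponse when they match; otherwise it throws an ElasticSearchIllegalStateException. As the code comment notes, a mismatch can happen when a node is killed with kill -9 and another node is started on the same port.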
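
The identity check can be illustrated in isolation. This is a minimal sketch with plain strings for node ids and a standard IllegalStateException in place of ElasticSearchIllegalStateException; the class PingCheckSketch is hypothetical and only mirrors the comparison made in messageReceived.

```java
// Simplified stand-in for the ping handler; the class name is hypothetical.
class PingCheckSketch {

    private final String localNodeId;

    PingCheckSketch(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    // Returns normally when the ping was addressed to this node,
    // otherwise throws, mirroring the check in messageReceived.
    void onPing(String targetNodeId) {
        if (!localNodeId.equals(targetNodeId)) {
            throw new IllegalStateException(
                    "Got pinged as node [" + targetNodeId + "], but I am node [" + localNodeId + "]");
        }
    }

    public static void main(String[] args) {
        // A process restarted on the same port gets a new id.
        PingCheckSketch restarted = new PingCheckSketch("new-id");
        restarted.onPing("new-id"); // accepted
        try {
            restarted.onPing("old-id"); // stale id from before the restart
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the pinger sends the id it expects, a replacement process listening on the same address cannot be mistaken for the node that was killed.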

FDConnectionListener

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java




    
    private class FDConnectionListener implements TransportConnectionListener {
        @Override
        public void onNodeConnected(DiscoveryNode node) {
        }

        @Override
        public void onNodeDisconnected(DiscoveryNode node) {
            handleTransportDisconnect(node);
        }
    }

    private void handleTransportDisconnect(DiscoveryNode node) {
        if (!latestNodes.nodeExists(node.id())) {
            return;
        }
        NodeFD nodeFD = nodesFD.remove(node);
        if (nodeFD == null) {
            return;
        }
        if (!running) {
            return;
        }
        nodeFD.running = false;
        if (connectOnNetworkDisconnect) {
            try {
                transportService.connectToNode(node);
                nodesFD.put(node, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
            } catch (Exception e) {
                logger.trace("[node  ] [{}] transport disconnected (with verified connect)", node);
                notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            logger.trace("[node  ] [{}] transport disconnected", node);
            notifyNodeFailure(node, "transport disconnected");
        }
    }

    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    }
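
  • FDConnectionListener calls handleTransportDisconnect from onNodeDisconnected. That method returns immediately if the node is not in latestNodes; otherwise it removes the node from nodesFD and, provided an entry existed and fault detection is running, sets that NodeFD's running flag to false.
  • If connectOnNetworkDisconnect is true, it tries to connect to the node once. On success it puts a new NodeFD into nodesFD and schedules a SendPingRequest for the node after pingInterval. If the connect attempt throws, or if connectOnNetworkDisconnect is false, it calls notifyNodeFailure.
  • notifyNodeFailure invokes NodesFaultDetection.Listener.onNodeFailure on every registered listener, on the generic thread pool; here that is the onNodeFailure method of ZenDiscovery's NodeFailureListener.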
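
The branching in handleTransportDisconnect can be modelled as follows. This is a simplified sketch, not the Elasticsearch code: node ids are plain strings, the reconnect attempt is injected as a predicate, failures are collected in a list instead of being dispatched to listeners, and the latestNodes and running checks are left out. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical, simplified model of handleTransportDisconnect.
class DisconnectSketch {

    final Set<String> monitored = ConcurrentHashMap.newKeySet();
    final List<String> failures = new ArrayList<>();

    private final boolean connectOnNetworkDisconnect;
    private final Predicate<String> reconnect; // true when the reconnect succeeds

    DisconnectSketch(boolean connectOnNetworkDisconnect, Predicate<String> reconnect) {
        this.connectOnNetworkDisconnect = connectOnNetworkDisconnect;
        this.reconnect = reconnect;
    }

    void onNodeDisconnected(String node) {
        if (!monitored.remove(node)) {
            return; // the node was not being monitored
        }
        if (connectOnNetworkDisconnect && reconnect.test(node)) {
            // Reconnected: resume monitoring (the real code also schedules a ping here).
            monitored.add(node);
        } else if (connectOnNetworkDisconnect) {
            failures.add(node + ": transport disconnected (with verified connect)");
        } else {
            failures.add(node + ": transport disconnected");
        }
    }

    public static void main(String[] args) {
        DisconnectSketch fd = new DisconnectSketch(true, node -> false);
        fd.monitored.add("node-1");
        fd.onNodeDisconnected("node-1");
        System.out.println(fd.failures);
    }
}
```

The single reconnect attempt lets a brief network glitch pass without a cluster state change, while a node that is really gone is reported right away rather than after the ping retries run out.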

ZenDiscovery

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
	//......

    @Inject
    public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                        TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
                        DiscoveryNodeService discoveryNodeService, ZenPingService pingService) {
        super(settings);
        this.clusterName = clusterName;
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.transportService = transportService;
        this.discoveryNodeService = discoveryNodeService;
        this.pingService = pingService;

        // also support direct discovery.zen settings, for cases when it gets extended
        this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
        this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);

        this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
        this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);

        logger.debug("using ping.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);

        this.electMaster = new ElectMasterService(settings);
        nodeSettingsService.addListener(new ApplySettings());

        this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
        this.masterFD.addListener(new MasterNodeFailureListener());

        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
        this.nodesFD.addListener(new NodeFailureListener());

        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
        this.pingService.setNodesProvider(this);
        this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

        transportService.registerHandler(RejoinClusterRequestHandler.ACTION, new RejoinClusterRequestHandler());
    }

    protected void doStart() throws ElasticSearchException {
        Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
        // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
        String nodeId = UUID.randomBase64UUID();
        localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);
        latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
        nodesFD.updateNodes(latestDiscoNodes);
        pingService.start();

        // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
        asyncJoinCluster();
    }

    public void publish(ClusterState clusterState) {
        if (!master) {
            throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
        }
        latestDiscoNodes = clusterState.nodes();
        nodesFD.updateNodes(clusterState.nodes());
        publishClusterState.publish(clusterState);
    }

    private class NodeFailureListener implements NodesFaultDetection.Listener {

        @Override
        public void onNodeFailure(DiscoveryNode node, String reason) {
            handleNodeFailure(node, reason);
        }
    }

    private void handleNodeFailure(final DiscoveryNode node, String reason) {
        if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a node failure
            return;
        }
        if (!master) {
            // nothing to do here...
            return;
        }
        clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
                        .putAll(currentState.nodes())
                        .remove(node.id());
                latestDiscoNodes = builder.build();
                currentState = newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
                // check if we have enough master nodes, if not, we need to move into joining the cluster again
                if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
                    return rejoin(currentState, "not enough master nodes");
                }
                // eagerly run reroute to remove dead nodes from routing table
                RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
                return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
            }

            @Override
            public void clusterStateProcessed(ClusterState clusterState) {
                sendInitialStateEventIfNeeded();
            }
        });
    }

	//......
}
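
  • ZenDiscovery's constructor creates a NodesFaultDetection and adds a NodeFailureListener to it. The listener implements NodesFaultDetection.Listener, and its onNodeFailure callback delegates to handleNodeFailure. That method does nothing unless the component is started and the local node is master. Otherwise it submits a ProcessedClusterStateUpdateTask to clusterService that removes the node from currentState.nodes() and then checks, through electMaster.hasEnoughMasterNodes, whether the remaining master nodes still satisfy minimumMasterNodes: if not, it calls rejoin; if so, it calls allocationService.reroute to remove the dead node from the routing table.
  • doStart builds localNode from the configured node name, a freshly generated random node id, the transport publish address, and the node attributes. It puts localNode into latestDiscoNodes, calls nodesFD.updateNodes(latestDiscoNodes), and then calls pingService.start() and asyncJoinCluster().
  • publish throws an ElasticSearchIllegalStateException if the local node is not master. Otherwise it sets latestDiscoNodes to the nodes of the given clusterState, calls nodesFD.updateNodes with those nodes, and finally calls publishClusterState.publish(clusterState).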
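
The decision made inside the cluster state update task can be sketched as below. The body of hasEnoughMasterNodes is not quoted in this article, so the comparison used here (count of master-eligible nodes at least minimumMasterNodes) is an assumption, and the NodeFailureSketch class with its Outcome enum is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the choice made in handleNodeFailure.
class NodeFailureSketch {

    enum Outcome { REJOIN, REROUTE }

    // nodes maps node id -> whether the node is master-eligible.
    static Outcome onNodeFailure(Map<String, Boolean> nodes, String failedId, int minimumMasterNodes) {
        Map<String, Boolean> remaining = new LinkedHashMap<>(nodes);
        remaining.remove(failedId); // drop the failed node from the node set

        // Assumption: the quorum check counts the master-eligible nodes that are left.
        long masters = remaining.values().stream().filter(eligible -> eligible).count();
        return masters >= minimumMasterNodes ? Outcome.REROUTE : Outcome.REJOIN;
    }

    public static void main(String[] args) {
        Map<String, Boolean> nodes = new LinkedHashMap<>();
        nodes.put("m1", true);
        nodes.put("m2", true);
        nodes.put("d1", false);
        System.out.println(onNodeFailure(nodes, "d1", 2)); // both masters remain
        System.out.println(onNodeFailure(nodes, "m2", 2)); // only one master remains
    }
}
```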

NodesFaultDetection.updateNodes

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

public class NodesFaultDetection extends AbstractComponent {
	//......

    public void updateNodes(DiscoveryNodes nodes) {
        DiscoveryNodes prevNodes = latestNodes;
        this.latestNodes = nodes;
        if (!running) {
            return;
        }
        DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
        for (DiscoveryNode newNode : delta.addedNodes()) {
            if (newNode.id().equals(nodes.localNodeId())) {
                // no need to monitor the local node
                continue;
            }
            if (!nodesFD.containsKey(newNode)) {
                nodesFD.put(newNode, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
            }
        }
        for (DiscoveryNode removedNode : delta.removedNodes()) {
            nodesFD.remove(removedNode);
        }
    }

	//......
}
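
  • NodesFaultDetection's updateNodes method replaces latestNodes and, if fault detection is running, calls nodes.delta(prevNodes) to compute a DiscoveryNodes.Delta. The delta's addedNodes() method returns the newly added nodes and removedNodes() returns the nodes that were removed.
  • For each added node other than the local node, it checks whether the node is already in nodesFD; if not, it adds the node and schedules a SendPingRequest after pingInterval.
  • Each removed node is removed from nodesFD. handleTransportDisconnect also removes a disconnected node from nodesFD and puts it back if the single reconnect attempt succeeds.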
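
The delta handling amounts to two set differences. The sketch below uses plain string ids and standard Java sets instead of DiscoveryNodes and DiscoveryNodes.Delta; the UpdateNodesSketch class is hypothetical, and it returns the nodes that would get a first ping instead of scheduling anything.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified model of updateNodes using plain string node ids.
class UpdateNodesSketch {

    // Brings the monitored set in line with the change from previous to current
    // and returns the nodes for which a first ping would be scheduled.
    static Set<String> update(Set<String> monitored, Set<String> previous,
                              Set<String> current, String localNodeId) {
        Set<String> added = new LinkedHashSet<>(current);
        added.removeAll(previous);          // in current but not in previous
        Set<String> removed = new LinkedHashSet<>(previous);
        removed.removeAll(current);         // in previous but not in current

        Set<String> toPing = new LinkedHashSet<>();
        for (String node : added) {
            if (node.equals(localNodeId)) {
                continue;                   // no need to monitor the local node
            }
            if (monitored.add(node)) {      // add() returns false when already monitored
                toPing.add(node);
            }
        }
        monitored.removeAll(removed);
        return toPing;
    }

    public static void main(String[] args) {
        Set<String> monitored = new LinkedHashSet<>(Arrays.asList("b"));
        Set<String> previous = new LinkedHashSet<>(Arrays.asList("local", "b"));
        Set<String> current = new LinkedHashSet<>(Arrays.asList("local", "c"));
        System.out.println("ping: " + update(monitored, previous, current, "local"));
        System.out.println("monitored: " + monitored);
    }
}
```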

SendPingRequest

elasticsearch-0.90.0/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

    private class SendPingRequest implements Runnable {

        private final DiscoveryNode node;

        private SendPingRequest(DiscoveryNode node) {
            this.node = node;
        }

        @Override
        public void run() {
            if (!running) {
                return;
            }
            transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
                    new BaseTransportResponseHandler<PingResponse>() {
                        @Override
                        public PingResponse newInstance() {
                            return new PingResponse();
                        }

                        @Override
                        public void handleResponse(PingResponse response) {
                            if (!running) {
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                nodeFD.retryCount = 0;
                                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, SendPingRequest.this);
                            }
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            // check if the master node did not get switched on us...
                            if (!running) {
                                return;
                            }
                            if (exp instanceof ConnectTransportException) {
                                // ignore this one, we already handle it by registering a connection listener
                                return;
                            }
                            NodeFD nodeFD = nodesFD.get(node);
                            if (nodeFD != null) {
                                if (!nodeFD.running) {
                                    return;
                                }
                                int retryCount = ++nodeFD.retryCount;
                                logger.trace("[node  ] failed to ping [{}], retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
                                if (retryCount >= pingRetryCount) {
                                    logger.debug("[node  ] failed to ping [{}], tried [{}] times, each with  maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
                                    // not good, failure
                                    if (nodesFD.remove(node) != null) {
                                        notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                                    }
                                } else {
                                    // resend the request, not reschedule, rely on send timeout
                                    transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
                                            options().withHighType().withTimeout(pingRetryTimeout), this);
                                }
                            }
                        }

                        @Override
                        public String executor() {
                            return ThreadPool.Names.SAME;
                        }
                    });
        }
    }
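
  • SendPingRequest is a Runnable that sends a PingRequest to the target node with a timeout of pingRetryTimeout. Its handleResponse method looks the node up in nodesFD: if the node has been removed, or if its NodeFD's running flag is false, the response is ignored; otherwise it resets retryCount to 0 and schedules the next SendPingRequest after pingInterval.
  • If the request fails with a TransportException, the handler checks whether it is a ConnectTransportException. If so, it is ignored, because disconnects are already handled by the onNodeDisconnected method of the FDConnectionListener registered with transportService.
  • For any other exception it increments nodeFD.retryCount. Once retryCount is greater than or equal to the configured pingRetryCount, the node is removed from nodesFD and notifyNodeFailure is called, which ends up in ZenDiscovery's handleNodeFailure. Below that limit it retries by resending the PingRequest immediately, without waiting for pingInterval.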
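
The retry bookkeeping can be reduced to a small state machine. This sketch only models the counter and the resulting action; it does no networking, and the PingRetrySketch class and its Action enum are hypothetical names.

```java
// Hypothetical model of the per-node retry counter used by SendPingRequest.
class PingRetrySketch {

    enum Action { SCHEDULE_NEXT_PING, RESEND_NOW, REPORT_FAILURE }

    private final int pingRetries;
    private int retryCount;

    PingRetrySketch(int pingRetries) {
        this.pingRetries = pingRetries;
    }

    // A successful ping clears the counter; the next ping runs after pingInterval.
    Action onResponse() {
        retryCount = 0;
        return Action.SCHEDULE_NEXT_PING;
    }

    // A failed ping (other than a connect failure) counts towards the limit.
    Action onError() {
        retryCount++;
        return retryCount >= pingRetries ? Action.REPORT_FAILURE : Action.RESEND_NOW;
    }

    public static void main(String[] args) {
        PingRetrySketch node = new PingRetrySketch(3);
        System.out.println(node.onError());    // first failure: resend
        System.out.println(node.onResponse()); // success resets the counter
        System.out.println(node.onError());    // counts from one again
    }
}
```

Because a successful response resets the counter, only consecutive failures count towards ping_retries.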

Summary

  • NodesFaultDetection registers a PingRequestHandler with transportService under PingRequestHandler.ACTION and adds an FDConnectionListener. PingRequestHandler answers PingRequests with a PingResponse; FDConnectionListener reacts to transport disconnects.
  • FDConnectionListener's onNodeDisconnected removes the node from nodesFD and sets its NodeFD's running flag to false. If connectOnNetworkDisconnect is true, it retries once: it connects to the node and, on success, puts it back into nodesFD and schedules a SendPingRequest after pingInterval. If the connect attempt throws, or if connectOnNetworkDisconnect is false, it calls notifyNodeFailure, which triggers the NodesFaultDetection.Listener.onNodeFailure callback, here the onNodeFailure method of ZenDiscovery's NodeFailureListener.
  • ZenDiscovery's doStart and publish methods both call NodesFaultDetection's updateNodes method to update latestNodes; when fault detection is running, each newly added node gets a scheduled SendPingRequest.
  • When a ping succeeds, SendPingRequest resets retryCount and schedules the next SendPingRequest. On any exception other than ConnectTransportException it retries, and once the retry count reaches the limit it calls notifyNodeFailure, which triggers NodesFaultDetection.Listener.onNodeFailure, here the onNodeFailure method of ZenDiscovery's NodeFailureListener.
  • ZenDiscovery's NodeFailureListener implements NodesFaultDetection.Listener, and its onNodeFailure callback delegates to handleNodeFailure. On the master, that method submits a ProcessedClusterStateUpdateTask that removes the node from currentState.nodes() and then checks whether the remaining master nodes still satisfy minimumMasterNodes: if not, it calls rejoin; if so, it calls allocationService.reroute.


Article URL: http://www.python88.com/topic/32783