社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

聊聊elasticsearch的ZenPing

go4it • 6 年前 • 172 次点击  
阅读 47

聊聊elasticsearch的ZenPing

本文主要研究一下elasticsearch的ZenPing

ZenPing

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

public interface ZenPing extends Releasable {

    void start();

    void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);

    class PingResponse implements Writeable {
    	//......
    }

    class PingCollection {
    	//......
    }
}
复制代码
  • ZenPing接口继承了Releasable接口,另外它还定义了start、ping方法;除此之外还定义了PingResponse、PingCollection这两个类

PingResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

    class PingResponse implements Writeable {

        /**
         * An ID of a ping response that was generated on behalf of another node. Needs to be less than all other ping IDs so that fake ping
         * responses don't override real ones.
         */
        public static long FAKE_PING_ID = -1;

        private static final AtomicLong idGenerator = new AtomicLong();

        // an always increasing unique identifier for this ping response.
        // lower values means older pings.
        private final long id;

        private final ClusterName clusterName;

        private final DiscoveryNode node;

        private final DiscoveryNode master;

        private final long clusterStateVersion;

        /**
         * @param node                the node which this ping describes
         * @param master              the current master of the node
         * @param clusterName         the cluster name of the node
         * @param clusterStateVersion the current cluster state version of that node
         *                            ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
         */
        public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
            this(idGenerator.incrementAndGet(), node, master, clusterName, clusterStateVersion);
        }

        /**
         * @param id                  the ping's ID
         * @param node                the node which this ping describes
         * @param master              the current master of the node
         * @param clusterName         the cluster name of the node
         * @param clusterStateVersion the current cluster state version of that node
*                            ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION} for not recovered)
         */
        public PingResponse(long id, DiscoveryNode node, DiscoveryNode master, ClusterName clusterName, long clusterStateVersion) {
            this.id = id;
            this.node = node;
            this.master = master;
            this.clusterName = clusterName;
            this.clusterStateVersion = clusterStateVersion;
        }

        public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state) {
            this(node, master, state.getClusterName(),
                state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) ?
                    ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
        }

        PingResponse(StreamInput in) throws IOException {
            this.clusterName = new ClusterName(in);
            this.node = new DiscoveryNode(in);
            this.master = in.readOptionalWriteable(DiscoveryNode::new);
            this.clusterStateVersion = in.readLong();
            this.id = in.readLong();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            clusterName.writeTo(out);
            node.writeTo(out);
            out.writeOptionalWriteable(master);
            out.writeLong(clusterStateVersion);
            out.writeLong(id);
        }

        /**
         * an always increasing unique identifier for this ping response.
         * lower values means older pings.
         */
        public long id() {
            return this.id;
        }

        /**
         * the name of the cluster this node belongs to
         */
        public ClusterName clusterName() {
            return this.clusterName;
        }

        /** the node which this ping describes */
        public DiscoveryNode node() {
            return node;
        }

        /** the current master of the node */
        public DiscoveryNode master() {
            return master;
        }

        /**
         * the current cluster state version of that node ({@link ElectMasterService.MasterCandidate#UNRECOVERED_CLUSTER_VERSION}
         * for not recovered) */
        public long getClusterStateVersion() {
            return clusterStateVersion;
        }

        @Override
        public String toString() {
            return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +
                   "cluster_state_version [" + clusterStateVersion + "], cluster_name[" + clusterName.value() + "]}";
        }
    }
复制代码
  • PingResponse实现了Writeable接口,其writeTo方法会依次写入clusterName、node、master、clusterStateVersion、id

PingCollection

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

    class PingCollection {

        Map<DiscoveryNode, PingResponse> pings;

        public PingCollection() {
            pings = new HashMap<>();
        }

        /**
         * adds a ping if newer than previous pings from the same node
         *
         * @return true if added, false o.w.
         */
        public synchronized boolean addPing(PingResponse ping) {
            PingResponse existingResponse = pings.get(ping.node());
            // in case both existing and new ping have the same id (probably because they come
            // from nodes from version <1.4.0) we prefer to use the last added one.
            if (existingResponse == null || existingResponse.id() <= ping.id()) {
                pings.put(ping.node(), ping);
                return true;
            }
            return


    
 false;
        }

        /** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
        public synchronized List<PingResponse> toList() {
            return new ArrayList<>(pings.values());
        }

        /** the number of nodes for which there are known pings */
        public synchronized int size() {
            return pings.size();
        }
    }
复制代码
  • PingCollection定义了DiscoveryNode与PingResponse的map,并提供了addPing方法来添加pingResponse

UnicastZenPing

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

public class UnicastZenPing implements ZenPing {

    private static final Logger logger = LogManager.getLogger(UnicastZenPing.class);

    public static final String ACTION_NAME = "internal:discovery/zen/unicast";

    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final ClusterName clusterName;

    private final PingContextProvider contextProvider;

    private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();

    private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();

    // a list of temporal responses a node will return for a request (holds responses from other nodes)
    private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();

    private final SeedHostsProvider hostsProvider;

    protected final EsThreadPoolExecutor unicastZenPingExecutorService;

    private final TimeValue resolveTimeout;

    private final String nodeName;

    private volatile boolean closed = false;

    public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
                          SeedHostsProvider seedHostsProvider, PingContextProvider contextProvider) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.hostsProvider = seedHostsProvider;
        this.contextProvider = contextProvider;

        final int concurrentConnects = SeedHostsResolver.getMaxConcurrentResolvers(settings);
        resolveTimeout = SeedHostsResolver.getResolveTimeout(settings);
        nodeName = Node.NODE_NAME_SETTING.get(settings);
        logger.debug(
            "using max_concurrent_resolvers [{}], resolver timeout [{}]",
            concurrentConnects,
            resolveTimeout);

        transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, UnicastPingRequest::new,
            new UnicastPingRequestHandler());

        final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
        unicastZenPingExecutorService = EsExecutors.newScaling(
                nodeName + "/" + "unicast_connect",
                0,
                concurrentConnects,
                60,
                TimeUnit.SECONDS,
                threadFactory,
                threadPool.getThreadContext());
    }

    private SeedHostsProvider.HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
            limitPortCounts, transportService, resolveTimeout);
    }

    @Override
    public void close() {
        ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
        Releasables.close(activePingingRounds.values());
        closed = true;
    }

    @Override
    public void start() {
    }

    /**
     * Clears the list of cached ping responses.
     */
    public void clearTemporalResponses() {
        temporalResponses.clear();
    }

    /**
     * Sends three rounds of pings notifying the specified {@link Consumer} when pinging is complete. Pings are sent after resolving
     * configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch
     * of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}.
     * The pings that are sent carry a timeout of 1.25 times the specified {@link TimeValue}. When pinging each node, a connection and
     * handshake is performed, with a connection timeout of the specified {@link TimeValue}.
     *
     * @param resultsConsumer the callback when pinging is complete
     * @param duration        the timeout for various components of the pings
     */
    @Override
    public void ping(final Consumer<PingCollection> resultsConsumer, final TimeValue duration) {
        ping(resultsConsumer, duration, duration);
    }

    protected void ping(final Consumer<PingCollection> resultsConsumer,
                        final TimeValue scheduleDuration,
                        final TimeValue requestDuration) {
        final List<TransportAddress> seedAddresses = new ArrayList<>();
        seedAddresses.addAll(hostsProvider.getSeedAddresses(createHostsResolver()));
        final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
        // add all possible master nodes that were active in the last known cluster configuration
        for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
            seedAddresses.add(masterNode.value.getAddress());
        }

        final ConnectionProfile connectionProfile =
            ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
        final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
            nodes.getLocalNode(), connectionProfile);
        activePingingRounds.put(pingingRound.id(), pingingRound);
        final AbstractRunnable pingSender = new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {
                if (e instanceof AlreadyClosedException == false) {
                    logger.warn("unexpected error while pinging", e);
                }
            }

            @Override
            protected void doRun() throws Exception {
                sendPings(requestDuration, pingingRound);
            }
        };
        threadPool.generic().execute(pingSender);
        threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC);
        threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC);
        threadPool.schedule(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                finishPingingRound(pingingRound);
            }

            @Override
            public void onFailure(Exception e) {
                logger.warn("unexpected error while finishing pinging round", e);
            }
        }, scheduleDuration, ThreadPool.Names.GENERIC);
    }

    protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {
        final ClusterState lastState = contextProvider.clusterState();
        final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));

        List<TransportAddress> temporalAddresses = temporalResponses.stream().map(pingResponse -> {
            assert clusterName.equals(pingResponse.clusterName()) :
                "got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName();
            return pingResponse.node().getAddress();
        }).collect(Collectors.toList());

        final Stream<TransportAddress> uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(),
            temporalAddresses.stream()).distinct();

        // resolve what we can via the latest cluster state
        final Set<DiscoveryNode> nodesToPing = uniqueAddresses
            .map(address -> {
                DiscoveryNode foundNode = lastState.nodes().findByAddress(address);
                if


    
 (foundNode != null && transportService.nodeConnected(foundNode)) {
                    return foundNode;
                } else {
                    return new DiscoveryNode(
                        address.toString(),
                        address,
                        emptyMap(),
                        emptySet(),
                        Version.CURRENT.minimumCompatibilityVersion());
                }
            }).collect(Collectors.toSet());

        nodesToPing.forEach(node -> sendPingRequestToNode(node, timeout, pingingRound, pingRequest));
    }

    private void sendPingRequestToNode(final DiscoveryNode node, TimeValue timeout, final PingingRound pingingRound,
                                       final UnicastPingRequest pingRequest) {
        submitToExecutor(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                Connection connection = null;
                if (transportService.nodeConnected(node)) {
                    try {
                        // concurrency can still cause disconnects
                        connection = transportService.getConnection(node);
                    } catch (NodeNotConnectedException e) {
                        logger.trace("[{}] node [{}] just disconnected, will create a temp connection", pingingRound.id(), node);
                    }
                }

                if (connection == null) {
                    connection = pingingRound.getOrConnect(node);
                }

                logger.trace("[{}] sending to {}", pingingRound.id(), node);
                transportService.sendRequest(connection, ACTION_NAME, pingRequest,
                    TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(),
                    getPingResponseHandler(pingingRound, node));
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof ConnectTransportException || e instanceof AlreadyClosedException) {
                    // can't connect to the node - this is more common path!
                    logger.trace(() -> new ParameterizedMessage("[{}] failed to ping {}", pingingRound.id(), node), e);
                } else if (e instanceof RemoteTransportException) {
                    // something went wrong on the other side
                    logger.debug(() -> new ParameterizedMessage(
                            "[{}] received a remote error as a response to ping {}", pingingRound.id(), node), e);
                } else {
                    logger.warn(() -> new ParameterizedMessage("[{}] failed send ping to {}", pingingRound.id(), node), e);
                }
            }

            @Override
            public void onRejection(Exception e) {
                // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
                // But don't bail here, we can retry later on after the send ping has been scheduled.
                logger.debug("Ping execution rejected", e);
            }
        });
    }

    //......
}
复制代码
  • UnicastZenPing实现了ZenPing接口,它创建了unicastZenPingExecutorService线程池,同时维护了一个PingResponse的queue以及activePingingRounds
  • ping方法主要是异步及调度执行pingSender,而其doRun方法执行的是sendPings方法;sendPings方法构建UnicastPingRequest,对pingingRound中的seedAddresses挨个执行sendPingRequestToNode方法
  • sendPingRequestToNode向线程池提交AbstractRunnable,其doRun方法主要是使用transportService.sendRequest发送pingRequest

PingingRound

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    protected class PingingRound implements Releasable {
        private final int id;
        private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
        private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
        private final PingCollection pingCollection;
        private final List<TransportAddress> seedAddresses;
        private final Consumer<PingCollection> pingListener;
        private final DiscoveryNode localNode;
        private final ConnectionProfile connectionProfile;

        private AtomicBoolean closed = new AtomicBoolean(false);

        PingingRound(int id, List<TransportAddress> seedAddresses, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
                     ConnectionProfile connectionProfile) {
            this.id = id;
            this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList()));
            this.pingListener = resultsConsumer;
            this.localNode = localNode;
            this.connectionProfile = connectionProfile;
            this.pingCollection = new PingCollection();
        }

        public int id() {
            return this.id;
        }

        public boolean isClosed() {
            return this.closed.get();
        }

        public List<TransportAddress> getSeedAddresses() {
            ensureOpen();
            return seedAddresses;
        }

        public Connection getOrConnect(DiscoveryNode node) throws IOException {
            Connection result;
            try (Releasable ignore = connectionLock.acquire(node.getAddress())) {
                result = tempConnections.get(node.getAddress());
                if (result == null) {
                    ensureOpen();
                    boolean success = false;
                    logger.trace("[{}] opening connection to [{}]", id(), node);
                    result = transportService.openConnection(node, connectionProfile);
                    try {
                        transportService.handshake(result, connectionProfile.getHandshakeTimeout().millis());
                        synchronized (this) {
                            // acquire lock and check if closed, to prevent leaving an open connection after closing
                            ensureOpen();
                            Connection existing = tempConnections.put(node.getAddress(), result);
                            assert existing == null;
                            success = true;
                        }
                    } finally {
                        if (success == false) {
                            logger.trace("[{}] closing connection to [{}] due to failure", id(), node);
                            IOUtils.closeWhileHandlingException(result);
                        }
                    }
                }
            }
            return result;
        }

        private void ensureOpen() {
            if (isClosed()) {
                throw new AlreadyClosedException("pinging round [" + id + "] is finished");
            }
        }

        public void addPingResponseToCollection(PingResponse pingResponse) {
            if (localNode.equals(pingResponse.node()) == false) {
                pingCollection.addPing(pingResponse);
            }
        }

        @Override
        public void close() {
            List<Connection> toClose = null;
            synchronized (this) {
                if (closed.compareAndSet(false, true)) {
                    activePingingRounds.remove(id);
                    toClose = new ArrayList<>(tempConnections.values());
                    tempConnections.clear();
                }
            }
            if (toClose != null) {
                // we actually closed
                try {
                    pingListener.accept(pingCollection);
                } finally {
                    IOUtils.closeWhileHandlingException(toClose);
                }
            }
        }

        public ConnectionProfile getConnectionProfile() {
            return connectionProfile;
        }
    }
复制代码
  • PingingRound提供了getOrConnect方法,来获取或创建一个discoveryNode的Connection,主要是调用transportService.openConnection方法并执行transportService.handshake

UnicastPingRequest

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    public static class UnicastPingRequest extends TransportRequest {

        public final int id;
        public final TimeValue timeout;
        public final PingResponse pingResponse;

        public UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
            this.id = id;
            this.timeout = timeout;
            this.pingResponse = pingResponse;
        }

        public UnicastPingRequest(StreamInput in) throws IOException {
            super(in);
            id = in.readInt();
            timeout = in.readTimeValue();
            pingResponse = new PingResponse(in);
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeInt(id);
            out.writeTimeValue(timeout);
            pingResponse.writeTo(out);
        }
    }
复制代码
  • UnicastPingRequest继承了TransportRequest,其writeTo方法,写入id、timeout、pingResponse

UnicastPingResponse

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    public static class UnicastPingResponse extends TransportResponse {

        final int id;

        public final PingResponse[] pingResponses;

        public UnicastPingResponse(int id, PingResponse[] pingResponses) {
            this.id = id;
            this.pingResponses = pingResponses;
        }

        public UnicastPingResponse(StreamInput in) throws IOException {
            id = in.readInt();
            pingResponses = new PingResponse[in.readVInt()];
            for (int i = 0; i < pingResponses.length; i++) {
                pingResponses[i] = new PingResponse(in);
            }
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeInt(id);
            out.writeVInt(pingResponses.length);
            for (PingResponse pingResponse : pingResponses) {
                pingResponse.writeTo(out);
            }
        }
    }
复制代码
  • UnicastPingResponse继承了TransportResponse,其writeTo写入id、pingResponses.length及pingResponses

UnicastPingRequestHandler

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {

        @Override
        public void messageReceived(UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
            if (closed) {
                throw new AlreadyClosedException("node is shutting down");
            }
            if (request.pingResponse.clusterName().equals(clusterName)) {
                channel.sendResponse(handlePingRequest(request));
            } else {
                throw new IllegalStateException(
                    String.format(
                        Locale.ROOT,
                        "mismatched cluster names; request: [%s], local: [%s]",
                        request.pingResponse.clusterName().value(),
                        clusterName.value()));
            }
        }

    }

    private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
        assert clusterName.equals(request.pingResponse.clusterName()) :
            "got a ping request from a different cluster. expected " + clusterName + " got " + request.pingResponse.clusterName();
        temporalResponses.add(request.pingResponse);
        // add to any ongoing pinging
        activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse));
        threadPool.schedule(() -> temporalResponses.remove(request.pingResponse),
            TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME);

        List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
        pingResponses.add(createPingResponse(contextProvider.clusterState()));

        return new UnicastPingResponse(request.id, pingResponses.toArray(new PingResponse[pingResponses.size()]));
    }

复制代码
  • UnicastPingRequestHandler继承了TransportRequestHandler,其messageReceived方法主要是调用handlePingRequest方法并返回结果;handlePingRequest方法主要是将使用pingRound的addPingResponseToCollection添加request.pingResponse,同事注册一个timeout任务执行temporalResponses.remove(request.pingResponse);最后创建UnicastPingResponse返回

getPingResponseHandler

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(final PingingRound pingingRound,
                                                                                   final DiscoveryNode node) {
        return new TransportResponseHandler<UnicastPingResponse>() {

            @Override
            public UnicastPingResponse read(StreamInput in) throws IOException {
                return new UnicastPingResponse(in);
            }

            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }

            @Override
            public void handleResponse(UnicastPingResponse response) {
                logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses));
                if (pingingRound.isClosed()) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node);
                    }
                } else {
                    Stream.of(response.pingResponses).forEach(pingingRound::addPingResponseToCollection);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException ||
                    exp.getCause() instanceof AlreadyClosedException) {
                    // ok, not connected...
                    logger.trace(() -> new ParameterizedMessage("failed to connect to {}", node), exp);
                } else if (closed == false) {
                    logger.warn(() -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
                }
            }
        };
    }
复制代码
  • getPingResponseHandler方法创建了匿名的TransportResponseHandler,用于处理UnicastPingResponse;其handleResponse方法执行的是pingingRound.addPingResponseToCollection方法

小结

  • ZenPing接口继承了Releasable接口,另外它还定义了start、ping方法;除此之外还定义了PingResponse、PingCollection这两个类
  • UnicastZenPing实现了ZenPing接口,它创建了unicastZenPingExecutorService线程池,同时维护了一个PingResponse的queue以及activePingingRounds
  • ping方法主要是异步及调度执行pingSender,而其doRun方法执行的是sendPings方法;sendPings方法构建UnicastPingRequest,对pingingRound中的seedAddresses挨个执行sendPingRequestToNode方法;sendPingRequestToNode向线程池提交AbstractRunnable,其doRun方法主要是使用transportService.sendRequest发送pingRequest

doc

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/32226
 
172 次点击