A Look at Elasticsearch's NodesSniffer

go4it • 6 years ago

This article takes a look at Elasticsearch's NodesSniffer.

NodesSniffer

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/NodesSniffer.java

/**
 * Responsible for sniffing the http hosts
 */
public interface NodesSniffer {
    /**
     * Returns the sniffed Elasticsearch nodes.
     */
    List<Node> sniff() throws IOException;
}
  • The NodesSniffer interface defines a single method, sniff, which returns the sniffed Elasticsearch nodes; its implementation class is ElasticsearchNodesSniffer.

ElasticsearchNodesSniffer

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java

public final class ElasticsearchNodesSniffer implements NodesSniffer {

    private static final Log logger = LogFactory.getLog(ElasticsearchNodesSniffer.class);

    public static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);

    private final RestClient restClient;
    private final Request request;
    private final Scheme scheme;
    private final JsonFactory jsonFactory = new JsonFactory();

    public ElasticsearchNodesSniffer(RestClient restClient) {
        this(restClient, DEFAULT_SNIFF_REQUEST_TIMEOUT, ElasticsearchNodesSniffer.Scheme.HTTP);
    }

    public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {
        this.restClient = Objects.requireNonNull(restClient, "restClient cannot be null");
        if (sniffRequestTimeoutMillis < 0) {
            throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
        }
        this.request = new Request("GET", "/_nodes/http");
        request.addParameter("timeout", sniffRequestTimeoutMillis + "ms");
        this.scheme = Objects.requireNonNull(scheme, "scheme cannot be null");
    }

    /**
     * Calls the elasticsearch nodes info api, parses the response and returns all the found http hosts
     */
    @Override
    public List<Node> sniff() throws IOException {
        Response response = restClient.performRequest(request);
        return readHosts(response.getEntity(), scheme, jsonFactory);
    }

    static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {
        try (InputStream inputStream = entity.getContent()) {
            JsonParser parser = jsonFactory.createParser(inputStream);
            if (parser.nextToken() != JsonToken.START_OBJECT) {
                throw new IOException("expected data to start with an object");
            }
            List<Node> nodes = new ArrayList<>();
            while (parser.nextToken() != JsonToken.END_OBJECT) {
                if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                    if ("nodes".equals(parser.getCurrentName())) {
                        while (parser.nextToken() != JsonToken.END_OBJECT) {
                            JsonToken token = parser.nextToken();
                            assert token == JsonToken.START_OBJECT;
                            String nodeId = parser.getCurrentName();
                            Node node = readNode(nodeId, parser, scheme);
                            if (node != null) {
                                nodes.add(node);
                            }
                        }
                    } else {
                        parser.skipChildren();
                    }
                }
            }
            return nodes;
        }
    }

    //......

    public enum Scheme {
        HTTP("http"), HTTPS("https");

        private final String name;

        Scheme(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

}
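  • The full constructor of ElasticsearchNodesSniffer takes three parameters: restClient, sniffRequestTimeoutMillis, and scheme. The single-argument constructor defaults sniffRequestTimeoutMillis to 1 second and scheme to HTTP. The constructor builds a GET /_nodes/http request and adds the timeout to it as a parameter. The sniff method executes that request with restClient.performRequest and then calls readHosts to parse the response; readHosts in turn calls readNode for each entry in the nodes section.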
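
To make the constructor's behaviour concrete, here is a minimal JDK-only sketch of the timeout handling. It is not the Elasticsearch client itself: the class SniffRequestSketch and the helper describeRequest are hypothetical names introduced for illustration, and rendering the parameter as a query string is an assumption about how the request is shown, not something the listing states. One detail visible in the listing is that the check is `< 0`, so a timeout of 0 passes even though the message says "must be greater than 0".

```java
import java.util.concurrent.TimeUnit;

// Stand-in sketch using only the JDK; not the Elasticsearch client classes.
class SniffRequestSketch {
    // Same value as ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT: 1 second in millis.
    static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);

    // Mirrors the constructor's validation and the value it passes as the "timeout" parameter.
    static String timeoutParameter(long sniffRequestTimeoutMillis) {
        if (sniffRequestTimeoutMillis < 0) {
            throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
        }
        return sniffRequestTimeoutMillis + "ms";
    }

    // Hypothetical helper: renders method, endpoint and parameter on one line for display.
    static String describeRequest(long sniffRequestTimeoutMillis) {
        return "GET /_nodes/http?timeout=" + timeoutParameter(sniffRequestTimeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(describeRequest(DEFAULT_SNIFF_REQUEST_TIMEOUT));
        // Zero is accepted because only negative values are rejected.
        System.out.println(describeRequest(0));
    }
}
```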

Example GET /_nodes/http response

{
  "_nodes" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "cluster_name" : "docker-cluster",
  "nodes" : {
    "d7w2wdw7Q7SqERe5_fxZYA" : {
      "name" : "d7w2wdw",
      "transport_address" : "172.17.0.2:9300",
      "host" : "172.17.0.2",
      "ip" : "172.17.0.2",
      "version" : "6.6.2",
      "build_flavor" : "oss",
      "build_type" : "tar",
      "build_hash" : "3bd3e59",
      "roles" : [
        "master",
        "data",
        "ingest"
      ],
      "http" : {
        "bound_address" : [
          "0.0.0.0:9200"
        ],
        "publish_address" : "192.168.99.100:9200",
        "max_content_length_in_bytes" : 104857600
      }
    }
  }
}
  • Note that the http section carries the publish_address, along with the bound_address list.
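
The publish_address value is a bare host:port string, so it needs a scheme before it can be parsed as a URI. The sketch below uses only java.net.URI to show that step on the addresses from the sample response; PublishAddressSketch and hostAndPort are hypothetical names, and the real client wraps the result in an HttpHost rather than a string array.

```java
import java.net.URI;

// Stand-in sketch using only the JDK; the real code builds an Apache HttpHost instead.
class PublishAddressSketch {
    // Prefix the scheme, parse the result as a URI, then read scheme, host and port back out.
    static String[] hostAndPort(String scheme, String address) {
        URI uri = URI.create(scheme + "://" + address);
        return new String[] { uri.getScheme(), uri.getHost(), String.valueOf(uri.getPort()) };
    }

    public static void main(String[] args) {
        // Addresses taken from the sample GET /_nodes/http response.
        String[] published = hostAndPort("http", "192.168.99.100:9200");
        String[] bound = hostAndPort("http", "0.0.0.0:9200");
        System.out.println(String.join(" ", published));
        System.out.println(String.join(" ", bound));
    }
}
```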

readNode

elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/ElasticsearchNodesSniffer.java

public final class ElasticsearchNodesSniffer implements NodesSniffer {
	//......

    private static Node readNode(String nodeId, JsonParser parser, Scheme scheme) throws IOException {
        HttpHost publishedHost = null;
        /*
         * We sniff the bound hosts so we can look up the node based on any
         * address on which it is listening. This is useful in Elasticsearch's
         * test framework where we sometimes publish ipv6 addresses but the
         * tests contact the node on ipv4.
         */
        Set<HttpHost> boundHosts = new HashSet<>();
        String name = null;
        String version = null;
        /*
         * Multi-valued attributes come with key = `real_key.index` and we
         * unflip them after reading them because we can't rely on the order
         * that they arrive.
         */
        final Map<String, String> protoAttributes = new HashMap<String, String>();

        boolean sawRoles = false;
        boolean master = false;
        boolean data = false;
        boolean ingest = false;

        String fieldName = null;
        while (parser.nextToken() != JsonToken.END_OBJECT) {
            if (parser.getCurrentToken() == JsonToken.FIELD_NAME) {
                fieldName = parser.getCurrentName();
            } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                if ("http".equals(fieldName)) {
                    while (parser.nextToken() != JsonToken.END_OBJECT) {
                        if (parser.getCurrentToken() == JsonToken.VALUE_STRING && "publish_address".equals(parser.getCurrentName())) {
                            URI publishAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
                            publishedHost = new HttpHost(publishAddressAsURI.getHost(), publishAddressAsURI.getPort(),
                                    publishAddressAsURI.getScheme());
                        } else if (parser.currentToken() == JsonToken.START_ARRAY && "bound_address".equals(parser.getCurrentName())) {
                            while (parser.nextToken() != JsonToken.END_ARRAY) {
                                URI boundAddressAsURI = URI.create(scheme + "://" + parser.getValueAsString());
                                boundHosts.add(new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(),
                                        boundAddressAsURI.getScheme()));
                            }
                        } else if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                            parser.skipChildren();
                        }
                    }
                } else if ("attributes".equals(fieldName)) {
                    while (parser.nextToken() != JsonToken.END_OBJECT) {
                        if (parser.getCurrentToken() == JsonToken.VALUE_STRING) {
                            String oldValue = protoAttributes.put(parser.getCurrentName(), parser.getValueAsString());
                            if (oldValue != null) {
                                throw new IOException("repeated attribute key [" + parser.getCurrentName() + "]");
                            }
                        } else {
                            parser.skipChildren();
                        }
                    }
                } else {
                    parser.skipChildren();
                }
            } else if (parser.currentToken() == JsonToken.START_ARRAY) {
                if ("roles".equals(fieldName)) {
                    sawRoles = true;
                    while (parser.nextToken() != JsonToken.END_ARRAY) {
                        switch (parser.getText()) {
                        case "master":
                            master = true;
                            break;
                        case "data":
                            data = true;
                            break;
                        case "ingest":
                            ingest = true;
                            break;
                        default:
                            logger.warn("unknown role [" + parser.getText() + "] on node [" + nodeId + "]");
                        }
                    }
                } else {
                    parser.skipChildren();
                }
            } else if (parser.currentToken().isScalarValue()) {
                if ("version".equals(fieldName)) {
                    version = parser.getText();
                } else if ("name".equals(fieldName)) {
                    name = parser.getText();
                }
            }
        }
        //http section is not present if http is not enabled on the node, ignore such nodes
        if (publishedHost == null) {
            logger.debug("skipping node [" + nodeId + "] with http disabled");
            return null;
        }

        Map<String, List<String>> realAttributes = new HashMap<>(protoAttributes.size());
        List<String> keys = new ArrayList<>(protoAttributes.keySet());
        for (String key : keys) {
            if (key.endsWith(".0")) {
                String realKey = key.substring(0, key.length() - 2);
                List<String> values = new ArrayList<>();
                int i = 0;
                while (true) {
                    String value = protoAttributes.remove(realKey + "." + i);
                    if (value == null) {
                        break;
                    }
                    values.add(value);
                    i++;
                }
                realAttributes.put(realKey, unmodifiableList(values));
            }
        }
        for (Map.Entry<String, String> entry : protoAttributes.entrySet()) {
            realAttributes.put(entry.getKey(), singletonList(entry.getValue()));
        }

        if (version.startsWith("2.")) {
            /*
             * 2.x doesn't send roles, instead we try to read them from
             * attributes.
             */
            boolean clientAttribute = v2RoleAttributeValue(realAttributes, "client", false);
            Boolean masterAttribute = v2RoleAttributeValue(realAttributes, "master", null);
            Boolean dataAttribute = v2RoleAttributeValue(realAttributes, "data", null);
            master = masterAttribute == null ? false == clientAttribute : masterAttribute;
            data = dataAttribute == null ? false == clientAttribute : dataAttribute;
        } else {
            assert sawRoles : "didn't see roles for [" + nodeId + "]";
        }
        assert boundHosts.contains(publishedHost) :
                "[" + nodeId + "] doesn't make sense! publishedHost should be in boundHosts";
        logger.trace("adding node [" + nodeId + "]");
        return new Node(publishedHost, boundHosts, name, version, new Roles(master, data, ingest),
                unmodifiableMap(realAttributes));
    }

    /**
     * Returns {@code defaultValue} if the attribute didn't come back,
     * {@code true} or {@code false} if it did come back as
     * either of those, or throws an IOException if the attribute
     * came back in a strange way.
     */
    private static Boolean v2RoleAttributeValue(Map<String, List<String>> attributes,
            String name, Boolean defaultValue) throws IOException {
        List<String> valueList = attributes.remove(name);
        if (valueList == null) {
            return defaultValue;
        }
        if (valueList.size() != 1) {
            throw new IOException("expected only a single attribute value for [" + name + "] but got "
                    + valueList);
        }
        switch (valueList.get(0)) {
        case "true":
            return true;
        case "false":
            return false;
        default:
            throw new IOException("expected [" + name + "] to be either [true] or [false] but was ["
                    + valueList.get(0) + "]");
        }
    }

	//......
}
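  • The readNode method parses one entry of the nodes section: it reads http, attributes, roles, version, and name, and returns null for a node that has no http section. It then special-cases 2.x nodes, which do not send roles, by deriving master and data from the attributes. Finally it builds and returns a Node from publishedHost, boundHosts, name, version, master, data, ingest, and realAttributes.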
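
The attribute handling in readNode is the least obvious part: multi-valued attributes arrive flattened as `key.0`, `key.1`, and so on, and are regrouped into lists after parsing. Here is a JDK-only sketch of that regrouping, following the same loop as the listing. The class name AttributeUnflipSketch and the attribute names rack and zone are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch using only the JDK; follows the regrouping loop shown in readNode.
class AttributeUnflipSketch {
    static Map<String, List<String>> unflip(Map<String, String> flat) {
        Map<String, String> proto = new HashMap<>(flat); // work on a copy of the input
        Map<String, List<String>> real = new HashMap<>(proto.size());
        for (String key : new ArrayList<>(proto.keySet())) {
            if (key.endsWith(".0")) {
                String realKey = key.substring(0, key.length() - 2);
                List<String> values = new ArrayList<>();
                // Collect realKey.0, realKey.1, ... until an index is missing.
                for (int i = 0; ; i++) {
                    String value = proto.remove(realKey + "." + i);
                    if (value == null) {
                        break;
                    }
                    values.add(value);
                }
                real.put(realKey, Collections.unmodifiableList(values));
            }
        }
        // Whatever is left is single-valued.
        for (Map.Entry<String, String> entry : proto.entrySet()) {
            real.put(entry.getKey(), Collections.singletonList(entry.getValue()));
        }
        return real;
    }

    public static void main(String[] args) {
        Map<String, String> flat = new HashMap<>();
        flat.put("rack.1", "r2"); // hypothetical attribute names
        flat.put("rack.0", "r1");
        flat.put("zone", "z1");
        System.out.println(unflip(flat));
    }
}
```

Because the values are looked up by index rather than by arrival order, the result does not depend on the order in which the flattened keys were parsed.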

Summary

  • The NodesSniffer interface defines a single method, sniff, which returns the sniffed Elasticsearch nodes; its implementation class is ElasticsearchNodesSniffer.
  • The full constructor of ElasticsearchNodesSniffer takes restClient, sniffRequestTimeoutMillis, and scheme; the single-argument constructor defaults the timeout to 1 second and the scheme to HTTP. The constructor builds a GET /_nodes/http request, sniff executes it with restClient.performRequest, and readHosts parses the response, calling readNode for each entry in the nodes section.
  • The readNode method parses http, attributes, roles, version, and name for one node, special-cases 2.x nodes, and builds a Node from publishedHost, boundHosts, name, version, master, data, ingest, and realAttributes.
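
The 2.x special case from the last point can be sketched in isolation. In the listing, the client attribute defaults to false, and master and data each default to "not a client node" when their own attribute is absent. The sketch below takes the already-parsed attribute values as input, with null meaning the attribute was absent; V2RolesSketch and masterAndData are hypothetical names, not part of the client.

```java
// Stand-in sketch using only the JDK; mirrors the role derivation for 2.x nodes in readNode.
class V2RolesSketch {
    // Each argument is the parsed attribute value, or null when the attribute is absent.
    // Returns { master, data }.
    static boolean[] masterAndData(Boolean clientAttribute, Boolean masterAttribute, Boolean dataAttribute) {
        boolean client = clientAttribute != null && clientAttribute; // absent means false
        boolean master = masterAttribute == null ? !client : masterAttribute;
        boolean data = dataAttribute == null ? !client : dataAttribute;
        return new boolean[] { master, data };
    }

    public static void main(String[] args) {
        boolean[] plain = masterAndData(null, null, null);
        boolean[] clientOnly = masterAndData(true, null, null);
        System.out.println("no attributes: master=" + plain[0] + ", data=" + plain[1]);
        System.out.println("client=true:   master=" + clientOnly[0] + ", data=" + clientOnly[1]);
    }
}
```

The ingest flag is not derived here: in the listing it is only set while reading the roles array, which 2.x nodes do not send.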

Article URL: http://www.python88.com/topic/33240