A look at Elasticsearch's SeedHostsProvider

go4it • 6 years ago • 263 views
This article takes a look at Elasticsearch's SeedHostsProvider.

SeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsProvider.java

/**
 * A pluggable provider of the list of seed hosts to use for discovery.
 */
public interface SeedHostsProvider {

    /**
     * Returns a list of seed hosts to use for discovery. Called repeatedly while discovery is active (i.e. while there is no master)
     * so that this list may be dynamic.
     */
    List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver);

    /**
     * Helper object that allows to resolve a list of hosts to a list of transport addresses.
     * Each host is resolved into a transport address (or a collection of addresses if the
     * number of ports is greater than one)
     */
    interface HostsResolver {
        List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
    }
}
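  • The SeedHostsProvider interface defines the getSeedAddresses method, which takes a HostsResolver as its parameter; the nested HostsResolver interface defines the resolveHosts method. SeedHostsProvider has several implementations: SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider and AzureSeedHostsProvider.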
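
To make the shape of this contract concrete, here is a minimal sketch in plain Java. The types SeedProvider, Resolver and StaticSeedProvider are simplified, hypothetical stand-ins: addresses are plain host:port strings, and the starting port 9300 is an assumption of the sketch. They are not the Elasticsearch classes. The sketch only illustrates that the provider supplies host strings and a port limit, while the resolver passed in by the caller expands them into addresses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SeedHostsProvider.HostsResolver: turns host strings into addresses.
interface Resolver {
    List<String> resolveHosts(List<String> hosts, int limitPortCounts);
}

// Hypothetical stand-in for SeedHostsProvider: asked repeatedly for the current seed list.
interface SeedProvider {
    List<String> getSeedAddresses(Resolver resolver);
}

// A provider backed by a fixed list of host names.
class StaticSeedProvider implements SeedProvider {
    private final List<String> hosts;

    StaticSeedProvider(String... hosts) {
        this.hosts = Arrays.asList(hosts);
    }

    @Override
    public List<String> getSeedAddresses(Resolver resolver) {
        // The provider picks the hosts and the per-host port limit; the resolver does the rest.
        return resolver.resolveHosts(hosts, 1);
    }
}

class SeedProviderDemo {
    // Toy resolver: expands each host to `limitPortCounts` consecutive ports starting at 9300 (assumed).
    static final Resolver TOY_RESOLVER = (hosts, limitPortCounts) -> {
        List<String> out = new ArrayList<>();
        for (String host : hosts) {
            for (int i = 0; i < limitPortCounts; i++) {
                out.add(host + ":" + (9300 + i));
            }
        }
        return out;
    };

    static List<String> demo() {
        return new StaticSeedProvider("es-node-1", "es-node-2").getSeedAddresses(TOY_RESOLVER);
    }
}
```

Because the resolver is a parameter rather than a field, the same provider can be queried again on every discovery round, which is what allows the seed list to be dynamic.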

SettingsBasedSeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SettingsBasedSeedHostsProvider.java

public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {

    private static final Logger logger = LogManager.getLogger(SettingsBasedSeedHostsProvider.class);

    public static final Setting<List<String>> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
        Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Property.NodeScope, Property.Deprecated);

    public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING =
        Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope);

    // these limits are per-address
    private static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
    private static final int LIMIT_LOCAL_PORTS_COUNT = 5;

    private final List<String> configuredHosts;
    private final int limitPortCounts;

    public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) {
        if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
            if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
                throw new IllegalArgumentException("it is forbidden to set both ["
                    + DISCOVERY_SEED_HOSTS_SETTING.getKey() + "] and ["
                    + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey() + "]");
            }
            configuredHosts = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
            // we only limit to 1 address, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
            configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings);
            // we only limit to 1 address, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else {
            // if unicast hosts are not specified, fill with simple defaults on the local machine
            configuredHosts = transportService.getLocalAddresses();
            limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
        }

        logger.debug("using initial hosts {}", configuredHosts);
    }

    @Override
    public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
        return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
    }
}
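  • SettingsBasedSeedHostsProvider reads the seed hosts from the discovery.seed_hosts setting or from the deprecated discovery.zen.ping.unicast.hosts setting; setting both throws an IllegalArgumentException. In either case at most one port is tried per host (LIMIT_FOREIGN_PORTS_COUNT). If neither is set, it falls back to transportService.getLocalAddresses() and tries up to five ports per address (LIMIT_LOCAL_PORTS_COUNT).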
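
The precedence rules in that constructor can be sketched in isolation. In the following simplified model a plain Map stands in for Settings and the class name SeedHostsChoice is hypothetical; only the two setting keys, the error message and the limits 1 and 5 are taken from the source above.

```java
import java.util.List;
import java.util.Map;

// Simplified model of the constructor's decision logic; a plain Map stands in for Settings.
class SeedHostsChoice {
    static final String SEED_HOSTS = "discovery.seed_hosts";
    static final String LEGACY_HOSTS = "discovery.zen.ping.unicast.hosts";

    final List<String> hosts;
    final int limitPortCounts;

    private SeedHostsChoice(List<String> hosts, int limitPortCounts) {
        this.hosts = hosts;
        this.limitPortCounts = limitPortCounts;
    }

    static SeedHostsChoice from(Map<String, List<String>> settings, List<String> localAddresses) {
        boolean hasLegacy = settings.containsKey(LEGACY_HOSTS);
        boolean hasSeed = settings.containsKey(SEED_HOSTS);
        if (hasLegacy && hasSeed) {
            throw new IllegalArgumentException(
                "it is forbidden to set both [" + SEED_HOSTS + "] and [" + LEGACY_HOSTS + "]");
        }
        if (hasLegacy) {
            // Explicitly configured remote hosts: one port per host.
            return new SeedHostsChoice(settings.get(LEGACY_HOSTS), 1);
        }
        if (hasSeed) {
            return new SeedHostsChoice(settings.get(SEED_HOSTS), 1);
        }
        // Nothing configured: fall back to the local machine and probe up to 5 ports per address.
        return new SeedHostsChoice(localAddresses, 5);
    }
}
```

The larger limit in the fallback branch presumably exists so that several nodes started on one machine, each bound to a different port, can still find each other without any configuration.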

FileBasedSeedHostsProvider

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/FileBasedSeedHostsProvider.java

public class FileBasedSeedHostsProvider implements SeedHostsProvider {

    private static final Logger logger = LogManager.getLogger(FileBasedSeedHostsProvider.class);

    public static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";

    private final Path unicastHostsFilePath;

    public FileBasedSeedHostsProvider(Path configFile) {
        this.unicastHostsFilePath = configFile.resolve(UNICAST_HOSTS_FILE);
    }

    private List<String> getHostsList() {
        if (Files.exists(unicastHostsFilePath)) {
            try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
                return lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
                    .collect(Collectors.toList());
            } catch (IOException e) {
                logger.warn(() -> new ParameterizedMessage("failed to read file [{}]", unicastHostsFilePath), e);
                return Collections.emptyList();
            }
        }

        logger.warn("expected, but did not find, a dynamic hosts list at [{}]", unicastHostsFilePath);

        return Collections.emptyList();
    }

    @Override
    public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
        final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1);
        logger.debug("seed addresses: {}", transportAddresses);
        return transportAddresses;
    }
}
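  • FileBasedSeedHostsProvider reads the host list from the unicast_hosts.txt file under the path passed to its constructor, skipping lines that start with #. If the file is missing or cannot be read, it logs a warning and returns an empty list. Each host is resolved with a port limit of 1.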
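
The file handling can be tried out without Elasticsearch. The sketch below reuses the filtering rule from getHostsList() with only the JDK; the class HostsFileDemo and its writeSample helper are hypothetical and exist only for the demonstration, and logging is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class HostsFileDemo {
    static final String HOSTS_FILE = "unicast_hosts.txt";

    // Same filtering rule as getHostsList(): drop lines that start with '#', keep everything else.
    static List<String> readHosts(Path configDir) {
        Path file = configDir.resolve(HOSTS_FILE);
        if (Files.exists(file) == false) {
            return Collections.emptyList();
        }
        try (Stream<String> lines = Files.lines(file)) {
            return lines.filter(line -> line.startsWith("#") == false).collect(Collectors.toList());
        } catch (IOException e) {
            return Collections.emptyList();
        }
    }

    // Demo helper (hypothetical): writes a sample hosts file into a fresh temporary directory.
    static Path writeSample(String... lines) {
        try {
            Path dir = Files.createTempDirectory("es-config");
            Files.write(dir.resolve(HOSTS_FILE), Arrays.asList(lines), StandardCharsets.UTF_8);
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, HostsFileDemo.readHosts(HostsFileDemo.writeSample("# seed nodes", "10.0.0.1", "10.0.0.2:9305")) yields the two host entries without the comment line. Since the file is read again on every call, edits to it take effect on the next discovery round without a restart.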

SeedHostsProvider.HostsResolver

UnicastZenPing.createHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

    private SeedHostsProvider.HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
            limitPortCounts, transportService, resolveTimeout);
    }
  • UnicastZenPing's createHostsResolver method returns a lambda that implements SeedHostsProvider.HostsResolver and delegates to SeedHostsResolver.resolveHostsLists.
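
The delegation pattern itself is small enough to show on its own. In this sketch, ResolverFactoryDemo and its nested HostsResolver interface are hypothetical stand-ins, and the static method merely formats strings instead of resolving anything. What it illustrates is how the lambda receives the two interface parameters and captures the remaining arguments from the enclosing instance.

```java
import java.util.ArrayList;
import java.util.List;

class ResolverFactoryDemo {
    // Hypothetical stand-in for SeedHostsProvider.HostsResolver (a single abstract method).
    interface HostsResolver {
        List<String> resolveHosts(List<String> hosts, int limitPortCounts);
    }

    private final long resolveTimeoutMillis;

    ResolverFactoryDemo(long resolveTimeoutMillis) {
        this.resolveTimeoutMillis = resolveTimeoutMillis;
    }

    // Stand-in for the static resolveHostsLists: it needs more context than the interface passes in.
    static List<String> resolveHostsLists(List<String> hosts, int limitPortCounts, long timeoutMillis) {
        List<String> out = new ArrayList<>();
        for (String host : hosts) {
            out.add(host + " (ports<=" + limitPortCounts + ", timeout=" + timeoutMillis + "ms)");
        }
        return out;
    }

    // The lambda takes the two interface parameters and captures the timeout from this instance.
    HostsResolver createHostsResolver() {
        return (hosts, limitPortCounts) -> resolveHostsLists(hosts, limitPortCounts, resolveTimeoutMillis);
    }
}
```

This keeps the provider-facing interface narrow: providers never see the executor, logger, transport service or timeout that the real resolver needs.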

SeedHostsResolver

elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java

public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {

	//......

    public static List<TransportAddress> resolveHostsLists(
        final ExecutorService executorService,
        final Logger logger,
        final List<String> hosts,
        final int limitPortCounts,
        final TransportService transportService,
        final TimeValue resolveTimeout) {
        Objects.requireNonNull(executorService);
        Objects.requireNonNull(logger);
        Objects.requireNonNull(hosts);
        Objects.requireNonNull(transportService);
        Objects.requireNonNull(resolveTimeout);
        if (resolveTimeout.nanos() < 0) {
            throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
        }
        // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
        final List<Callable<TransportAddress[]>> callables =
            hosts
                .stream()
                .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
                .collect(Collectors.toList());
        final List<Future<TransportAddress[]>> futures;
        try {
            futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
        final List<TransportAddress> transportAddresses = new ArrayList<>();
        final Set<TransportAddress> localAddresses = new HashSet<>();
        localAddresses.add(transportService.boundAddress().publishAddress());
        localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
        // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
        // hostname with the corresponding task by iterating together
        final Iterator<String> it = hosts.iterator();
        for (final Future<TransportAddress[]> future : futures) {
            final String hostname = it.next();
            if (!future.isCancelled()) {
                assert future.isDone();
                try {
                    final TransportAddress[] addresses = future.get();
                    logger.trace("resolved host [{}] to {}", hostname, addresses);
                    for (int addressId = 0; addressId < addresses.length; addressId++) {
                        final TransportAddress address = addresses[addressId];
                        // no point in pinging ourselves
                        if (localAddresses.contains(address) == false) {
                            transportAddresses.add(address);
                        }
                    }
                } catch (final ExecutionException e) {
                    assert e.getCause() != null;
                    final String message = "failed to resolve host [" + hostname + "]";
                    logger.warn(message, e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // ignore
                }
            } else {
                logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
            }
        }
        return Collections.unmodifiableList(transportAddresses);
    }

    //......
}
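  • The static resolveHostsLists method of SeedHostsResolver resolves the hosts in parallel: it wraps each call to transportService.addressesFromString in a Callable and submits them all through executorService.invokeAll, waiting at most resolveTimeout. Hosts whose task timed out (cancelled futures) or failed are logged and skipped, the node's own publish and bound addresses are filtered out, and the result is returned as an unmodifiable list.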
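
The same invokeAll pattern can be reproduced with the JDK alone. In the sketch below, ParallelResolveDemo is a hypothetical class, the lookup function is a stand-in for transportService.addressesFromString, addresses are plain strings, and the host names "slow" and "bad" are invented to trigger the timeout and failure paths.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

class ParallelResolveDemo {
    // `lookup` is a hypothetical stand-in for transportService.addressesFromString.
    static List<String> resolveAll(ExecutorService executor, List<String> hosts, Function<String, String[]> lookup,
                                   Set<String> localAddresses, long timeoutMillis) {
        List<Callable<String[]>> tasks = hosts.stream()
            .map(h -> (Callable<String[]>) () -> lookup.apply(h))
            .collect(Collectors.toList());
        final List<Future<String[]>> futures;
        try {
            // Blocks until all tasks finish or the timeout expires; unfinished tasks are cancelled.
            futures = executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
        List<String> resolved = new ArrayList<>();
        Iterator<String> it = hosts.iterator(); // futures come back in task order, so iterate in lockstep
        for (Future<String[]> future : futures) {
            String host = it.next();
            if (future.isCancelled()) {
                System.err.println("timed out resolving host [" + host + "]");
                continue;
            }
            try {
                for (String address : future.get()) {
                    if (localAddresses.contains(address) == false) { // no point in contacting ourselves
                        resolved.add(address);
                    }
                }
            } catch (ExecutionException e) {
                System.err.println("failed to resolve host [" + host + "]: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return Collections.unmodifiableList(resolved);
    }

    static List<String> demo() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            Function<String, String[]> lookup = host -> {
                if (host.equals("slow")) {
                    try {
                        Thread.sleep(5_000); // far longer than the timeout used below
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (host.equals("bad")) {
                    throw new IllegalArgumentException("cannot resolve " + host);
                }
                return new String[] { host + ":9300" };
            };
            return resolveAll(executor, Arrays.asList("self", "slow", "bad", "peer"), lookup,
                Collections.singleton("self:9300"), 500);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling ParallelResolveDemo.demo() returns a list containing only peer:9300: "self" is dropped as a local address, "slow" is cancelled by the timeout and "bad" fails inside its task. One slow or broken host therefore costs at most the timeout and never blocks the others.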

Summary

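  • The SeedHostsProvider interface defines the getSeedAddresses method, which takes a HostsResolver as its parameter; the nested HostsResolver interface defines the resolveHosts method. SeedHostsProvider has several implementations: SettingsBasedSeedHostsProvider, FileBasedSeedHostsProvider, GceSeedHostsProvider, AwsEc2SeedHostsProvider and AzureSeedHostsProvider.
  • SettingsBasedSeedHostsProvider reads the discovery.seed_hosts setting or the deprecated discovery.zen.ping.unicast.hosts setting; FileBasedSeedHostsProvider reads the host list from the unicast_hosts.txt file under the path passed to its constructor.
  • UnicastZenPing's createHostsResolver method returns a lambda that implements SeedHostsProvider.HostsResolver and delegates to SeedHostsResolver.resolveHostsLists; that static method runs transportService.addressesFromString for all hosts in parallel on an executor, bounded by resolveTimeout.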
