Py学习  »  Elasticsearch

【Elasticsearch源码】节点的启动和关闭流程分析

少加点香菜 • 4 年前 • 153 次点击  

1 Node的启动流程

节点的启动整体流程如下所示:
在这里插入图片描述

2 启动过程分析

启动入口: 通过启动脚本bin/elasticsearch启动ES,在distribution模块中的/src/bin目录下面,脚本先载入了jvm配置文件jvm.options。最后启动org.elasticsearch.bootstrap.Elasticsearch这个主类中的main方法。

  exec \     #执行命令
    "$JAVA" \   #Java程序路径
    $ES_JAVA_OPTS \   #JVM选项
    -Des.path.home="$ES_HOME" \   #设置path.home路径
    -Des.path.conf="$ES_PATH_CONF" \  #设置path.conf路径
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \  
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -cp "$ES_CLASSPATH" \  #设置java classpath
    org.elasticsearch.bootstrap.Elasticsearch \   #指定main函数所在类
    "$@"    #传递给main函数命令行参数
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

main方法中首先添加了shutdown Hook,以便在close的时候清理关闭资源:

在JVM里面添加关闭的shutdown Hook,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的shutdown Hook,当系统执行完这些shutdown Hook后,jvm才会关闭。
所以这些shutdown Hook可以在jvm关闭的时候进行内存清理、对象销毁等操作。

Runtime.getRuntime().addShutdownHook(shutdownHookThread);
  • 1

然后检查了ES的3个环境参数:

putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
  • 1
  • 2
  • 3

所有的检查做完后,代码进入到Bootstrap#init方法,Bootstrap类完成了elasticsearch的启动。

  1. 先初始化一个Bootstrap实例;
  2. 加载安全配置;
  3. 配置log输出器;
  4. 创建pid文件,会在磁盘上持久化一个记录应用pid的文件;
  5. 调用Bootstrap的setup方法和start方法

setup方法主要:初始化环境配置,添加关闭shutdown Hook,检查jar包冲突,配置日志输出器,通过environment初始化Node实例。

start方法主要:启动已经实例化的Node,启动keepAliveThread 线程。

    private void start() throws NodeValidationException {
        node.start();
        keepAliveThread.start();
    }
  • 1
  • 2
  • 3
  • 4

可以看到启动的重点在setup方法中,启动过后就是Node的事了。

Node类的实现

Node是通过NodeBuilder来实例化的,使用google的注入框架Guice的Injector进行注入与获取实例。elasticsearch里面的子模块都是用上面的方法进行模块化管理,elasticsearch对guice进行了封装,通过ModulesBuilder类构建elasticsearch的模块:

ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
     modules.add(pluginModule);
}
  • 1
  • 2
  • 3
  • 4
  • 5

Node的启动就是Node里每个子模块的启动,同样的,分别调用不同的的start方法来启动这个组件,如下所示,通过命令也能很清晰的看出来各个模块的功能,子模块的start方法基本就是初始化内部数据、创建线程池、启动线程池等操作。

详细的子模块这里不介绍,后续会对各个主要功能模块进行分析。

    public Node start() throws NodeValidationException {
        ......
        injector.


    
getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RoutingService.class).start();
        injector.getInstance(SearchService.class).start();
        nodeService.getMonitorService().start();
        final ClusterService clusterService = injector.getInstance(ClusterService.class);
        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
        nodeConnectionsService.start();
        clusterService.setNodeConnectionsService(nodeConnectionsService);
        injector.getInstance(ResourceWatcherService.class).start();
        injector.getInstance(GatewayService.class).start();
        Discovery discovery = injector.getInstance(Discovery.class);
        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
        TransportService transportService = injector.getInstance(TransportService.class);   transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.start();
        ......
        discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
        clusterService.start();
        transportService.acceptIncomingRequests();
        discovery.startInitialJoin();
        ......
        if (NetworkModule.HTTP_ENABLED.get


    
(settings)) {
            injector.getInstance(HttpServerTransport.class).start();
        }
		......
        if (WRITE_PORTS_FILE_SETTING.get(settings)) {
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
                writePortsFile("http", http.boundAddress());
            }
            TransportService transport = injector.getInstance(TransportService.class);
            writePortsFile("transport", transport.boundAddress());
        }
		......
        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
        return this;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

Node启动之后,调用keepAliveThread.start()方法启动keepAlive线程,线程本身不做具体工作。主线程执行完启动流程之后会退出,keepAlive线程是唯一的用户线程,作用是保持进程运行。

java程序中,至少要有一个用户线程。当用户线程数为0时退出进程。

3 关闭过程分析

在节点启动的过程中,Bootstrap#setup方法里面添加了shutdown Hook,当进行收到系统的kill命令或者SIGINT信号时,调用Node#close方法,执行节点关闭流程。

每个模块的Service都实现了doStop和doClose,用于处理模块的关闭流程。先调用一遍各个模块的doStop,然后遍历调用各个模块执行doClose。

if (lifecycle.started()) {
     stop(); // 调用各个子模块的doStop方法
}

List<Closeable> toClose = new ArrayList<>();
// 在toClose方法中添加所有需要关闭的Service
toClose.add(nodeService);  
.....

// 调用各模块的doClose方法
IOUtils.close(toClose); 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

各模块的关闭有一定的顺序:

  1. ResourceWatcherService:资源监视服务;
  2. HttpServerTransport:HTTP传输服务,不再响应Rest请求;
  3. SnapshotsService:快照服务;
  4. SnapshotShardsService:shard级快照服务;
  5. IndicesClusterStateService:索引相关操作;
  6. Discovery:集群拓扑管理,不再响应ping请求;
  7. RoutingService:路由服务;
  8. ClusterService:集群管理服务;
  9. NodeConnectionsService:节点连接管理服务;
  10. MonitorService:进程级监控服务;
  11. GatewayService:集群元数据持久化和恢复;
  12. SearchService:处理查询请求;
  13. TransportService:底层传输服务;
  14. plugins:当前所有插件;
  15. IndicesService:创建、删除索引等操作。

最后才关闭IndicesService,因为这个期间需要等待释放的资源最多,时间最长。

4 小结

总的来说,启动过程比较简单,就是初始化和校验工作,启动各个子模块后异步的进行各自的工作;关闭的过程也会按照子模块服务的顺序来关闭。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/53496
 
153 次点击