1 Node的启动流程
节点的启动整体流程如下所示:
2 启动过程分析
启动入口:
通过启动脚本bin/elasticsearch启动ES,在distribution模块中的/src/bin目录下面,脚本先载入了jvm配置文件jvm.options。最后启动org.elasticsearch.bootstrap.Elasticsearch这个主类中的main方法。
exec \ #执行命令
"$JAVA" \ #Java程序路径
$ES_JAVA_OPTS \ #JVM选项
- Des. path. home= "$ES_HOME" \ #设置path. home路径
- Des. path. conf= "$ES_PATH_CONF" \ #设置path. conf路径
- Des. distribution. flavor= "$ES_DISTRIBUTION_FLAVOR" \
- Des. distribution. type= "$ES_DISTRIBUTION_TYPE" \
- cp "$ES_CLASSPATH" \ #设置java classpath
org. elasticsearch. bootstrap. Elasticsearch \ #指定main函数所在类
"$@" #传递给main函数命令行参数
main方法中首先添加了shutdown Hook,以便在close的时候清理关闭资源:
在JVM里面添加关闭的shutdown Hook,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的shutdown Hook,当系统执行完这些shutdown Hook后,jvm才会关闭。
所以这些shutdown Hook可以在jvm关闭的时候进行内存清理、对象销毁等操作。
Runtime. getRuntime ( ) . addShutdownHook ( shutdownHookThread) ;
然后检查了ES的3个环境参数:
putSystemPropertyIfSettingIsMissing ( settings, "path.data" , "es.path.data" ) ;
putSystemPropertyIfSettingIsMissing ( settings, "path.home" , "es.path.home" ) ;
putSystemPropertyIfSettingIsMissing ( settings, "path.logs" , "es.path.logs" ) ;
所有的检查做完后,代码进入到Bootstrap#init方法,Bootstrap类完成了elasticsearch的启动。
先初始化一个Bootstrap实例;
加载安全配置;
配置log输出器;
创建pid文件,会在磁盘上持久化一个记录应用pid的文件;
调用Bootstrap的setup方法和start方法
setup方法主要:初始化环境配置,添加关闭shutdown Hook,检查jar包冲突,配置日志输出器,通过environment初始化Node实例。
start方法主要:启动已经实例化的Node,启动keepAliveThread 线程。
private void start ( ) throws NodeValidationException {
node. start ( ) ;
keepAliveThread. start ( ) ;
}
可以看到启动的重点在setup方法中,启动过后就是Node的事了。
Node类的实现
Node是通过NodeBuilder来实例化的,使用google的注入框架Guice的Injector进行注入与获取实例。elasticsearch里面的子模块都是用上面的方法进行模块化管理,elasticsearch对guice进行了封装,通过ModulesBuilder类构建elasticsearch的模块:
ModulesBuilder modules = new ModulesBuilder ( ) ;
for ( Module pluginModule : pluginsService. createGuiceModules ( ) ) {
modules. add ( pluginModule) ;
}
Node的启动就是Node里每个子模块的启动,同样的,分别调用不同的的start方法来启动这个组件,如下所示,通过命令也能很清晰的看出来各个模块的功能,子模块的start方法基本就是初始化内部数据、创建线程池、启动线程池等操作。
详细的子模块这里不介绍,后续会对各个主要功能模块进行分析。
public Node start ( ) throws NodeValidationException {
. . . . . .
injector.
getInstance ( MappingUpdatedAction. class ) . setClient ( client) ;
injector. getInstance ( IndicesService. class ) . start ( ) ;
injector. getInstance ( IndicesClusterStateService. class ) . start ( ) ;
injector. getInstance ( SnapshotsService. class ) . start ( ) ;
injector. getInstance ( SnapshotShardsService. class ) . start ( ) ;
injector. getInstance ( RoutingService. class ) . start ( ) ;
injector. getInstance ( SearchService. class ) . start ( ) ;
nodeService. getMonitorService ( ) . start ( ) ;
final ClusterService clusterService = injector. getInstance ( ClusterService. class ) ;
final NodeConnectionsService nodeConnectionsService = injector. getInstance ( NodeConnectionsService. class ) ;
nodeConnectionsService. start ( ) ;
clusterService. setNodeConnectionsService ( nodeConnectionsService) ;
injector. getInstance ( ResourceWatcherService. class ) . start ( ) ;
injector. getInstance ( GatewayService. class ) . start ( ) ;
Discovery discovery = injector. getInstance ( Discovery. class ) ;
clusterService. getMasterService ( ) . setClusterStatePublisher ( discovery: : publish) ;
TransportService transportService = injector. getInstance ( TransportService. class ) ; transportService. getTaskManager ( ) . setTaskResultsService ( injector. getInstance ( TaskResultsService. class ) ) ;
transportService. start ( ) ;
. . . . . .
discovery. start ( ) ;
clusterService. start ( ) ;
transportService. acceptIncomingRequests ( ) ;
discovery. startInitialJoin ( ) ;
. . . . . .
if ( NetworkModule. HTTP_ENABLED. get
( settings) ) {
injector. getInstance ( HttpServerTransport. class ) . start ( ) ;
}
. . . . . .
if ( WRITE_PORTS_FILE_SETTING. get ( settings) ) {
if ( NetworkModule. HTTP_ENABLED. get ( settings) ) {
HttpServerTransport http = injector. getInstance ( HttpServerTransport. class ) ;
writePortsFile ( "http" , http. boundAddress ( ) ) ;
}
TransportService transport = injector. getInstance ( TransportService. class ) ;
writePortsFile ( "transport" , transport. boundAddress ( ) ) ;
}
. . . . . .
pluginsService. filterPlugins ( ClusterPlugin. class ) . forEach ( ClusterPlugin: : onNodeStarted) ;
return this ;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
Node启动之后,调用keepAliveThread.start()方法启动keepAlive线程,线程本身不做具体工作。主线程执行完启动流程之后会退出,keepAlive线程是唯一的用户线程,作用是保持进程运行。
java程序中,至少要有一个用户线程。当用户线程数为0时退出进程。
3 关闭过程分析
在节点启动的过程中,Bootstrap#setup方法里面添加了shutdown Hook,当进行收到系统的kill命令或者SIGINT信号时,调用Node#close方法,执行节点关闭流程。
每个模块的Service都实现了doStop和doClose,用于处理模块的关闭流程。先调用一遍各个模块的doStop,然后遍历调用各个模块执行doClose。
if ( lifecycle. started ( ) ) {
stop ( ) ;
}
List< Closeable> toClose = new ArrayList < > ( ) ;
toClose. add ( nodeService) ;
. . . . .
IOUtils. close ( toClose) ;
各模块的关闭有一定的顺序:
ResourceWatcherService:资源监视服务;
HttpServerTransport:HTTP传输服务,不再响应Rest请求;
SnapshotsService:快照服务;
SnapshotShardsService:shard级快照服务;
IndicesClusterStateService:索引相关操作;
Discovery:集群拓扑管理,不再响应ping请求;
RoutingService:路由服务;
ClusterService:集群管理服务;
NodeConnectionsService:节点连接管理服务;
MonitorService:进程级监控服务;
GatewayService:集群元数据持久化和恢复;
SearchService:处理查询请求;
TransportService:底层传输服务;
plugins:当前所有插件;
IndicesService:创建、删除索引等操作。
最后才关闭IndicesService,因为这个期间需要等待释放的资源最多,时间最长。
4 小结
总的来说,启动过程比较简单,就是初始化和校验工作,启动各个子模块后异步的进行各自的工作;关闭的过程也会按照子模块服务的顺序来关闭。