社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

[10]elasticsearch源码深入分析——线程池的封装

飞来来 • 7 年前 • 635 次点击  

本篇为elasticsearch源码分析系列文章的第十篇,本篇延续上一篇ElasticSearch的Plugin引出的内容,进行各种Plugin中线程池的分析。

上篇讲到了ElasticSearch中插件的基本概念,以及Node实例化中涉及到的PluginService初始化编码,本篇将会继续研究Node实例化的过程中PluginsService发挥的作用,也就是通过PluginsService中的参数构建线程池框架。

线程池在何时初始化

当Node完成了PluginsService的构造后,紧接会通过getExecutorBuilders方法取得线程池的Executor构造器列表,代码如下:

List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings)

此时PluginsService对象中已经有了需要加载的所有plugin了,包含modules路径和plugins路径中的所有组件,这里统称为plugin。如下图所示总共是包含了13个已加载的Plugin,分别是modules路径中的默认必须加载的12个和Plugins路径中的自定义安装的1个(ICU分词器)。如下图所示

路径中的plugin对象

内存中的plugin对象

构建线程池框架

初始化ExecutorBuilder集合

Node实例化过程中,通过代码:

List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);

查找到自定义的线程池Executor构建器。再获得自定义线程池构建器集合后,开始构建线程池(ThreadPool)。

ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

首先通过代码获得处理器CPU的数量,

Runtime.getRuntime().availableProcessors()

当然这个值是可以被Setting中设置的变量processors来覆盖的。这个变量在代码中被标记为availableProcessors。然后创建变量

  • halfProcMaxAt5,这个变量的意思是availableProcessors的一半,但最大不超过5。
  • halfProcMaxAt10,这个变量的意思是availableProcessors的一半,但最大不超过10。

这两个变量在后面创建各种线程池构造器中反复用到。

在确定了可使用的处理器数量后,就能确定线程池的最小值(genericThreadPoolMax),ElasticSearch中是确定为:可用CPU处理器数量的4倍,且固定范围为最小128,最大为512

由此可见如果用一般服务器的话,线程池上限最终会被确定为128,可以说还是比较高的设定了。

接下来开始构造执行不同操作时线程池Executor,ElasticSearch中把各个操作的Executor构造为Map,Map<String, ExecutorBuilder>,下面是各个Executor对象的解释:

  • 普通操作的Executor:构建一个可伸缩的Executor构建器,value为ScalingExecutorBuilder对象。接收参数和对应操作如下:

    • name:线程池执行者的名称,也就是generic
    • core:线程池中线程的最小值,固定为4。将thread_pool.generic.core的设为这个值。
    • max:线程池中线程的最大值,对应上面提到的genericThreadPoolMax,在本机跑的结果是128
    • keepAlive:超过4个线程后,线程保持活跃的时间。这个值固定为30秒。这个参数被设定为变量thread_pool.generic.keep_alive
  • 索引操作的Executor:构建一个固定的Executor构建器。key为index,value为FixedExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.index.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是idnex
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.index.size的值为size的值,本机跑的结果是4
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.index.queue_size的值为200,注意这个值固定为200
  • 批处理操作的Executor:构建一个固定的Executor构建器。key为bulk,value为FixedExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.bulk.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是bulk
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.bulk.size的值为size的值,本机跑的结果是4
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.bulk.queue_size的值为200,注意这个值固定为200
  • get操作的Executor:构建一个固定的Executor构建器。key为get,value为FixedExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.get.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是get
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.get.size的值为size的值,本机跑的结果是4
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.get.queue_size的值为1000,注意这个值固定为1000
  • 查询操作的Executor:构建一个根据利特尔法则自动扩展长度的Executor构建器,这个构建器的逻辑与其他构建器不同,也显得比较复杂,也说明了对于查询操作,ElasticSearch做了特殊的优化。key为search,value为AutoQueueAdjustingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.search.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是search
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.search.size的值为size的值,本机跑的结果是7
    • initialQueueSize:初始化队列的大小,固定设置为1000,造配置变量thread_pool.search.queue_size的值为200
    • minQueueSize:队列的最小长度,固定设置为1000设定配置变量thread_pool.search.min_queue_size的值为1000
    • maxQueueSize:队列的最大长度,固定设置为1000,设定配置变量thread_pool.search.max_queue_size的值为1000
    • frameSize:队列的步进长度,固定设置为2000,构造配置变量thread_pool.search.auto_queue_frame_size的值为200,注意这个值固定为200
    • thread_pool.search.target_response_time针对search操作的相应被设置为1S,
  • 管理操作的Executor:构建一个可伸缩的Executor构建器。key为management,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.management.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是management
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.management.size的值为size的值,本机跑的结果是1
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.management.queue_size的值为200,注意这个值固定为200
    • keepAlive:超过1个线程后,线程保持活跃的时间。这个值固定为5分钟。这个参数被设定为变量thread_pool.management.keep_alive
  • 监听操作的Executor:构建一个固定的Executor构建器。key为listener,value为FixedExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.listener.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是listener
    • size:线程的固定大小,上文提到的halfProcMaxAt10,和参数name一起构造配置变量thread_pool.listener.size的值为size的值,本机跑的结果是2
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.listener.queue_size的值为**-1**,意思就没有阻塞队列。
  • flush操作的Executor:构建一个可伸缩的Executor构建器。key为flush,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.flush.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是flush
    • size:线程的固定大小,上文提到的halfProcMaxAt5,和参数name一起构造配置变量thread_pool.flush.size的值为size的值,本机跑的结果是4
    • keepAlive:超过1个线程后,线程保持活跃的时间。这个值固定为5分钟。这个参数被设定为变量thread_pool.management.keep_alive
  • refresh操作的Executor:构建一个可伸缩的Executor构建器。key为refresh,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.refresh.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是refresh
    • size:线程的固定大小,上文提到的halfProcMaxAt10,和参数name一起构造配置变量thread_pool.refresh.size的值为size的值,本机跑的结果是4
    • keepAlive:超过1个线程后,线程保持活跃的时间。这个值固定为5分钟。这个参数被设定为变量thread_pool.management.keep_alive
  • warmer操作的Executor:构建一个可伸缩的Executor构建器。key为warmer,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.warmer.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是warmer
    • size:线程的固定大小,上文提到的halfProcMaxAt5,和参数name一起构造配置变量thread_pool.warmer.size的值为size的值,本机跑的结果是4
    • keepAlive:超过1个线程后,线程保持活跃的时间。这个值固定为5分钟。这个参数被设定为变量thread_pool.management.keep_alive
  • snapshot操作的Executor:构建一个可伸缩的Executor构建器。key为snapshot,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.snapshot.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是snapshot
    • size:线程的固定大小,上文提到的halfProcMaxAt5,和参数name一起构造配置变量thread_pool.snapshot.size的值为size的值,本机跑的结果是4
    • keepAlive:超过1个线程后,线程保持活跃的时间。这个值固定为5分钟。这个参数被设定为变量thread_pool.management.keep_alive
  • 碎片处理操作的Executor:构建一个可伸缩的Executor构建器。key为fetch_shard_started,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.fetch_shard_started.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是fetch_shard_started
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.fetch_shard_started.size的值为size的值,本机跑的结果是4
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.fetch_shard_started.queue_size的值为200,注意这个值固定为200
  • 强制merge操作的Executor:构建一个可伸缩的 Executor构建器。key为force_merge,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.force_merge.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是force_merge
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.force_merge.size的值为size的值,本机跑的结果是4
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.force_merge.queue_size的值为200,注意这个值固定为200
  • 获取碎片操作的Executor:构建一个可伸缩的Executor构建器。key为fetch_shard_store,value为ScalingExecutorBuilder对象,接收参数和对应操作如下:

    • settings:Node的配置settings。设定配置变量thread_pool.fetch_shard_store.size的值为该参数中cpu的数量
    • name:线程池执行者的名称,也就是fetch_shard_store
    • size:线程的固定大小,和参数name一起构造配置变量thread_pool.fetch_shard_store.size的值为size的值,本机跑的结果是4
    • queueSize:阻塞队列的大小,构造配置变量thread_pool.fetch_shard_store.queue_size的值为200,注意这个值固定为200

至此就完成了org.elasticsearch.threadpool.ThreadPool对象的创建。

ThreadPool对象的作用

得到ThreadPool的对象后,通过线程池进行了NodeClient的构建。

client = new NodeClient(settings, threadPool);

ResourceWatcherService对象的构建,

final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);

后面还有很多的组件都用到了线程池,比如:

  • IngestService
  • ClusterInfoService
  • MonitorService
  • ActionModule
  • IndicesService
  • NetworkModule
  • TransportService
  • DiscoveryModule
  • NodeService

可以看出都是ElasticSearch的核心组件,这些组件的功能和原理,我都会在以后的文章中讲解,而像ElasticSearch这种存储搜索系统来说IO操作肯定非常频繁,而线程池是专门致力于解决系统的IO问题,它在这些服务组件中的作用也显得愈发重要。

利特尔法则

查询操作中提到的利特尔法则是一种描述稳定系统中,三个变量之间关系的法则。

其中L表示平均请求数量,λ表示请求的频率,W表示响应请求的平均时间。举例来说,如果每秒请求数为10次,每个请求处理时间为1秒,那么在任何时刻都有10个请求正在被处理。回到我们的话题,就是需要使用10个线程来进行处理。如果单个请求的处理时间翻倍,那么处理的线程数也要翻倍,变成20个。

理解了处理时间对于请求处理效率的影响之后,我们会发现,通常理论上限可能不是线程池大小的最佳值。线程池上限还需要参考任务处理时间。

假设JVM可以并行处理1000个任务,如果每个请求处理时间不超过30秒,那么在最坏情况下,每秒最多只能处理33.3个请求。然而,如果每个请求只需要500毫秒,那么应用程序每秒可以处理2000个请求。


今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/eKEn7yksJi
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/7065
 
635 次点击