社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

Elasticsearch 5.x 源码分析(5)segments merge 流程分析 - 简书

赵安家 • 7 年前 • 788 次点击  

这两周主要看了下 Elasticsearch(其实是Lucene)的 segments 的 merge 流程。事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作,由于我们第一次ES上线,其实也不清楚这究竟是个问题还是本来 Lucene 就是这样,网上找了一些关于ES 或者 Lucene 的 merge 的策略介绍,除了说道大家都了解的一些常规的参数,如最大size,最大doc 下不会再做merge云云,就是没提到到了多少个 segments 之后就不 merge ,接着问了Elasticsearch 圈子的一些人,也没有找到非常确定的答案。查找几天资料无果,顺带就看看源码,最终在昨天又瞄了几眼终于发现一段算法,虽无验证,但应八九不离十,故记录分享之。


Segments

首先还是先重温一下 Lucene 下的 segments,对这个比较陌生的可以阅读三斗大神的这一节

Lucene产生segments示意图

我只引用最下面那张图介绍一下,绿色的就是已经固化的一个个的 segments 文件,不会再更新,左下角就是当前在内存的 Lucene 维护的查询可见的仍为持久化的segment,当Elasticsearch 配置的refresh_invterval (默认是1s,可调)到时,这些in in-memory buffer就会推送到OS的文件系统缓存中去,注意这里只是到缓存,很可能OS仍未持久化到文件系统,成为一个单独的 segment 文件,而啥时commit 到文件系统永不丢失,则由Lucene 的flush 机制保证,当Lucene 做完flush 则表明该 segment 真正推送到文件系统,此时才会在translog做标记并可以删除commit之前的translog 了。

注意这里只是一个简化描述,据三斗和赖总介绍,Lucene 仍有很多因素会促使产生一个segment 而不是百分百由Elasticsearch的refresh_interval 决定。这里就不继续讨论究竟在哪些情况会立即生成一个segment了。

Segment 的merge

详细信息可看三斗这一节 这里只引用两个图介绍一下

merge前
merge后

从图上看,Lucene每次会选取一些小的segments 进而merge到一个大的segment,我这里不再赘述流程和策略,这里只补充一句就是,如果你之前用的scroll查询,之前的scroll还是会指向老的segments,也就是说老的segments 的引用会知道scroll失效后才会被回收。


Elasticsearch 5.x merge 参数的变化

在老的Elasticsearch 中,merge 被认为是一个非常消耗资源的操作,甚至只有一个线程来做这事,并且会影响indexing的request。在之前的版本里,merge 操作用的是一类 merge throttle limit这样的配置来限制各种峰值数据,如下面这些参数,注意这些参数都已经在5.x 中移除掉了。

  • indices.store.throttle.type
  • indices.store.throttle.max_bytes_per_sec

因为在Elasticsearch 5.x 想采用多线程和动态调整这种方式来更加智能地去执行merge操作。如检测是否使用SSD硬盘,应该启动多少个merge线程等。
在Elasticsearch 5.x 下,tired merge policy 成为了唯一的merge策略。因此下面的参数同样也在5.x 下被移除了。

  • index.merge.policy.type
  • index.merge.policy.min_merge_size
  • index.merge.policy.max_merge_size
  • index.merge.policy.merge_factor
  • index.merge.policy.max_merge_docs
  • index.merge.policy.calibrate_size_by_deletes
  • index.merge.policy.min_merge_docs
  • index.merge.policy.max_merge_docs

如上面说的,ES希望采用一种更智能的方式去调整这些参数,达到一个性能的折中。在5下我们可以配置这些参数:

  • index.merge.policy.expunge_deletes_allowed: 指删除了的文档数在一个segment里占的百分比,默认是10,大于这个值时,在执行expungeDeletes 操作时将会merge这些segments.
  • index.merge.policy.floor_segment: 官网的解释我没大看懂,我的个人理解是ES会避免产生很小size的segment,小于这个阈值的所有的非常小的segment都会做merge直到达到这个floor 的size,默认是2MB.
  • index.merge.policy.max_merge_at_once: 一次最多只操作多少个segments,默认是10.
  • index.merge.policy.max_merge_at_once_explicit: 显示调用optimize 操作或者 expungeDeletes时可以操作多少个segments,默认是30.
  • index.merge.policy.max_merged_segment: 超过多大size的segment不会再做merge,默认是5g.
  • index.merge.policy.segments_per_tier: 每个tier允许的segement 数,注意这个数要大于上面的at_once数,否则这个值会先于最大可操作数到达,就会立刻做merge,这样会造成频繁
  • index.reclaim_deletes_weight: 考虑merge的segment 时删除文档数量多少的权重,默认即可.
  • index.compund_format: 还不知道干啥用的,默认即可.

merge 线程调整

Elasticsearch 5 采用了多线程去执行merge,可以通过修改index.merge.scheduler.max_thread_count 来动态调整这个线程数,默认的话是通过下面公式去计算:

Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2)) 

要注意的是如果你是用HDD而非SSD的磁盘的话,最好是用单线程为妙。


force merge API

这里有3个参数可以用

  • max_num_segments 期望merge到多少个segments,1的意思是强行merge到1个segment
  • only_expunge_deletes 只做清理有deleted的segments,即瘦身
  • flush 清理完执行一下flush,默认是true

你可以用下面的URL来执行强行的merge

curl -XPOST "http://localhost:9200/library/_forcemerge?max_num_segments=1

代码介绍

merge的代码相对比较少,因为基本上ES并没有做什么东西,只是采用了多线程,和直接调用Lucene的API而已,主要看几个类:

  • MergePolicyConfig.java 专门管理我们输入的那些参数
  • MergeScheduler.java 这个类其实就是封装了一下Lucene的几个调用
  • ConcurrentMergeScheduler.java 继承了MergeScheduler.java在上面实现了多线程分工并加上了多线程下的一些限制,如上面配置的那些最大XXX,最多XXX 之类的。
  • ElasticsearchConcurrentMergeScheduler.java 继承了ConcurrentMergeScheduler.java 通过配置去控制整个ES实例的merge 流程的运作,还有打印日志等。
  • TieredMergePolicy.java merge的策略控制类,之前提到了ES5后只剩下这个唯一的默认的策略控制,所有的选择,打分,触发merge的策略都在这里定义。

最后发现问题就出在TieredMergePolicy.java 上,再次回顾我遇到的问题

事情起因是,线上的ES有些大索引,其中的segments 个数几十个,每个大小100M+,小 segments 若干,而遇到问题就是这些大的 segments 不再做 merge 了,除非强制进行forceMerge 操作。

然后我们跳到下面这个方法:

public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, IndexWriter writer) throws IOException {

<snip> //这里主要拿到总bytes数,segments数,踢掉超过阈值的segments

    minSegmentBytes = floorSize(minSegmentBytes);
    // Compute max allowed segs in the index
    long levelSize = minSegmentBytes;
    long bytesLeft = totIndexBytes;
    double allowedSegCount = 0;
    while(true) {
      final double segCountLevel = bytesLeft / (double) levelSize;
      if (segCountLevel < segsPerTier) {
        allowedSegCount += Math.ceil(segCountLevel);
        break;
      }
      allowedSegCount += segsPerTier;
      bytesLeft -= segsPerTier * levelSize;
      levelSize *= maxMergeAtOnce;
    }
    int allowedSegCountInt = (int) allowedSegCount;

<snip>

问题就出在这段算法里了,tieredPolicy,顾名思义就是梯队,阶级,等级,就是把所有的segments 分层一个个的阶级,ES的设想就是 每个tier 都应该至少包含 segsPerTier 个segments,这样从上至下就可以分批的一次每个tier都可以做一轮merge操作,举个例子,如果我们按默认值floor的size是2MB,maxMergeAtOnce 使用默认10 的话,那么最低层就应该有 10 个 2MB的segments,做完merge 应该就会产生一个20MB的 segment,那么这个20MB应该就是下一个tier, 在这个tier里也应该有至少10个20MB的segments 来等待做merge,如此类推。
那我就用我遇到的例子来演绎一遍上面的算法,假设我有5个100MB的大segments,下面就只有少数几个的segments,segsPerTiermaxMergeAtOnce 都采用默认值10。

  1. 第一次while,segCountLevel = 500/2 = 250 显然大于10,所以allowedSegCount = 10bytesLeft = 500 - 2*10 = 480levelSize = 2*10 = 20
  2. 第二次while,segCountLevel = 480/20 = 24 还是大于10,所以allowedSegCount = 10 + 10 = 20bytesLeft = 480 - 20*10 = 280levelSize = 20*10 = 200
  3. 第三次while,segCountLevel = 280/200 = 1.4 小于10 了,所以allowedSegCount = 20 + 2 = 22; 退出

也就是说这次的merge 操作,根据当前segments总的字节数推算,ES应该是被允许最多merge 22 个segments;接着就是去找实际可以merge的总的eligible的segments数量

// Gather eligible segments for merging, ie segments
     // not already being merged and not already picked (by
     // prior iteration of this loop) for merging:
     final List<SegmentCommitInfo> eligible = new ArrayList<>();
     for(int idx = tooBigCount; idx<infosSorted.size(); idx++) {
       final SegmentCommitInfo info = infosSorted.get(idx);
       if (merging.contains(info)) {
         mergingBytes += size(info, writer);
       } else if (!toBeMerged.contains(info)) {
         eligible.add(info);
       }
     }

     final boolean maxMergeIsRunning = mergingBytes >= maxMergedSegmentBytes;

     if (verbose(writer)) {
       message("  allowedSegmentCount=" + allowedSegCountInt + " vs count=" + infosSorted.size() + " (eligible count=" + eligible.size() + ") tooBigCount=" + tooBigCount, writer);
     }

     if (eligible.size() == 0) {
       return spec;
     }

     if (eligible.size() > allowedSegCountInt) {
      //可以作为预备merge的segments大于允许的数,这轮merge可以做了
      //剩下就是为segments打分,选出一定数量的segments来merge
     } else {
      //达不到预期数量,不做了
     }

从上面看到,确实如果segments 不够ES被期望的达到那么多可被merge的segments 数量的时候,其实ES是不做merge的。那么就会在一种场景里面出现:

当索引达到了比较大时,这时经过了一定时间的merge 完成后,segments都会比较大,这时如果indexing的频率相对比较低时,则每轮merge 选择阶段就会得出ES期望这次可以merge的segments数就会比较大,而如果eligible的segments并没有那么大时,则ES就不会进行merge

这就是我得到的结论,还望各ES大神指点是否准确。


今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/UoTChzHc7p
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/21701
 
788 次点击