前面两篇文章 Elasticsearch 入门学习 和 Elasticsearch 搜索的高级功能学习 中介绍了 Elasticsearch 的基本知识点以及相关搜索功能,这篇文章对 Elasticsearch 分布式原理以及在分布式下是如何进行文档的查询和更新相关逻辑学习总结:
Elasticsearch 支持集群部署,一个集群下可以部署多个节点,每个节点就是一个 ES 的实例(一个 JAVA 进程),可以通过启动参数 cluster.name 和 node.name 来分别指定集群和节点的名称:
bin/elasticsearch -E node.name=node1 -E cluster.name=myEs -d
bin/elasticsearch -E node.name=node2 -E cluster.name=myEs -d
bin/elasticsearch -E node.name=node3 -E cluster.name=myEs -d
复制代码
Elasticsearch 分布式集群部署主要有以下优点:
- 提供了存储的水平扩容能力,使其可以支持到 PB 级别的数据量
- 保证了系统的高可用性,单个节点停止服务,也不会影响整个集群
- 有强大的数据备份能力,保证了数据的安全性
节点分类
一个集群下面针对不同的功能划分成不同类型的节点:
Data Node
- 保存数据的节点,节点启动后,默认是 Data Node,可以通过 node.data=false 禁止
- 通过增加 Data Node 可以解决数据水平扩展和数据单点问题
Ingest Node
- 可以在文档建立索引之前设置一些 ingest pipeline 的预处理逻辑,来丰富和转换文档
- 默认启动就是 Ingest Node,可用通过 node.ingest = false 来禁用
- 如果处理的 Processor 压力比较大,建议将该 Node 和 Data Node 进行分离
Master Eligible Node
- Master Eligible Node 有资格被选举为 Master Node,一个集群可以配置多个 Master Eligible Node
- 每个 Node 默认启动是 Master Eligible Node,可以通过参数 node.master=false 来禁止
- 在 Master Node 出现故障时,Master Eligible Node 会参与选举 Master Node
- 当集群内第一个 Master Eligible Node 节点启动后,会将自己选举成 Master Node
- Master Eligible Node 最佳实践:
- 不能单点部署,为一个集群分配多个 Master Node
- 每个 Master Node 只承担一个角色
复制代码
Voting-only master-eligible node
- 有权选举但是没办法成为 Master 的节点
Machine Learning Node
- 用于机器学习处理的节点
- 如果想使用该功能,必须 xpack.ml.enabled = true 且 node.ml = true
Coordinating Node
- 处理路由请求的节点,ES 收到请求后会先发到 Coordinating Node,然后 Coordinating Node 再负责将请求分发并对请求结果进行汇总
- 所有节点默认是 Coordinating Node,只有 node.master,node.ingest, node.data 都等于 false 的情况下,才不是 Coordinating Node
Master Node
- 负责创建,删除索引
- 决定分片如何被分配到哪个节点
- 维护并更新 Cluster State
- 所以的节点信息
- 所有的索引和其 Mapping 和 Setting 的信息
- 分片路由信息
- 只有 Master Node 可以更新 Cluster State,并负责同步到其它 Node
复制代码
节点类型的汇总
节点类型 | 配置参数 | 默认值 |
---|---|---|
master eligble | node.master | true |
data | node.data | true |
ingest | node.ingest | true |
coordinating only | 无 | 上面三个参数全部为false |
machine learning | node.ml | true(需要 enable x-pack) |
集群的管理和节点发现机制
节点间的通信机制
- 集群节点之间的通信是通过 Transport 模块实现的
- Transport 模块是基于 TCP 实现的集群中不同 Node 之间进行数据传输的模块
- Transport 模块还会用于 Elasticsearch Java API 中的 TransportClient
- Transport 模块数据传输通过高性能的异步 IO 完成的
- long-lived idle connections:
- 节点之间可能会存在大量的空闲 TCP 连接
- 如果这些链接被关闭可能会破坏集群的正常运行
- 尽量保证开启了 tcp.keep_alive 选项且使该 keepalive 间隔时间小于任何其它可能使连接关闭的 timeout
- 如果 keepalives 无法配置,可以使用 transport.ping_schedule 进行连接检查
复制代码
- 我们可以通过 Transport Tracer 打印日志跟踪内部节点之间的通信,可以动态开启或者关闭
PUT _cluster/settings
{
"transient" : {
"logger.org.elasticsearch.transport.TransportService.tracer" : "TRACE"
}
}
复制代码
- Request Compression
- 通过参数 transport.compress 可以开启请求的压缩,默认关闭的
- 请求压缩会造成 CPU 的消耗,如果对于网络传输性能高的集群不建议开启
复制代码
节点发现机制
节点发现逻辑用于处理一个新节点如何正确的加入到集群中,当一个节点启动后可以通过 seed hosts providers 得到用于节点发现的种子(seed addresses)。seed addresses 可能是一个 IP,或者是 IP + PORT 或者是一个 HOST 的组合,对应的是集群中的一些节点的 Transport 通信地址。
ES 提供了两种 seed hosts providers,可以通过启动参数 discovery.seed_providers 进行设置,默认是 "settings-based":
- settings-based provider:启动时参数配置
- discovery.seed_providers:
- settings-based
- discovery.seed_hosts:
- 192.168.1.10:9300 # ip + port
- 192.168.1.11 # ip
- seeds.mydomain.com # host
复制代码
- file-based provider:读取文件配置
// 配置信息存储在文件 $ES_PATH_CONF/unicast_hosts.txt 中
// unicast_hosts.txt 一旦有改动,ES 都会监听到,并实时获取到新的 hosts
- discovery.seed_providers: file
复制代码
当一个刚刚启动的节点通过 seed hosts providers 拿到 seed addresses 后,通过以下步骤进行集群发现并加入集群:
- 尝试去连接每一个 seed address,如果发现某一个 Node 是 Master Eligible Node,那么该 Master Eligible Node 会共享它知道的 Master Eligible Node,这些共享的 Master Eligible Node 也会作为 seed addresses 继续去试探
- 直到找到某一个 seed addresss 对应的是 Master Node 或者找到足够数量的 Master Eligible Node 可以参与选举 Master Node 为止
- 如果第二步没有找到任何满足条件的 Node,ES 会每隔 discovery.find_peers_interval 秒后去重新尝试寻找,默认为 1 秒
- 重复第三步操作直到找到满足条件为止
下面是集群发现机制的一些参数配置:
- transport.profiles.default.port:如果 seed address 给了 IP 但是没有提供端口那么就使用该配置的端口
- transport.port:如果 seed address 给了 IP 没有提供端口且 transport.profiles.default.port 也没有设置,那么就使用该配置的端口
- discovery.seed_resolver.timeout:每次查询试探的超时时间
- discovery.seed_resolver.max_concurrent_resolvers:同时探测的 seed addresss 个数
复制代码
- 什么情况下会使用发现机制呢?
- 当你启动一个 Elasticsearch 节点时,该节点需要找到集群中的其它节点
- 当一个 Master 节点发生故障时,节点之间需要通过选举的方式找到新的 Master 节点
复制代码
集群选主逻辑
- 选主是通过 quorums 实现的,在所有的 quorums 中选择一个 Node 作为 Master Node
- quorums 是集群中所有 Master Eligible Node 的一个子集,quorums 之所以是 Master Eligible Node 的子集,是因为可能有些 Master Eligible Node 出现故障,无法建立连接
- 如何选主:
- 选主流程出现当一个新节点启动或者已经存在的 Master Node 出现故障时
- 任何一个 Master-Eligible Node 可以开启一个选举,一般情况下第一个开始选举的 Master-Eligible Node 会成为 Master Node
- 每个 Master-Eligible Node 节点选举的时间点是随机安排的,以防出现多个节点同一时间点进行选举的情况
- 如果同一时间点出现两个 Master-Eligible Node 进行选举,那么选举将会失败,后续会进行选举重试
复制代码
- 要合理的设置和选择 quorums,以防在选主的时候出现脑裂问题(split brain):
- 什么是脑裂问题?
脑裂问题是指在分布式系统中当网络出现问题时,一个节点无法与其它节点建立连接时,这个节点会自己作为 Master Node 组成一个集群,而其它节点同时也会组成一个新的集群,这时会出现两个 Master Node,并且维护不同的 Cluster State,导致当网络恢复时,无法正确恢复数据
- 如何避免脑裂问题
可以通过限制选举条件 discovery.zen.minimum_master_nodes 的值,只有 Master Eligible Node 节点的个数大于 discovery.zen.minimum_master_nodes 时,才具备选举条件,可以有效避免脑裂问题
比如当有三个 Master Eligible Node 时,可以设置 discovery.zen.minimum_master_nodes = 2,即可避免脑裂问题,但是当只有 2 个 Master Eligible Node 时是无法避免脑裂问题的
从 ES7.0 以后,为了避免 discovery.zen.minimum_master_nodes 参数设置错误,ES 移除了该参数的设置,让 ES 自己选择可以形成仲裁的节点数
复制代码
分片和集群的故障转移
分片
- 分片是物理空间概念,索引中的数据都分布在分片上,一个分片就是运行的一个 Lucene 的实例
- 分片分为主分片和副分片,主分片默认值是 1, 副分片默认值是 0
- 主分片存储索引数据,实现存储的水平扩容,在创建索引时指定,后续不能修改
- 副分片主要用于提高数据的高可用性,并且可以用来读取,提高系统的读取吞吐量
- 主分片数设置过小,无法满足后续增加节点水平扩容的需求,设置过大可能导致一个节点上有多个分片,进而影响写入和读取的性能
- 副分片设置过多,每次写入都要同时同步,影响写入性能,设置过小会影响高可用性
- 主分片支持读写,副分片只读
- 通过增加节点和分片提高机器的故障转移能力和计算能力
- 相同的索引,主分片和副分片是不能存在同一个 Data Node
PUT users
{
"settings": {
"number_of_shards": 3, //设置主分片数
"number_of_replicas": 1 //设置副分片数
}
}
复制代码
监控状态检查
可以通过 API:GET /_cluster/health 进行集群的监控检查
- status = green : 健康状态,所有主分片和副分片都可以使用
- status = yellow :亚健康状态,所有主分片可用,副分片不可以用
- status = red :不健康状态,部分主分片不可用
GET /_cluster/health
返回结果 =>
{
"cluster_name" : "es_learning",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 2,
"number_of_data_nodes" : 2,
"active_primary_shards" : 6,
"active_shards" : 12,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}
复制代码
文档分布式存储原理
文档到分片的路由算法
shard = hash(_routing) % number_of_primary_shards
- Hash 算法可以确保文档均匀分散到分片中
- _routing 默认值是文档 id,可以保证同一个文档被分配到同一个分片
- 因为通过主分片个数进行取余运算,所以 index 建立好以后,分片数不允许随便修改
- _routing 也可以自己指定,例如可以将相同国家的商品分配到同一个分片上
//插入数据时,通过 routing 参数指定 _routing
PUT users/_doc/100?routing=china
{
"title": "china"
}
复制代码
- 更新一个文档的过程:
1. 更新文档的请求先发送到某一个 Coordinating Node 上
2. Coordinating Node 根据 hash 算法计算该分配哪个分片上
3. 在对应的分片上先删除文档再新建文档
复制代码
倒排索引不可变性
- 倒排索引采用 Immutable Design,一旦生成,不可更改
- 优点:1)无需考虑并发写文件问题,避免了锁机制带来的性能问题 2)缓存容易生成和维护,提高性能 3)数据可以被压缩
- 缺点:一个新的文档需要被搜索,就需要重新建整个索引
Lucene Index
- 在 Lucene 中,单个倒排索引文件被称为 Segment,多个 Segment 汇总在一起,被称为 Lucene Index
- Lucene Index 对应的 ES 中的 Shard(分片)
- Segment 是自包含的,不可变的
- 当有新的文档写入时会生成多个新的 Segment,查询时会查询所有的 Segments 并进行汇总
Index Buffer
- 文档被写入以后其实并没有被直接物理性的写入到 Lucene Index 中, 如果每次索引一个文档都去执行一次的话会造成很大的性能问题,所以首先会被写入 Index Buffer 内存空间中
- 此时文档没有进入 Segment,所以新写入的文档没办法被搜索到
- 写入 Index Buffer 后,API 写入接口会立即返回,后续操作异步执行
Transaction Log
- 为了保证在重启时写入到 Index Buffer 的数据不会丢失,在写入 Index Buffer 的同时,会将数据写入 Transaction Log
- 在断电重启的情况下,ES 会将没有 Refresh 的数据从 Transaction Log 写入 Segment
- Transaction Log 不会立即删除,会在 Flush 时进行删除
Refresh
- 为了能保证搜索的实时性,ES 会每隔一秒将 Index Buffer 写入的段中,这个过程叫做 Refresh
- Refresh 默认一秒发生一次,可以通过 index.refresh_interval 参数设置
- Refresh 后的数据并没有被真正提交刷新到磁盘,而是一个文件系统的缓存中,只不过此时可以支持搜索
- Refresh 后 Index Buffer 和 Transaction Log 并不会立即删除,会在 Flush 时删除
- 当 Refresh 以后也就表示的文档客户搜索了,所以 ES 可以保证写入文档 1 秒后就可以进行搜索
- 当 Index Buffer 被占满时也会被触发 Refresh,默认值是 JVM 的 10%
- indices.memory.index_buffer_size:设置 Index Buffer 的大小,可以是百分比相对 JVM 的值,也可以是一个绝对值
- indices.memory.min_index_buffer_size:如果 index_buffer_size 是相对百分比设置,那么这个可以控制 index_buffer_size 的下限,默认 48 mb
- indices.memory.max_index_buffer_size: 如果 index_buffer_size 是相对百分比设置,那么这个可以控制 index_buffer_size 的上限,默认无限大
复制代码
Flush
- Flush 主要用来一次性清空 Index Buffer 也就是执行 Refresh
- 同时会调用 fsync 将文件系统缓存中的 Segments 刷新到磁盘
- 此时同时刷新到磁盘的几个段被称为一个 Commit Point,完成一次真正的提交
- Flush 完了以后清空 Transaction Log 和 Index Buffer
- 因为每次 Flush 性能消耗比较大,所以系统默认 30 分钟执行一次
- 当 Transaction Log 大于 512 MB 时也会执行一次 Flush
Segment Merge
- 由于 Flush 每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增,造成句柄,cpu,内存的浪费,因此必须得进行 Segment Merge
- Segment Merge 会在后台自动进行,小的段被合并到大的段,然后这些大的段再被合并到更大的段
- Segment Merge 的时候会将那些旧的已删除文档从文件系统中清除。被删除的文档(或被更新文档的旧版本)不会被拷贝到新的大段中
- ES 和 Lucene 会自动进行 Merge
- 也可以调用 POST {index_name}/_forcemerge 进行手动 merge
分布式分页查询
ES 查询分为两个阶段: Query 和 Fetch,假设我们进行分页查询从 FROM 到 FROM + SIZE 的文档信息,下面是查询流程:
Query 阶段
- 节点收到请求后会以 Coordinating Node 的身份在主副分片中随机选取几个互不为主副的所有分片,发送查询请求
- 被选中的每个分片执行查询并进行排序,返回 From + Size 个排序后的文档 ID 和排序值给 Coordinating Node
Fetch 阶段
- Coordinating Node 会将在 Query 阶段从每个分片返回的排序后的文档 ID 列表进行重新排序
- 排序后选取 From 到 From + Size 个文档的 ID
- 以 Multi Get 的方式将上面的文档 ID 列表发送到相应的分片上获取详细文档数据
存在的问题
- 深度分页性能问题:每个分片都需要获取 From + Size 个文档,分片越多性能反而越低
- 相关性算分偏离:每个分片都基于自己的分片数据进行算分,可能导致打分偏离,解决相关性算分不准确的方案如下:
- 在数据量不大的情况下,可以将分片个数设置为 1,这样只在一个分片上查询就不会出现相关性算分偏离了
- DFS Query Then Fectch:可以在查询时加入 search_type=dfs_query_then_fetch 可以使每个分片把各个分片的词频和文档频率进行搜集,然后在 Coordinating Node 再进行一次相关性算分,这种情况耗费更多的 CPU 和内存,非必要情况不建议使用
复制代码
如何避免深度分页
- search_after
POST users/_search
{
"size": 1,
"query": {
"match_all": {}
},
// 不支持指定 From,必须指定 sort,并且要保证排序的唯一性,可以加入 _id 保证唯一性
// 使用 search_after 指定的 sort 对应的字段值,他替代了 From 的值,表示大于 search_after 的值后面 Size 个数据
// 这样在每个分片上每次只需要获取 Size 个文档就可以了,而不需要获取 From + Size 个文档,减少内存占用和 CPU 消耗
// 我们可以使用上一次获取到的最后一个文档进行迭代查询
"search_after": [10, "ZQ0vYGsBrR8X3IP75QqX"],
"sort": [
{"age": "desc"},
{"_id": "asc"}
]
}
复制代码
- Scroll API
// 和 search_after 类似,每次查询时,指定上一次查询到的 scroll_id
// Scroll API 会创建一个快照进行查询,如果有新的数据写入后,新数据将无法被查到
// Scroll API 一般用于需要全部文档,例如导出全部数据之类的
POST /_search/scroll
{
"scroll" : "1m", // 快照的存活时间
"scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAWAWbWdoQXR2d3ZUd2kzSThwVTh4bVE0QQ=="
}
复制代码
分布式排序查询逻辑
ElasticSearch 在查询时支持对多个字段进行排序查询:
POST /kibana_sample_data_ecommerce/_search
{
"size": 5,
"query": {
"match_all": {
}
},
"sort": [
{"order_date": {"order": "desc"}},
{"_doc":{"order": "asc"}},
{"_score":{ "order": "desc"}}
]
}
复制代码
排序的逻辑
- 排序是针对字段原始内容进行排序的,所以倒排索引无法发挥作用
- 需要用到正排索引,通过文档 ID 和字段快速得到字段原始内容
- Elasticsearch 提供了两种排序的实现方法:Field Data 和 Doc Values
- Field data 和 Doc Values 对比
对比属性 | Doc Values | Field Data |
---|---|---|
何时创建 | 创建文档时和倒排索引一起创建 | 搜索时候动态创建 |
创建位置 | 磁盘文件 | JVM Heap |
优点 | 避免大量内存占用 | 索引速度快,不占用磁盘空间 |
缺点 | 降低索引速度,占用额外磁盘文件 | 动态创建开销大,占用过多的 JVM Heap |
默认值 | ES 2.x 以后 | ES 1.x 之前 |
- Doc Values 对 text 类型的字段不生效,对其它类型的字段默认是开启的
- 如果重新打开 Doc Values 排序,需要重新建立索引,只有明确不需要做排序和聚合分析的情况下才建议关闭
- 在设置 mapping 时可以通过 doc_values 和 field_data 参数来设置是否启用对应的排序方法
//打开 text 类型的字段的 Field Data 排序方法
PUT kibana_sample_data_ecommerce/_mapping
{
"properties": {
"customer_full_name" : {
"type" : "text",
"fielddata": true
}
}
}
PUT test_keyword/_mapping
{
"properties": {
"user_name":{
"type": "keyword",
"doc_values":false //将 doc_values 排序方法关掉
}
}
}
复制代码
读写的并发控制
同时对文档进行变更可能会导致数据丢失的问题,必须采用锁机制来控制对同一个资源的操作,ES 采用的乐观并发控制逻辑,由应用程序告知当前的文档的版本信息,如果版本信息不匹配,ES 将会报错,由用户来决定如何解决冲突,比如重试更新,或者将报错报告给用户等。
当我们更新获取获取某个文档的信息时,会返回该文档对应的 _seq_no 和 _primary_term
GET report/_doc/1
=> 返回结果:
{
"_index" : "report",
"_type" : "_doc",
"_id" : "1",
"_version" : 5,
"_seq_no" : 4, // 版本控制信息
"_primary_term" : 1, // 版本控制信息
"found" : true,
"_source" : {
"name" : "未命名报告",
"id" : 1,
"setting" : {
"data" : "helloworld"
},
"projectId" : 1
}
}
复制代码
当我们对该文档进行更新时,可以带上参数 if_seq_no 和 if_primary_term 两个参数,用来判断版本是否匹配,如果版本不匹配,系统会抛出错误
// 如果不匹配会报错:version_conflict_engine_exception
PUT report/_doc/1?if_seq_no=4&if_primary_term=1
{
"name": "未命名报告",
"id": 1,
"projectId": 1
}
复制代码