前言

前面介绍了很多ES使用过程中的具体实战知识点,本文主要是谈谈ES分布式架构原理。


一、Elasticsearch特点

elasticsearch是近实时的分布式搜索分析引擎,底层实现基于Lucene,核心思想是在多台机器上启动多个es进程实例,组成一个es集群。以下是es的几个概念:

  • 接近实时
    es是一个接近实时的搜索平台,这就意味着,从索引一个文档直到文档能够被搜索到有一个轻微的延迟
  • 集群(cluster)
    一个集群有多个节点(服务器)组成,通过所有的节点一起保存你的全部数据并且通过联合索引和搜索功能的节点的集合,每一个集群有一个唯一的名称标识
  • 节点(node)
    一个节点就是一个单一的服务器,是你的集群的一部分,存储数据,并且参与集群和搜索功能,一个节点可以通过配置特定的名称来加入特定的集群,在一个集群中,你想启动多少个节点就可以启动多少个节点。
  • 索引(index)
    一个索引就是还有某些共有特性的文档的集合,一个索引被一个名称唯一标识,并且这个名称被用于索引通过文档去执行搜索,更新和删除操作。一个索引可以相当于mysql中的一张表。
  • 类型(type)
    type 在6.0.0已经不赞成使用。
  • 文档(document)
    一个文档是一个基本的搜索单元,相当于一条表记录。
  • 分片(shard)
    一个索引可以由多个分片组成,多个分片可以分布在集群中多台机器上。

二、Elasticsearch总体架构

1: Gateway是ES用来存储索引的文件系统,支持多种类型。包括本地文件系统(默认),HDFS,S3等。gateway模块主要负责集群元信息的存储和集群重启时的恢复。
2: Distributed Lucene Directory是一个分布式的lucene框架
3: Lucene之上是ES的模块,包括:索引模块、搜索模块、映射解析模块等
4: ES模块之上是 Discovery、Scripting和第三方插件
5: Discovery是ES的节点发现模块,不同机器上的ES节点要组成集群需要进行消息通信,集群内部需要选举master节点,这些工作都是由Discovery模块完成。支持多种发现机制,如 Zen 、EC2、gce、Azure。
6: Scripting用来支持在查询语句中插入javascript、python等脚本语言,scripting模块负责解析这些脚本,使用脚本语句性能稍低。ES也支持多种第三方插件。预先定义好脚本内容,然后在mapping阶段或者search指定需要的脚本,相对于脚本语句查询来说提高性能
7: 再上层是ES的传输模块和JMX.传输模块支持多种传输协议,如 Thrift、memecached、http,默认使用http。JMX是java的管理框架,用来管理ES应用。
8: 最上层是ES提供给用户的接口,可以通过RESTful接口或java api和ES集群进行交互

三、Elasticsearch集群启动流程


集群启动流程:
1、elect master 选主流程,集群启动的第一件事是从己知的活跃机器列表中选择 个作为
主节点,选主之后的流程由主节点触发。
ES 的选主算法是基于 Bully 算法的改进,主要思路是对节点 ID 序,取 ID 值最大的节点
作为 Master,每个节点都运行这个流程。参与选举的节点数需要过半。

2、gateway过程 ,主节点发起选举获取最新的元数据信息,参与元信息选举的节点数需要过半。
被选出的 Master 和集群元信息的新旧程度没有关系。因此它的第一个任务是选举元信息,
让各节点把各自存储的元信息发过来 ,根据版本号确定最新的元信息然后把这个信息广播下
去,这样集群的所有节点都有了最新的元信息。
集群元信息 的选举包括两个级别:集群级和索引级。

3、allocation过程 ,选举shard级元信息,构建内容路由表。
在初始阶段,所有的shard都处于UNASSIGNED(未分配)状态。ES通过allocation(分配)过程决定哪个分片位于哪个节点,重构内容路由表。

4、recovery过程 ,根据tranlog恢复索引数据。
为什么需要recovery?
对于主分片来说,可能有一些数据没来得及刷盘。
对于副分片来,一是没刷盘,二是主分片写完来,但是副分片还没来得及写,导致主副分配数据不一致。

四、Lucene 索引更新过程

写入的数据是如何变成 Elasticsearch 里可以被检索和聚合的索引内容的?
总结一下 Lucene 的处理办法,很简单,就是一句话:新收到的数据写到新的索引文件里

Lucene 把每次生成的倒排索引,叫做一个段(segment)。然后另外使用一个 commit 文件,记录索引内所有的 segment。而生成 segment 的数据来源,则是内存中的 buffer。也就是说,索引数据动态更新过程如下:

1、当前索引有 3 个 segment 可用。

2、新接收的数据进入内存 buffer。

3、内存 buffer 刷到磁盘,生成一个新的 segment,commit 文件同步更新。索引状态如下:

利用磁盘缓存实现的准实时检索
既然涉及到磁盘,那么一个不可避免的问题就来了:磁盘太慢了!对我们要求实时性很高的服务来说,这种处理还不够。所以,在第 3 步的处理中,还有一个中间状态:

4、内存 buffer 生成一个新的 segment,刷到文件系统缓存中,Lucene 即可检索这个新 segment。

当在文件系统缓存中生成新的segment后,尽管没有被commit提交,但数据已经可以被检索到。

将内存中的数据刷写到这一步刷到文件系统缓存的步骤,在 Elasticsearch 中,是默认设置为 1 秒间隔的,对于大多数应用来说,几乎就相当于是实时可搜索了。Elasticsearch 也提供了单独的 /_refresh 接口,用户如果对 1 秒间隔还不满意的,可以主动调用该接口来保证搜索可见。

设置索引的刷新时间:

# curl -XPOST http://127.0.0.1:9200/test_index/_settings -d '{ "refresh_interval": "10s" }'

如果对实时性要求不高,可以增大刷新时间,降低刷新频率,提高数据写入速度。

注意⚠️:
执行完refresh操作后,并不能保证数据写入磁盘,只能保证新写入的数据在文件系统缓存中生成新的segment,并可以被检索到。

5、文件系统缓存真正同步到磁盘上,commit 文件更新。达到第3步中的状态。

五、tanslog保障一致性

既然 refresh 只是写到文件系统缓存,那么第 5 步写到实际磁盘又是有什么来控制的?如果这期间发生主机错误、硬件故障等异常情况,数据会不会丢失?

1)tanslog如何保证索引数据的一致性

这里,其实有另一个机制来控制。Elasticsearch 在把数据写入到内存 buffer 的同时,其实还另外记录了一个 translog 日志
(这一步可以参考mysql中的double write数据双写机制保证数据的一致性来理解。)

在refresh 发生的时候,translog已经记录了数据的变更信息并且持久化写到磁盘文件。

如果在这期间发生异常,Elasticsearch 会从 commit 位置开始,恢复整个 translog 文件中的记录,保证数据一致性。

等到真正把 segment 刷到磁盘,且 commit 文件进行更新的时候, translog 文件才清空。这一步,叫做 flush。同样,Elasticsearch 也提供了 /_flush 接口。

对于 flush 操作,Elasticsearch 默认设置为:每 30 分钟主动进行一次 flush,或者当 translog 文件大小大于 512MB (老版本是 200MB)时,主动进行一次 flush。这两个行为,可以分别通过 index.translog.flush_threshold_periodindex.translog.flush_threshold_size 参数修改。

对 Lucene 的更改只在 Lucene 提交期间才会持久化到磁盘,这是一个相对繁重的操作,因此不能在每个索引或删除操作之后执行。在一次提交之后和另一次提交之前发生的更改将在流程退出或 HW 失败的情况下丢失。为了防止此数据丢失,每个分片都有一个事务日志或与之关联的提前写日志。
在内部 Lucene 索引处理之后,任何索引或删除操作都将写入 translog。在发生崩溃时,当碎片恢复时,可以从事务日志重新播放最近的事务。

Elasticsearch flush是执行 Lucene 提交并启动新 translog 的过程。它是在后台自动完成的,为确保事务日志不会变得太大,避免使重放其操作在恢复过程中占用相当长的时间。

2)如何避免tanslog丢失

事务日志中的数据只有在 translog 被同步和提交时才会持久化到磁盘。如果发生硬件故障,自上次提交 translog 以来写入的任何数据都将丢失。
默认情况下,如果 index.translog.durability 被设置为 async,或者如果在每个索引、删除、更新或批量请求的末尾被设置为 request (默认值) ,Elasticsearch 将每5秒提交一次 translog。事实上,Elasticsearch 只会在事务日志成功融合并在主服务器和每个分配的副本上提交之后,才向客户机报告索引、删除、更新或批量请求的成功。

核心参数:

  • index.translog.sync_interval 同步频率
    不管写操作是什么,translog 多长时间被同步到磁盘并提交一次。默认为5 s。小于100ms 的值是不允许的。
  • index.translog.durability 持久化方式
    是否在每个索引、删除、更新或大容量请求之后进行 fsync 和提交 translog。有两种方式:
    request 默认值,在每次请求后提交 fsync。如果硬件出现故障,所有已确认的写操作都已提交到磁盘。能尽可能避免数据丢失。
    async 每次index.translog.sync_interval 都在后台提交 fsync 和 commit。在发生硬件故障时,从最后一次自动提交以来所有已确认的写都将被丢弃。

六、segment合并机制

通过上面的内容,我们知道了数据怎么进入 ES 并且如何才能让数据更快的被检索使用。其中用一句话概括了 Lucene 的设计思路就是”开新文件”。从另一个方面看,开新文件也会给服务器带来负载压力。因为默认每 1 秒,都会有一个新文件产生,每个文件都需要有文件句柄,内存,CPU 使用等各种资源。一天有 86400 秒,设想一下,每次请求要扫描一遍 86400 个文件,这个响应性能绝对好不了!

为了解决这个问题,ES 会不断在后台运行任务,主动将这些零散的 segment 做数据归并,尽量让索引内只保有少量的,每个都比较大的,segment 文件。这个过程是有独立的线程来进行的,并不影响新 segment 的产生。归并过程中,索引状态如图 2-7,尚未完成的较大的 segment 是被排除在检索可见范围之外的:

当归并完成,较大的这个 segment 刷到磁盘后,commit 文件做出相应变更,删除之前几个小 segment,改成新的大 segment。等检索请求都从小 segment 转到大 segment 上以后,删除没用的小 segment。这时候,索引里 segment 数量就下降了,状态如图 2-8 所示:

归并策略:
归并线程是按照一定的运行策略来挑选 segment 进行归并的。主要有以下几条:

  • index.merge.policy.floor_segment 默认 2MB,小于这个大小的 segment,优先被归并。
  • index.merge.policy.max_merge_at_once 默认一次最多归并 10 个 segment
  • index.merge.policy.max_merge_at_once_explicit 默认 forcemerge 时一次最多归并 30 个 segment。
  • index.merge.policy.max_merged_segment 默认 5 GB,大于这个大小的 segment,不用参与归并。forcemerge 除外。
    根据这段策略,其实我们也可以从另一个角度考虑如何减少 segment 归并的消耗以及提高响应的办法:加大 flush 间隔,尽量让每次新生成的 segment 本身大小就比较大。

七、shard分片机制

Elasticsearch 为了完成分布式系统,对一些名词概念作了变动。索引成为了整个集群级别的命名,而在单个主机上的Lucene 索引,则被命名为分片(shard)。
借助于分片机制,ES中的一个索引可以被拆分成多个分片分布在集群中的多台服务器上,减轻了单台服务的读写压力,增加了单个索引的容量上限,实现了大规模数据的存储和检索。

索引路由到分片的规则

Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?当我们创建文档时,它如何决定这个文档应当被存储在分片 1 还是分片 2 中呢?
首先这肯定不会是随机的,否则将来要获取文档的时候我们就不知道从何处寻找了。实际上,这个过程是根据下面这个公式决定的:

shard = hash(routing) % number_of_primary_shards

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求的文档所在分片的位置。

这就解释了为什么我们要在创建索引的时候就确定好主分片的数量并且永远不会改变这个数量:因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。

主分片和副本

在空的单节点集群中上创建一个叫做 blogs 的索引,设置3个主分片和一组从分片(每个主分片有一个从分片对应),代码如下

PUT /blogs{   "settings" : {      "number_of_shards" : 3,      "number_of_replicas" : 1   }}


主分片(primary shards) 启动并且运行了,这时集群已经可以成功的处理任意请求,但是 从分片(replica shards) 没有完全被激活。事实上,当前这三个从分片都处于 unassigned(未分配)的状态,它们还未被分配到节点上。在同一个节点上保存相同的数据副本是没有必要的,如果这个节点故障了,就等同于所有的数据副本也丢失了。

启动第二个节点,配置第二个节点与第一个节点的 cluster.name 相同(./config/elasticsearch.yml文件中的配置),它就能自动发现并加入到第一个节点的集群中,如下图:

cluster-health 的状态为 green,这意味着所有的6个分片(三个主分片和三个从分片)都已激活,文档在主节点和从节点上都能被检索。

随着应用需求的增长,启动第三个节点进行横向扩展,集群内会自动重组,如图:

在 Node 1 和 Node 2 中分别会有一个分片被移动到 Node 3 上,这样一来,每个节点上就都只有两个分片了。这意味着每个节点的硬件资源(CPU、RAM、I/O)被更少的分片共享,所以每个分片就会有更好的性能表现。

接下来,我们来增加一下从分片组的数量:

PUT /blogs/_settings{   "number_of_replicas" : 2}

现在 blogs 的索引总共有9个分片:3个主分片和6个从分片, 又会变成一个节点一个分片的状态了,最终得到了三倍搜索性能的三节点集群

请求路由

1、根据ID获取单个文件的请求:

以下是从主分片或者副本分片检索文档的步骤顺序:

  1. 客户端向 Node 1 发送获取请求。
  2. 节点使用文档的 _id 来确定文档属于分片 0 。分片 0 的副本分片存在于所有的三个节点上。
    在这种情况下,它将请求转发到 Node 2
  3. Node 2 将文档返回给 Node 1,然后将文档返回给客户端。
在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。

2、新建、索引或者删除请求:

以下是在主副分片和任何副本分片上面成功新建,索引和删除文档所需要的步骤顺序:

  1. 客户端向 Node 1发送新建、索引或者删除请求。
  2. 节点使用文档的 _id确定文档属于分片 0 。请求会被转发到 Node 3,因为分片0 的主分片目前被分配在Node 3 上。
  3. Node 3在主分片上面执行请求。如果成功了,它将请求并行转发到Node 1和Node 2的副本分片上。一旦所有的副本分片都报告成功, Node 3将向协调节点报告成功,协调节点向客户端报告成功。

总结

本文主要是对Elasticsearch 分布式架构原理进行了相关介绍。