ES高可用架构涉及常用功能整理

  • 1. es的高可用系统架构和相关组件
  • 2. es的核心参数
    • 2.1 常规配置
    • 2.2 特殊优化配置
      • 2.2.1 数据分片按ip打散
      • 2.2.2 数据分片机架感知
      • 2.2.3 强制要求数据分片机架感知
      • 2.2.4 写入线程池优化
      • 2.2.5 分片balance优化
      • 2.2.6 限流控制器优化
  • 3. es常用命令
    • 3.1 常用基础命令
    • 3.2 常用运维命令
  • 4. 事务性
    • 4.1 数据写流程
      • 4.1.1 4种类型写操作
      • 4.1.2 正常写入流程
    • 4.2 数据持久化过程
      • 4.2.1 相关概念
        • 4.2.1.1 translog文件
        • 4.2.1.2 segment文件
        • 4.2.1.3 translog和segment的差异
        • 4.2.1.4 flush
        • 4.2.1.5 refresh
        • 4.2.1.6 fsync
      • 4.2.2 持久化流程
    • 4.3 数据读流程
  • 5. 日志复制流程
    • 5.1. segment文件解析
    • 5.2. es的日志复制流程
    • 5.3. es的日志清理机制
      • 5.3.1 segment合并
      • 5.3.2 translong清理
  • 6. 疑问和思考
    • 6.1 es的flush过程和linux的cache刷盘有什么区别和联系?
    • 6.2 配置按照ip打散
    • 6.3 配置机架感知的参考配置
  • 7. 参考文档

探讨es的系统架构以及以及整体常用的命令和系统分析,本文主要探讨高可用版本的es集群,并基于日常工作中的沉淀进行思考和整理。更多关于分布式系统的架构思考请参考文档关于常见分布式组件高可用设计原理的理解和思考


1. es的高可用系统架构和相关组件

es面对的使用场景是,大量数据的生产和消费,是面对大数据的消息中间件。这么巨大的业务体量,难以通过一台机器完成所有的数据写入、存储和请求,因此需要进行数据的分片,采用 分片模式 进行数据拆分,从而降低单台机器的压力,并能够提供大量的集群扩展能力。

按照 分片模式 的架构模式,在架构上需要拆分2种类型的角色

  • 具备全局视角的元数据保存组件或者角色
  • 实际的worker节点,具体负责业务的数据写入、存储和读写请求

在es的系统架构中,全局视角 并没有拆分出一个单独的组件进行完成,而是复用es进程,通过es节点内部进行选主选择出Master主节点,负责全局的元数据存储和数据视角,将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等,维护分片在节点间的分配关系

相关核心的组件和角色作用如下

组件部署模式组件作用备注
Master多节点部署存储集群的元数据,具体集群数据的全局视角主要职责是和集群操作相关的内容,例如创建或删除索引、跟踪哪些节点是集群的一部分,并决定哪些分片分配给相关的节点。通过Raft协议选主
Data多节点部署它负责接收、存储和管理数据接收和存储消息数据,接收来自客户端写入的数据,并将这些消息存储在自己的磁盘上。 数据节点也区分多种角色,比如热节点、冷节点等
Ingest多节点部署执行由预处理管道组成的预处理任务将数据进行清洗、集成、转换等预处理,从而提升数据的查询性能

注:客户端写入索引数据时,并不需要通过Master协调,而是ES接受到请求的节点协调,请求转发到数据Primary分片所在节点

2. es的核心参数

2.1 常规配置

elasticsearch.yml

path.data: /data1/1687270572000005409/es/data# # 尽可能按照标签打散,但是如果机器不满足标签要求,也不强制打散,可以配置多个,如rack,ipcluster.routing.allocation.awareness.attributes: ip# node的自定义属性,从而给机器打上标签,通过标签将索引、分片打散在不同的机器上node.attr.ip: xxnode.attr.rack: rack_1node.attr.temperature: hot# 默认发现的master节点列表,如果空集群会通过这个配置发现其他的master节点并通信,完成选主discovery.zen.ping.unicast.hosts: ["xx.xx.xx.xx:9300", "xx.xx.xx.xx:9300", "xx.xx.xx.xx:9300"]# 设置在选举 Master 节点时需要参与的最少的候选主节点数,默认为 1。# 如果使用默认值,则当网络不稳定时有可能会出现脑裂。合理的数值为(n/2)+1discovery.zen.minimum_master_nodes: 2# 限流器配置,默认30%,通常调整到90%indices.breaker.total.limit: 90%indices.memory.index_buffer_size: 15%indices.queries.cache.count: 500indices.breaker.fielddata.limit: 20%indices.breaker.request.limit: 6%indices.fielddata.cache.size: 15%indices.queries.cache.size: 5%# 集群角色配置node.master: truenode.ingest: truenode.data: truenode.name: 1687270572000005409# 感知当前节点的磁盘使用率水位cluster.routing.allocation.disk.watermark.low: 85%cluster.routing.allocation.disk.watermark.high: 90%cluster.routing.allocation.disk.watermark.flood_stage: 95%# 网络配置network.host: 0.0.0.0network.publish_host: xx.xx.xx.xxtransport.tcp.port: 9300http.port: 9200# 写入线程池的长度,提升写入性能thread_pool.write.queue_size: 10000thread_pool.write.queue_size: 10000thread_pool.search.queue_size: 500# 提升集群恢复分片的并发度和带宽cluster.routing.allocation.cluster_concurrent_rebalance: 50cluster.routing.allocation.node_concurrent_recoveries: 50cluster.routing.allocation.node_initial_primaries_recoveries: 50indices.recovery.max_bytes_per_sec: 400mb# 关闭系统调用过滤器bootstrap.system_call_filter: false# 通过集群名称标识、加入集群cluster.name: "xx"

2.2 特殊优化配置

默认的es配置有不少默认的参数,对于es的分片分配、集群的数据写入、集群容灾恢复性能的参数不足,因此难以发挥最大的效用

2.2.1 数据分片按ip打散

es的相同shard有多副本,但是默认情况下,es在分配分片时,并不强制要求按照ip打散。如下配置可以按照ip打散,能够容忍ip级别的高可用,但是这个配置是try best模式。
如果正常启动的ip数量满足打散要求,对应的副本会主动分配到不同的机器ip上进行打散
如果正常启动的ip数量不满足打散要求,对应的副本会主动分配到的相同机器ip上,尽可能的保证副本正常启动。

# node的自定义属性,从而给机器打上标签,通过标签将索引、分片打散在不同的机器上cluster.routing.allocation.awareness.attributes: ipnode.attr.ip: xx

也可以调用api生效

2.2.2 数据分片机架感知

如下配置可以按照机架、ip打散,能够容忍机架、ip级别的高可用,但是这个配置是try best模式。
如果rack1、rack2正常启动,对应的副本会主动分配到rack1、rack2上进行打散
如果rack2的机器没有启动,对应的副本会分配到rack1上本正常启动。

# node的自定义属性,从而给机器打上标签,通过标签将索引、分片打散在不同的机器上cluster.routing.allocation.awareness.attributes: rack,ipnode.attr.ip: xxnode.attr.rack: rack1

2.2.3 强制要求数据分片机架感知

如下配置可以按照机架、ip打散,能够容忍机架、ip级别的高可用,但是这个配置是强制模式。
如果rack1、rack2正常启动,对应的副本会主动分配到rack1、rack2上进行打散
如果rack2的机器没有启动,对应的副本不会进行分配,而是出于unassign状态。

# node的自定义属性,从而给机器打上标签,通过标签将索引、分片打散在不同的机器上cluster.routing.allocation.awareness.attributes: racknode.attr.rack: rack1# 强制要求按照rack1、rack2分配打散cluster.routing.allocation.awareness.force.rack.values: rack1,rack2

注: 这里没有要求cluster.routing.allocation.awareness.attributes:rack,ip,原因是cluster.routing.allocation.awareness.attributes:rack已经能够保证1个rack只启动1个副本

2.2.4 写入线程池优化

# 写入线程池的长度,提升写入性能thread_pool.write.queue_size: 10000thread_pool.write.queue_size: 10000thread_pool.search.queue_size: 500

可以调用api热生效

curl -XPUT 'http://127.0.0.1:9200/_cluster/settings' -H 'Content-Type: application/json' -d '{"persistent": {"thread_pool.write.queue_size": 10000,"thread_pool.write.queue_size": 10000,"thread_pool.search.queue_size": 500}}'

2.2.5 分片balance优化

# 提升集群恢复分片的并发度和带宽,默认配置是2cluster.routing.allocation.cluster_concurrent_rebalance: 50cluster.routing.allocation.node_concurrent_recoveries: 50cluster.routing.allocation.node_initial_primaries_recoveries: 50indices.recovery.max_bytes_per_sec: 400mb

可以调用api热生效

curl -XPUT 'http://127.0.0.1:9200/_cluster/settings' -H 'Content-Type: application/json' -d '{"persistent": {"cluster.routing.allocation.cluster_concurrent_rebalance": "50","cluster.routing.allocation.node_concurrent_recoveries": "50","cluster.routing.allocation.node_initial_primaries_recoveries": "50","indices.recovery.max_bytes_per_sec": "100mb"}}'

2.2.6 限流控制器优化

indices.breaker.total.limit: 90%
curl -XPUT 'http://127.0.0.1:9200/_cluster/settings' -H 'Content-Type: application/json' -d '{"persistent": {"indices.breaker.total.limit": "90%"}}'

3. es常用命令

3.1 常用基础命令

参考文档 ES常用操作和经典case整理

3.2 常用运维命令

4. 事务性

4.1 数据写流程

4.1.1 4种类型写操作

操作说明
create创建文档
delete删除文档,ES对文档的删除是懒删除机制,即标记删除
index这里的index是动词,表示创建索引
update文档更新

这几种写操作都会触发es的索引数据写流程

4.1.2 正常写入流程

整体写流程如下

ES的写入有两种方式

  • 逐个文档写入(index)
  • 多个文档批量写入(bulk)

对于这两种写入方式,ES都会将其转换为bulk写入,写操作一般会经历三种节点:协调节点、主分片所在节点、从分片所在节点。

  1. 客户端发起请求,连接到的NODE1,NODE1可视为协调节点,协调节点接收到请求后,根据算法 shard_num=hash(_routing)%num_primary_shards (_routing 的默认值是文档的 id) 计算出属于分片0,跟根据元数据信息找到分片0的Primary节点NODE3
  2. 将请求转发到分片0的主分片所在的节点NODE3,NODE3完成写入后
  3. 将请求转发给分片0所属的从分片所在的节点NODE1和NODE2,等待所有副本(具体等待多少副本取决于wait_for_active_shards的配置值)完成写入并返回给NODE3 ACK后,NODE3则认为整个写入成功
  4. NODE3将结果反馈给协调节点NODE1,协调节点NODE1再将结果返回客户端

注意: 客户端收到协调节点NODE1的返回结果后,表示数据写入成功,但是并不代表客户端就可以马上查询到这些数据。因为ES返回客户端表示数据写入完成后,在ES还需要进行fresh动作,把ES的缓存事务刷新到操作系统Cache中,才能提供给客户端读取,这个过程跟写过程解耦,refresh的默认周期是1s,所以才称之为近实时数据库

4.2 数据持久化过程

4.2.1 相关概念

在介绍流程之前,有几个概念需要提前介绍(文件类和动作类),才能更好理解整个持久化过程。

4.2.1.1 translog文件

根据官网的介绍,translog又叫事务日志或者传输日志。

所有索引和删除操作都会在es内部索引处理后但在被commit之前写入 translog。如果发生崩溃,当分片恢复时,已确认但尚未包含在上次 Lucene 提交中的最近操作将从 translog 中恢复。因为translog是append操作,触发磁盘的顺序写入,因此能够获取很高的磁盘io性能,所以通过translog能够实现低成本、高效的保证数据不丢失。

需要注意的是,translog的刷盘方式有两种:同步(request)和异步(async),由index.translog.durability参数决定。

  • index.translog.durability: 默认request
    request(默认):fsync并在每次请求后提交。如果发生硬件故障,所有确认的写入都已经提交到磁盘。
    async:fsync 并在后台提交每个 sync_interval`. 如果发生故障,自上次自动提交以来所有确认的写入都将被丢弃。

虽然linux本身也会针对page cache中的数据落盘,但是对于es来说,如果把flush的使命全部交给linux,明显不是一种可控的行为,因此es本身在异步(async)情况下,针对translog落盘,有es的相关方式

  • index.translog.flush_threshold_ops: 执行多少次操作后执行一次flush,生成新的commit,默认无限制
  • index.translog.flush_threshold_size: translog一旦达到最大大小,就会发生flush,生成新的commit (生成commit文件,并删除commit事务前的translog),默认为512mb
  • index.translog.flush_threshold_period: 每隔多长时间执行一次flush操作, 默认是30min:
  • index.translog.sync_interval: 默认5s
    将事务日志fsync写入磁盘并提交的频率,无论写入操作如何。默认为5s. 100ms不允许 小于小于的值

两种方式各有千秋,同步刷盘具备更强数据可靠性保障,但同时带来更高的IO开销,性能更低。异步刷盘牺牲了一定的可靠性保障,但是降低了IO的开销,性能更佳,因此需要根据不同的场景需求选择合适的方式。

注: 在异步(async)模式下,由于translog也只是把数据写入到linux的page cache,并最大5s触发一次刷盘。因此这种模式下,最多可能出现5s数据丢失(机器宕机后恢复)。

4.2.1.2 segment文件

segment是es数据索引分片的真实数据存储文件,记录真实的文档数据,并按照文档排序。每个片段代表了索引中的一个独立部分,并且可以独立地被存储、加载和搜索

4.2.1.3 translog和segment的差异

segment文件和translog文件最大的差异就是

  • translog只是通过记录更新的事务操作,并不会组织相关的数据结构,因此能够通过append,写入磁盘的性能高,写入的性能几乎跟内存相近!
  • segment文件需要针对数据进行排序和组织相关的数据结构,因此是一个复杂的数据文件,无法通过append写入磁盘,磁盘的读写io相对随机,因此完成一次segment写入要比translog的时间和成本要高很多,甚至可能会可能会达到几十倍!

这也是为什么已经有了segment,还需要费劲设计translog的原因!

4.2.1.4 flush

在ES中,当写入一个新文档时,进行数据commit时

  • 首先被写入到ES内存的索引缓存中,并写入translog,此时这个数据还不能被查询
  • 将ES内存中的索引缓存定期刷新到Linux的Page Cache,生成segment(此时只是在Linux的Page Cache中,并没有持久化到磁盘中),默认刷新周期是1s,此时数据能够被客户端查询
  • 完成refresh后,对应在ES内存中的索引缓存会被清空回收

es 默认每隔 30 分钟或者操作数据量达到 512mb ,会将内存 buffer 的数据全都写入新的 segment 中,内存 buffer 被清空,一个 commit point 被写入磁盘,并将 filesystem cache 中的数据通过 fsync 刷入磁盘,同时清空 translog 日志文件,这个过程叫做 flush;

相关参数

index.translog.flush_threshold_ops: 执行多少次操作后执行一次flush,生成新的commit,默认无限制index.translog.flush_threshold_size: translog一旦达到最大大小,就会发生flush, (生成commit文件,并删除commit事务前的translog),默认为512mb

此时有一些聪明的读者可能会意识到,数据没有落盘,如果机器宕机,那数据不就丢了吗? 是的。这条链路是可能会丢数据的,而且丢失的时间长达30min,这是不能接受的。为了弥补这个缺陷,translog的设计就尤为重要。

4.2.1.5 refresh

refresh是将es中的索引缓存周期性的写入到Liunx的Page Cache的过程,默认周期是1s

设计refresh,为了合并数据写入操作,统一refresh到磁盘生成segment文件能够极大的提升ES的数据写入能力。通过1s的近实时(数据可查询延迟了1s)作为代价换取强大的数据写入能力。事实上,ES会默认定期30min,主动触发commit,将Linux Page Cache中的数据刷新大磁盘中。

1s的相关配置是每个索引的默认配置,但是也可以更改,以索引作为修改单元

curl -XPUT -H "Content-Type: application/json" http://localhost:9200/$index/_settings -d '{"refresh_interval": "1s"}'

所以translog的重要度就体现出来了, 由于在写入数据时,分为2条链路

  • 1条是写入ES内存进行索引缓存,并刷新到Linux Page Cache,此时数据可读
  • 1条是写入到translog,如果ES异常,translog不会受到任何影响。如果Linux宕机重启,ES会重新读取加载translog进行数据回放,从而保证了数据不丢失(同步-request模式)或者更小成本的丢失(异步-async模式,最大丢失5s)
4.2.1.6 fsync

fsync 是一个 Unix 系统调用函数, 用来将内存 buffer 中的数据存储到文件系统. 这里作了优化, 是指将 filesystem cache 中的所有 segment 刷新到磁盘的操作;

4.2.2 持久化流程

es是近实时搜索机制,原因就是持久化过程有相关的refresh和flush过程。整体流程如下。

  1. 数据首先写入内存缓存区和translog日志文件中。当你写一条数据doc的时候,一方面写入到内存缓冲区中,一方面同时写入到Translog日志文件中。
  2. 内存缓存区满了或者每隔1秒(默认1秒),refresh将内存缓存区的数据生成index segment文件并写入文件系统缓存区,此时index segment可被打开以供search查询读取,这样文档就可以被搜索到了(注意,此时文档还没有写到磁盘上);然后清空内存缓存区供后续使用。可见,refresh实现的是文档从内存缓存区移到文件系统缓存区的过程。
  3. 重复上两个步骤,新的segment不断添加到文件系统缓存区,内存缓存区不断被清空,而translog的数据不断增加,随着时间的推移,translog文件会越来越大。
  4. 当translog长度达到一定程度的时候,会触发flush操作,否则默认每隔30分钟也会定时flush,其主要过程:
  • 执行refresh操作将内存缓存区中的数据写入到新的segment并写入文件系统缓存区,然后打开本segment以供search使用,最后再次清空内存缓存区。
  • 一个commit point被写入磁盘,这个commit point中标明所有的index segment。
  • 文件系统中缓存的所有的index segment文件被fsync强制刷到磁盘,当index segment被fsync强制刷到磁盘上以后,就会被打开,供查询使用。
  • translog被清空和删除,创建一个新的translog。

4.3 数据读流程

es的读取流程相对写流程要简单很多。

  1. 查询可发送到任意节点,接收到某查询的节点会作为该查询的协调节点。
  2. 协调节点解析查询,向对应数据分片分发查询子任务。
  3. 各数据分片检索本地数据并返回协调节点,经汇聚处理后返回用户。

5. 日志复制流程

5.1. segment文件解析


Segment 直接提供了搜索功能的,ES 的一个 Shard (Lucene Index)中是由大量的 Segment 文件组成的,且每一次 fresh 都会产生一个新的 Segment 文件,这样一来 Segment 文件有大有小,相当碎片化。有一个文件,用来记录所有 Segments 信息,叫做Commit Point。ES 内部则会开启一个线程将小的 Segment 合并(Merge)成大的 Segment,减少碎片化,降低文件打开数,提升 I/O 性能。

segment是不能更改的,那么如何删除或者更新文档?

  • .del文件,文件内记录了在某个segment内某个文档已经被删除。在segment中,被删除的文档依旧是能够被搜索到的,不过在返回搜索结果前,会根据.del把那些已经删除的文档从搜索结果中过滤掉。

  • 当一个文档发生更新时,首先会在.del中声明这个文档已经被删除,同时新的文档会被存放到一个新的segment中。这样在搜索时,虽然新的文档和老的文档都会被匹配到,但是.del会把老的文档过滤掉,返回的结果中只包含更新后的文档。

5.2. es的日志复制流程

  1. 客户端发起请求,连接到的NODE1,NODE1可视为协调节点,协调节点接收到请求后,根据算法 shard_num=hash(_routing)%num_primary_shards (_routing 的默认值是文档的 id) 计算出属于分片0,跟根据元数据信息找到分片0的Primary节点NODE3
  2. 将请求转发到分片0的主分片所在的节点NODE3,NODE3完成写入后
  3. 将请求转发给分片0所属的从分片所在的节点NODE1和NODE2,等待所有副本(具体等待多少副本取决于wait_for_active_shards的配置值)完成写入并返回给NODE3 ACK后,NODE3则认为整个写入成功
  4. NODE3将结果反馈给协调节点NODE1,协调节点NODE1再将结果返回客户端

5.3. es的日志清理机制

5.3.1 segment合并

为什么要进行段合并?

  • 索引段的个数越多,搜索性能越低并且消耗更多的内存。
  • 索引段是不可变的,你并不能物理上从中删除信息。
  • 可以物理上删除document,但只是做了删除标记,物理上并没有删除。
  • 当段合并时,这些被标记为删除的文档并没有被拷贝至新的索引段中,这样,减少了最终的索引段中的 document 数目。

在 ES 后台会有一个线程进行 segment 合并

  • 合并进程选择一小部分大小相似的 segment,并且在后台将它们合并到更大的 segment 中,此合并进程,并不会中断索引和搜索。
  • 新的 segment 被刷新(flush)到了磁盘,同时生成一个新的commit point(记录了新的segment和要被排除的segment是哪些)
  • 老的 segment 被删除

合并segment的相关参数,可以参考 Elasticsearch 性能调优:段合并(Segment merge)

5.3.2 translong清理

很显然,我们不可能让translog无限制的增长下去。事实上,当segment数据完成落盘后,translog已经完成了他的职责,就可以清理了。因此触发清理translog的操作有

  • flush操作,将segment落盘,生成commit,并清理translog,新增的事务写入到新的translog中

导致flush操作的条件有

  • es 默认每隔 30 分钟
  • 操作数据量达到 512mb

相关参数如有

  • index.translog.flush_threshold_ops: 执行多少次操作后执行一次flush,生成新的commit,默认无限制
  • index.translog.flush_threshold_size: translog一旦达到最大大小,就会发生flush,生成新的commit (生成commit文件,并删除commit事务前的translog),默认为512mb
  • index.translog.flush_threshold_period: 每隔多长时间执行一次flush操作, 默认是30min:

6. 疑问和思考

6.1 es的flush过程和linux的cache刷盘有什么区别和联系?

  • linux本身的page cache也能够触发数据刷盘动作,具体可以参考 Linux 内核源码分析-Page Cache 刷脏源码分析 但是这是linux/unix本身的行为,很有可能无法达成es的预期目标。
  • 因此es自己设置了flush的机制,确保能够按照预设的目标进行操作

6.2 配置按照ip打散

# node的自定义属性,从而给机器打上标签,通过标签将索引、分片打散在不同的机器上cluster.routing.allocation.awareness.attributes: ipnode.attr.ip: xx

6.3 配置机架感知的参考配置

# node的自定义属性,从而给机器打上标签,通过标签将索引、分片打散在不同的机器上cluster.routing.allocation.awareness.attributes: racknode.attr.rack: rack1# 强制要求按照rack1、rack2分配打散cluster.routing.allocation.awareness.force.rack.values: rack1,rack2

注: 这里没有要求cluster.routing.allocation.awareness.attributes:rack,ip,原因是cluster.routing.allocation.awareness.attributes:rack已经能够保证1个rack只启动1个副本

7. 参考文档

  • Elasticsearch深入:数据持久化过程
  • ES写入过程和写入原理调优及如何保证数据的写一致性