ElasticSearch 分布式原理

分布式原理

分布式存储

路由

当索引一个文档的时候,Elasticsearch会通过哈希来决定将文档存储到哪一个主分片中,路由计算公式如下:

1
2
3
4
shard = hash(routing) % number_of_primary_shards

//routing:默认为文档id,也可以自定义。
//number_of_primary_shards:主分片的数量
  • 查询时指定routing:可以直接根据routing信息定位到某个分片查询,不需要查询所有的分配,经

    过协调节点排序。

  • 查询时不指定routing:因为不知道要查询的数据具体在哪个分片上,所以整个过程分为 2 个步骤

    1. 分发:请求到达协调节点后,协调节点将查询请求分发到每个分片上。
    2. 聚合:协调节点搜集到每个分片上查询结果,在将查询的结果进行排序,之后给用户返回结果。

从上面的这个公式我们也可以看到一个问题,路由的逻辑与当前主分片的数量强关联,也就是说如果分片数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。这也就是为什么我们要在创建索引的时候就确定好主分片的数量并且永远不会改变这个数量

分片数量固定是否意味着会使索引难以进行扩容?

答案是否定的,Elasticsearch还提供了其他的一些方案来让我们轻松的实现扩容,如:

  • 分片预分配:一个分片存在于单个节点,但一个节点可以持有多个分片。因此我们可以根据未来的数据的扩张状况来预先分配一定数量的分片到各个节点中。(注意⚠️:预先分配过多的分片会导致性能的下降以及影响搜索结果的相关度)
  • 新建索引:分片数不够时,可以考虑新建索引,搜索1个有着50个分片的索引与搜索50个每个都有1个分片的索引完全等价。

更多关于水平拓展的内容可以参考官方文档扩容设计

新增、索引和删除文档

我们可以发送请求到集群中的任一节点。 每个节点都有能力处理任意请求。 每个节点都知道集群中任一文档位置,所以可以直接将请求转发到需要的节点上。 在下面的例子中,将所有的请求发送到 Node 1 ,我们将其称为协调节点(coordinating node)

当发送请求的时候, 为了扩展负载,更好的做法是轮询集群中所有的节点。

新建、索引和删除请求都是写操作, 必须在主分片上面完成之后才能被复制到相关的副本分片。

流程如下:

  1. 客户端向 Node 1 发送新建、索引或者删除请求。
  2. 节点使用文档的 _id 确定文档属于分片 0 。请求会被转发到 Node 3,因为分片 0 的主分片目前被分配在 Node 3 上。
  3. Node 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 Node 1Node 2 的副本分片上。一旦所有的副本分片都报告成功, Node 3 将向协调节点报告成功,协调节点向客户端报告成功。

取回文档

由于取回文档为读操作,我们可以从主分片或者从其它任意副本分片检索文档。

流程如下:

  1. 客户端向 Node 1 发送获取请求。
  2. 节点使用文档的 _id 来确定文档属于分片 0 。分片 0 的副本分片存在于所有的三个节点上。 在这种情况下,它将请求转发到 Node 2
  3. Node 2 将文档返回给 Node 1 ,然后将文档返回给客户端。

在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。

并发控制

在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:

  • 悲观并发控制:这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。
  • 乐观并发控制:Elasticsearch中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。

Elasticsearch是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许会乱序。所以Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。

在Elasticsearch中,其通过版本号机制来实现乐观并发控制。即每一个文档中都会有一个_version版本号字段,当文档被修改时版本号递增。 Elasticsearch使用_version来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

我们可以利用_version号来确保应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 例如我们想更新文档的内容,并指定版本号为1
PUT /website/blog/1?version=1 
{
  "title": "My first blog entry",
  "text":  "Starting to get the hang of this..."
}

// 当文档的版本号为1时,次请求成功,同时响应体告诉我们版本号递增到2
{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "1",
  "_version": 2
  "created":  false
}

// 此时我们再次尝试更新文档的内容,仍然指定版本号为1,由于版本号不符合,此时返回409 Conflict HTTP 响应码
{
   "error": {
      "root_cause": [
         {
            "type": "version_conflict_engine_exception",
            "reason": "[blog][1]: version conflict, current [2], provided [1]",
            "index": "website",
            "shard": "3"
         }
      ],
      "type": "version_conflict_engine_exception",
      "reason": "[blog][1]: version conflict, current [2], provided [1]",
      "index": "website",
      "shard": "3"
   },
   "status": 409
}

分布式搜索

搜索需要一种更加复杂的执行模型,因为我们不知道查询会命中哪些文档,这些文档有可能在集群的任何分片上。 一个搜索请求必须询问我们关注的索引的所有分片的某个副本来确定它们是否含有任何匹配的文档。

但是找到所有的匹配文档仅仅完成事情的一半。 在 search 接口返回一个 page 结果之前,多分片中的结果必须组合成单个排序列表。 为此,搜索被执行成一个两阶段过程,我们称之为query then fetch(查询后取回)。

查询阶段

在查询阶段时, 查询会广播到索引中每一个分片拷贝(主分片或者副本分片)。 每个分片在本地执行搜索并构建一个匹配文档的优先队列。

查询阶段包含以下三个步骤

  1. 客户端发送一个 search 请求到 Node 3 ,此时Node 3成为协调节点,由它来负责本次的查询。
  2. Node 3 将查询请求广播到索引的每个主分片或副本分片中。每个分片在本地执行查询并添加结果到大小为 from + size 的本地有序优先队列中。
  3. 每个分片返回各自优先队列中所有文档的ID和排序值给协调节点,也就是 Node 3 ,它合并这些值到自己的优先队列中来产生一个全局排序后的结果列表。至此查询过程结束。

一个索引可以由一个或几个主分片组成, 所以一个针对单个索引的搜索请求需要能够把来自多个分片的结果组合起来。 针对 multiple 或者 all 索引的搜索工作方式也是完全一致的——仅仅是包含了更多的分片而已。

取回阶段

在查询阶段中,我们标识了哪些文档满足搜索请求,而接下来我们就需要取回这些文档。

取回阶段由以下步骤构成

  1. 协调节点辨别出哪些文档需要被取回并向相关的分片提交多个 GET 请求。例如,如果我们的查询指定了 { "from": 90, "size": 10 } ,最初的90个结果会被丢弃,只有从第91个开始的10个结果需要被取回。
  2. 每个分片加载并丰富文档(如_source字段和高亮参数),接着返回文档给协调节点。
  3. 协调节点等待所有文档被取回,将结果返回给客户端。

集群内部原理

集群与节点

一个运行中的Elasticsearch实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。

由于Elasticsearch采用了主从模式,所以当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。 因为主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节点都可以成为主节点。

作为用户,我们可以将请求发送到集群中的任何节点(这个处理请求的节点也叫做协调节点)。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。

分片

在分布式系统中,单机无法存储规模巨大的数据,要依靠大规模集群处理和存储这些数据,一般通过增加机器数量来提高系统水平扩展能力。因此,需要将数据分成若干小块分配到各个机器上。然后通过某种路由策略找到某个数据块所在的位置。

分片(shard)是底层的基本读写单元,分片的目的是分割巨大索引,让读写可以并行操作,由多台机器共同完成。读写请求最终落到某个分片上,分片可以独立执行读写工作。Elasticsearch利用分片将数据分发到集群内各处。分片是数据的容器,文档保存在分片内,不会跨分片存储。分片又被分配到集群内的各个节点里。当集群规模扩大或缩小时,Elasticsearch会自动在各节点中迁移分片,使数据仍然均匀分布在集群里。

为了应对并发更新问题,Elasticsearch将分片分为两部分,即主分片(primary shard)副本分片(replica shard)。主数据作为权威数据,写过程中先写主分片,成功后再写副分片,恢复阶段以主分片为准。

一个副本分片只是一个主分片的拷贝。副本分片作为硬件故障时保护数据不丢失的冗余备份,并为搜索和返回文档等读操作提供服务。

那索引与分片之间又有什么关系呢?

一个Elasticsearch索引包含了很多个分片,每个分片又是一个Lucene的索引,它本身就是一个完整的搜索引擎,可以独立执行建立索引和搜索任务。Lucene索引又由很多分段组成,每个分段都是一个倒排索引。Elasticsearch每次refresh都会生成一个新的分段,其中包含若干文档的数据。在每个分段内部,文档的不同字段被单独建立索引。每个字段的值由若干词(Term)组成,Term是原文本内容经过分词器处理和语言处理后的最终结果。

选举

在主节点选举算法的选择上,基本原则是不重复造轮子。最好实现一个众所周知的算法,这样的好处是其中的优点和缺陷是已知的。Elasticsearch的选举算法的选择上主要考虑下面两种。

  • Bully算法:Leader选举的基本算法之一。它假定所有节点都有一个唯一的ID,使用该ID对节点进行排序。任何时候的当前Leader都是参与集群的最高ID节点。该算法的优点是易于实现。但是,当拥有最大ID的节点处于不稳定状态的场景下会有问题。例如,Master负载过重而假死,集群拥有第二大ID的节点被选为新主,这时原来的Master恢复,再次被选为新主,然后又假死……
  • Paxos算法:Paxos非常强大,尤其在什么时机,以及如何进行选举方面的灵活性比简单的Bully算法有很大的优势,因为在现实生活中,存在比网络连接异常更多的故障模式。但Paxos实现起来非常复杂。

Elasticsearch的选主算法是基于Bully算法的改进,主要思路是对节点ID排序,取ID值最大的节点作为Master,每个节点都运行这个流程。同时,为了解决Bully算法的缺陷,其通过推迟选举,直到当前的Master失效来解决上述问题,只要当前主节点不挂掉,就不重新选主。但是容易产生脑裂(双主),为此,再通过法定得票人数过半解决脑裂问题。

Elasticsearch对Bully附加的三个约定条件

  1. 参选人数需要过半。当达到多数时就选出临时主节点,为什么是临时的?每个节点运行排序取最大值的算法,结果不一定相同。举个例子,集群有5台主机,节点ID分别是1、2、3、4、5。当产生网络分区或节点启动速度差异较大时,节点1看到的节点列表是1、2、3、4,选出4;节点2看到的节点列表是2、3、4、5,选出5。结果就不一致了,由此产生下面的第二条限制。
  2. 得票数需要过半。某节点被选为主节点,必须判断加入它的节点数达到半数以上,才确认Master身份(推迟选举)。
  3. 当探测到节点离开事件时,必须判断当前节点数是否过半。如果达不到半数以上,则放弃Master身份,重新加入集群。如果不这么做,则设想以下情况:假设5台机器组成的集群产生网络分区,2台一组,3台一组,产生分区前,Master位于2台中的一个,此时3台一组的节点会重新并成功选取Master,产生双主,俗称脑裂。(节点失效检测)

流程如下图

节点失效检测会监控节点是否离线,然后处理其中的异常。失效检测是选主流程之后不可或缺的步骤,不执行失效检测可能会产生脑裂(双主或多主)。在此我们需要启动两种失效探测器:

  • 在Master节点,启动NodesFaultDetection,简称NodesFD。定期探测加入集群的节点是否活跃。
  • 非Master节点启动MasterFaultDetection,简称MasterFD。定期探测Master节点是否活跃。

分片内部原理

索引不变性

早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。

倒排索引被写入磁盘后是不可改变的,索引的不变性具有以下好处:

  • 不需要锁。如果你从来不更新索引,你就不需要担心多进程同时修改数据的问题。
  • 一旦索引被读入内核的文件系统缓存,便会留在哪里。由于其不变性,只要文件系统缓存中还有足够的空间,那么大部分读请求会直接请求内存,而不会命中磁盘。这提供了很大的性能提升。
  • 缓存(像过滤器缓存)在索引的生命周期内始终有效。它们不需要在每次数据改变时被重建,因为数据不会变化。
  • 写入单个大的倒排索引允许数据被压缩,减少磁盘 I/O 和 需要被缓存到内存的索引的使用量。

当然,一个不变的索引也有不好的地方。最大的缺点就是它是不可变的,我们无法对其进行修改。如果我们需要让一个新的文档可被搜索,就需要重建整个索引。这不仅对一个索引所能包含的数据量造成了巨大的限制,而且对索引可被更新的频率同样造成了影响。

动态更新索引

那么我们如何能在保留不变性的前提下实现倒排索引的动态更新呢?

答案就是使用更多的索引即新增内容并写到一个新的倒排索引中,查询时,每个倒排索引都被轮流查询,查询完再对结果进行合并。

Elasticsearch基于Lucene引入了按段写入的概念——每次内存缓冲的数据被写入文件时,会产生一个新的Lucene段,每个段都是一个倒排索引。同时,在提交点中描述了当前Lucene索引都含有哪些分段。

按段写入的流程如下:

  1. 新文档被收集到内存的索引中缓存
  2. 当缓存堆积到一定规模时,就会进行提交
    • 一个新的段(倒排索引)被写入磁盘。
    • 一个新的提交点被写入磁盘。
    • 所有在文件系统缓存中等待的写入都刷新到磁盘,以确保它们被写入物理文件。
  3. 新的段被开启,让它包含的文档可见以被搜索
  4. 内存缓存被清空,等待接收新的文档

当一个查询被触发,所有已知的段按顺序被查询。词项统计会对所有段的结果进行聚合,以保证每个词和每个文档的关联都被准确计算。 这种方式可以用相对较低的成本将新文档添加到索引。

那插入和更新又如何实现呢?

段是不可改变的,所以既不能从把文档从旧的段中移除,也不能修改旧的段来进行反映文档的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档的段信息。

  • 当一个文档被删除时,它实际上只是在 .del 文件中被标记删除。一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。

  • 当一个文档被更新时,旧版本文档被标记删除,文档的新版本被索引到一个新的段中。 可能两个版本的文档都会被一个查询匹配到,但被删除的那个旧版本文档在结果集返回前就已经被移除。

近实时搜索

Elasticsearch和磁盘之间是文件系统缓存,在执行写操作时,为了降低从索引到可被搜索的延迟,一般新段会被先写入到文件系统缓存,再将这些数据写入硬盘(磁盘I/O是性能瓶颈)。

在写操作中,一般会先在内存中缓冲一段数据,再将这些数据写入硬盘,每次写入硬盘的这批数据称为一个分段。如同任何写操作一样,通过操作系统的write接口写到磁盘的数据会先到达系统缓存(内存)。write函数返回成功时,数据未必被刷到磁盘。通过手工调用flush,或者操作系统通过一定策略将文件系统缓存刷到磁盘。

这种策略大幅提升了写入效率。从write函数返回成功开始,无论数据有没有被刷到磁盘,只要文件已经在缓存中, 就可以像其它文件一样被打开和读取了。

Lucene允许新段被写入和打开——使其包含的文档在未进行一次完整提交时便对搜索可见。 这种方式比进行一次提交代价要小得多,并且在不影响性能的前提下可以被频繁地执行。

Elasticsearch中将写入和打开一个新段的过程叫做refresh(刷新) 。 默认情况下每个分片会每秒自动刷新一次。这就是为什么我们说Elasticsearch是近实时搜索——文档的变化并不是立即对搜索可见,但会在一秒之内变为可见。

事务日志

由于系统先缓冲一段数据才写,且新段不会立即刷入磁盘,这两个过程中如果出现某些意外情况(如主机断电),则会存在丢失数据的风险。

为了解决这个问题,Elasticsearch增加了一个translog(事务日志),在每一次对Elasticsearch进行操作时均进行了日志记录,当Elasticsearch启动的时候,重放translog中所有在最后一次提交后发生的变更操作。

其执行流程如下:

  1. 一个文档被索引之后,就会被添加到内存缓冲区,并且追加到了translog
    • 新的文档被添加到内存缓冲区并且被追加到了事务日志,如下图
  2. 分片会每秒自动执行一次刷新,这些内存缓冲区的文档被写入新的段中并打开以便搜索,同时清空内存缓冲区。
    • 刷新完成后, 缓存被清空但是事务日志不会,同时新段写入文件系统缓冲区
  3. 这个进程继续工作,更多的文档被添加到内存缓冲区和追加到translog
    • 事务日志不断积累文档
  4. 当translog足够大时,就会执行全量提交,对文件系统缓存执行flush,将其内容全部写入硬盘中,并清空事务日志。
    • 在刷新(flush)之后,段被全量提交,并且事务日志被清空

除此之外,translog还有下面这些功能

  • translog提供所有还没有被刷到磁盘的操作的一个持久化纪录。当Elasticsearch启动的时候, 它会从磁盘中使用最后一个提交点去恢复已知的段,并且会重放translog中所有在最后一次提交后发生的变更操作。
  • translog也被用来提供实时CRUD 。当你试着通过ID查询、更新、删除一个文档,在从相应的段中检索之前, 首先检查translog任何最近的变更。这意味着它总是能够实时地获取到文档的最新版本。

段合并

由于自动刷新流程每秒会创建一个新的段 ,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。 每一个段都会消耗文件句柄、内存和cpu运行周期。更重要的是,每个搜索请求都必须轮流检查每个段,所以段越多,搜索也就越慢

Elasticsearch通过在后台进行段合并来解决这个问题,其会选择大小相似的分段进行合并。在合并过程中,标记为删除(更新)的数据不会写入新分段,当合并过程结束,旧的分段数据被删除,标记删除的数据才从磁盘删除。

流程如下图

合并大的段需要消耗大量的I/O和CPU资源,如果任其发展会影响搜索性能。Elasticsearch在默认情况下会对合并流程进行资源限制,所以搜索仍然有足够的资源很好地执行。

整体写入流程如下图

Built with Hugo
主题 StackJimmy 设计