Distributed 引擎
Distributed 表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以 Distributed 表引擎需要和其他数据表引擎一起协同工作。
ClickHouse 并不像其他分布式系统那样,拥有高度自动化的分片功能。ClickHouse 提供了**本地表(Local Table)与分布式表(Distributed Table)**的概念
- 本地表:通常以 _local 为后缀进行命名。本地表是承接数据的载体,可以使用非 Distributed 的任意表引擎,一张本地表对应了一个数据分片。
- 分布式表:通常以 _all 为后缀进行命名。分布式表只能使用 Distributed 表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。
分布式写入流程
在向集群内的分片写入数据时,通常有两种思路
-
借助外部计算系统,事先将数据均匀分片,再借由计算系统直接将数据写入 ClickHouse 集群的各个本地表。
-
通过 Distributed 表引擎代理写入分片数据。
第一种方案通常拥有更好的写入性能,因为分片数据是被并行点对点写入的。但是这种方案的实现主要依赖于外部系统,而不在于 ClickHouse 自身,所以这里主要会介绍第二种思路。为了便于理解整个过程,这里会将分片写入、副本复制拆分成两个部分进行讲解。
数据写入分片
-
在第一个分片节点写入本地分片数据:首先在 CH5 节点,对分布式表 test_shard_2_all 执行
INSERT
,尝试写入 10、30、200 和 55 四行数据。执行之后分布式表主要会做两件事情:-
根据分片规则划分数据
-
将属于当前分片的数据直接写入本地表 test_shard_2_local。
-
-
第一个分片建立远端连接,准备发送远端分片数据:将归至远端分片的数据以分区为单位,分别写入 /test_shard_2_all存储目录下的临时 bin 文件,接着,会尝试与远端分片节点建立连接。
-
第一个分片向远端分片发送数据:此时,会有另一组监听任务负责监听 /test_shard_2_all 目录下的文件变化,这些任务负责将目录数据发送至远端分片,其中,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。
-
第二个分片接收数据并写入本地:CH6 分片节点确认建立与 CH5 的连接,在接收到来自 CH5 发送的数据后,将它们写入本地表。
-
由第一个分片确认完成写入:最后,还是由 CH5 分片确认所有的数据发送完毕。
可以看到,在整个过程中,Distributed 表负责所有分片的写入工作。本着谁执行谁负责的原则,在这个示例中,由 CH5 节点的分布式表负责切分数据,并向所有其他分片节点发送数据。
在由 Distributed 表负责向远端分片发送数据时,有异步写和同步写两种模式:如果是异步写,则在 Distributed 表写完本地分片之后,INSERT
查询就会返回成功写入的信息;如果是同步写,则在执行 INSERT
查询之后,会等待所有分片完成写入。
副本复制数据
如果在集群的配置中包含了副本,那么除了刚才的分片写入流程之外,还会触发副本数据的复制流程。数据在多个副本之间,有两种复制实现方式:
-
Distributed 表引擎:副本数据的写入流程与分片逻辑相同,所以 Distributed 会同时负责分片和副本的数据写入工作。但在这种实现方案下,它很有可能会成为写入的单点瓶颈,所以就有了接下来将要说明的第二种方案。
-
ReplicatedMergeTree 表引擎:如果使用 ReplicatedMergeTree 作为本地表的引擎,则在该分片内,多个副本之间的数据复制会交由 ReplicatedMergeTree 自己处理,不再由 Distributed 负责,从而为其减负。
分布式查询流程
与数据写入有所不同,在面向集群查询数据的时候,只能通过 Distributed 表引擎实现。当 Distributed 表接收到SELECT
查询的时候,它会依次查询每个分片的数据,再合并汇总返回,流程如下:
多副本的路由规则
在查询数据的时候,如果集群中的某一个分片有多个副本,此时 Distributed 引擎就会通过负载均衡算法从众多的副本中选取一个,负载均衡算法有以下四种。
在 ClickHouse 的服务节点中,拥有一个全局计数器errors_count
,当服务发生任何异常时,该计数累积加1。
- random(默认):random 算法会选择
errors_count
错误数量最少的副本,如果多个副本的errors_count
计数相同,则在它们之中随机选择一个。 - nearest_hostname:nearest_hostname 可以看作 random 算法的变种,首先它会选择
errors_count
错误数量最少的副本,如果多个 replica 的errors_count
计数相同,则选择集群配置中 host 名称与当前 host 最相似的一个。而相似的规则是以当前 host 名称为基准按字节逐位比较,找出不同字节数最少的一个。 - in_order:in_order 同样可以看作 random 算法的变种,首先它会选择
errors_count
错误数量最少的副本,如果多个副本的errors_count
计数相同,则按照集群配置中 replica 的定义顺序逐个选择。 - first_or_random:first_or_random 可以看作 in_order 算法的变种,首先它会选择
errors_count
错误数量最少的副本,如果多个副本的errors_count
计数相同,它首先会选择集群配置中第一个定义的副本,如果该副本不可用,则进一步随机选择一个其他的副本。
多分片查询的流程
分布式查询与分布式写入类似,同样本着谁执行谁负责的原则,它会由接收SELECT
查询的 Distributed 表,并负责串联起整个过程。首先它会将针对分布式表的 SQL 语句,按照分片数量将查询拆分成若干个针对本地表的子查询,然后向各个分片发起查询,最后再汇总各个分片的返回结果。
|
|
如下图
- 查询各个分片数据:One 和 Remote 步骤是并行执行的,它们分别负责了本地和远端分片的查询动作。
- 合并返回结果:多个分片数据均查询返回后,在执行节点将所有数据
union
合并
使用 Global 优化分布式子查询
如果现在有一项查询需求,例如要求找到同时拥有两个仓库的用户,应该如何实现?对于这类交集查询的需求,可以使用IN
子查询,此时你会面临两难的选择:IN
查询的子句应该使用本地表还是分布式表?(使用JOIN
面临的情形与IN
类似)。
使用本地表的问题(可能查询不到结果)
如果在IN
查询中使用本地表时,如下列语句
|
|
在实际执行时,分布式表在接收到查询后会将上述 SQL 替换成本地表的形式,再发送到每个分片进行执行,此时,每个分片上实际执行的是以下语句
|
|
那么此时查询的最终结果就有可能是错误的,因为在单个分片上只保存了部分的数据,这就导致该 SQL 语句可能没有匹配到任何数据,如下图
使用分布式表的问题(查询请求被放大 N^2 倍,N 为节点数量)
如果在 IN
查询中使用本地表时,如下列语句
|
|
对于此次查询,每个分片节点不仅需要查询本地表,还需要再次向其他的分片节点再次发起远端查询,如下图
因此可以得出结论,在 IN
查询子句使用分布式表的时候,虽然查询的结果得到了保证,但是查询请求会被放大 N 的平方倍,其中 N 等于集群内分片节点的数量,假如集群内有 10 个分片节点,则在一次查询的过程中,会最终导致 100 次的查询请求,这显然是不可接受的。
使用 GLOBAL 优化查询
为了解决查询放大的问题,我们可以使用 GLOBAL IN
或 GLOBAL JOIN
进行优化,下面就简单介绍一下 GLOBAL
的执行流程
|
|
如上图,主要有以下五个步骤
- 将IN子句单独提出,发起了一次分布式查询。
- 将分布式表转 local 本地表后,分别在本地和远端分片执行查询。
- 将IN子句查询的结果进行汇总,并放入一张临时的内存表进行保存。
- 将内存表发送到远端分片节点。
- 将分布式表转为本地表后,开始执行完整的 SQL 语句,
IN
子句直接使用临时内存表的数据。
在使用 GLOBAL
修饰符之后,ClickHouse 使用内存表临时保存了 IN
子句查询到的数据,并将其发送到远端分片节点,以此到达了数据共享的目的,从而避免了查询放大的问题。由于数据会在网络间分发,所以需要特别注意临时表的大小,IN
或者 JOIN
子句返回的数据不宜过大。如果表内存在重复数据,也可以事先在子句 SQL 中增加 DISTINCT
以实现去重。