流处理
在批处理中,输入数据是有界的(已知和有限的大小),所以批处理系统能直到它何时完成输入的读取。但事实上,很多场景下数据都是无界的,数据会随着时间的推移不断到来,并且这个过程永远不会结束,所以我们没有办法能掌握数据的大小以及它该何时结束。
为了解决这个问题,批处理程序以时间来划分数据块,但这也就导致了一个问题:划分的时间越长,则延迟越长。倘若我们的服务一天更新一次数据,这会使得用户的体验直线下降,为了减少延迟,我们就需要将这个时间缩短,更加频繁的处理数据,这也就是流处理的原理。
发送事件流
在批处理中任务的输入和输出都是文件,而在流处理领域中,输入则变成了一系列的事件。
事件:一个小的、自包含的、不可变的对象,包含某个时间点发生的某件事情的细节。一个事件通常包含一个来自日历时钟的时间戳,以指明事件发生的时间。
在批处理中,文件只被写入一次,然后可能被多个作业读取。类似地,在流处理领域中,一个事件由生产者(producer)生成一次,然后可能由多个消费者(consumer)进行处理。在文件系统中,文件名标识一组相关记录;在流式系统中,相关的事件通常被聚合为一个主题(topic) 。
消息传递系统
向消费者通知新事件的常用方式是使用消息传递系统(messaging system):生产者发送包含事件的消息,然后将消息推送给消费者。
不同的消息传递系统可能会采取不同的实现方案,我们可以借助下面两个问题来区分这些系统:
如果生产者发送消息的速度比消费者能够处理的速度快会发生什么?
解决方案主要有三种:
- 丢弃这些来不及处理的消息。
- 将消息放入缓冲队列,延迟处理。
- 采取背压(backpressure)机制,阻塞生产者,以免其发送更多的消息。
如果节点崩溃或暂时脱机,会发生什么情况?是否会有消息丢失?
如果要想保证持久性,就必须定期写入磁盘和复制数据。而如果允许一定程序的消息丢失,则可以获得更高的吞吐量和更低的延迟。
直接消息传递系统
许多消息传递系统使用生产者和消费者之间的直接网络通信,而不通过中间节点:
- UDP 组播广泛应用于金融行业,因为这些常见需要保证低延时(虽然 UDP 本身是不可靠的,但应用层的协议可以恢复丢失的数据包)。
- 无代理的消息库,如 ZeroMQ 和 nanomsg 采取类似的方法,通过 TCP 或 IP 多播实现发布 / 订阅消息传递。
- StatsD 和 Brubeck 使用不可靠的 UDP 消息传递来收集网络中所有机器的指标并对其进行监控(在 StatsD 协议中,只有接收到所有消息,才认为计数器指标是正确的;使用 UDP 将使得指标处在一种最佳近似状态)。
- 如果消费者在网络上公开了服务,生产者可以直接发送 HTTP 或 RPC 请求消息推送给使用者。
虽然这些系统的性能很好,但是它们的容错程度极为有限。即使协议检测到并重传在网络中丢失的数据包,它们通常也只是假设生产者和消费者始终在线。
如果消费者处于脱机状态,则可能会丢失其不可达时发送的消息。一些协议允许生产者重试失败的消息传递,但当生产者崩溃时,它可能会丢失消息缓冲区及其本应发送的消息,这种方法可能就没用了。
消息代理
目前的主流方案则是使用消息代理(也称为消息队列)来发送消息。消息代理实质上是一种针对处理消息流而优化的数据库。它作为服务器运行,生产者和消费者作为客户端连接到服务器。生产者将消息写入代理,消费者通过从代理那里读取来接收消息。
通过将数据集中在代理上,这些系统可以更容易地容忍来来去去的客户端(连接,断开连接和崩溃),而持久性问题则转移到代理的身上。一些消息代理只将消息保存在内存中,而另一些消息代理(取决于配置)将其写入磁盘,以便在代理崩溃的情况下不会丢失。
消息代理 VS 数据库
- 数据库通常保留数据直至显式删除,而大多数消息代理在消息成功递送给消费者时会自动删除消息。这样的消息代理不适合长期的数据存储。
- 数据库通常支持次级索引和各种搜索数据的方式,而消息代理通常支持按照某种模式匹配主题,订阅其子集。
- 由于它们很快就能删除消息,因此大多数消息代理的队列很短。如果代理需要缓冲很多消息,比如因为消费者速度较慢(如果内存装不下消息,可能会溢出到磁盘),每个消息需要更长的处理时间,整体吞吐量可能会恶化。
- 查询数据库时,结果通常基于某个时间点的数据快照;如果另一个客户端随后向数据库写入一些改变了查询结果的内容,则第一个客户端不会发现其先前结果现已过期(除非它重复查询或轮询变更)。相比之下,消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端。
多个消费者
当多个消费者从同一主题中读取消息时,有两种主要的消息传递模式:
- 负载均衡(load balancing):每条消息都被传递给消费者之一,所以处理该主题下消息的工作能被多个消费者共享。代理可以为消费者任意分配消息。
- 扇出(fan-out):每条消息都被传递给所有消费者。扇出允许几个独立的消费者各自监听相同的消息广播,而不会相互影响。
两种模式可以组合使用:例如,两个独立的消费者组可以每组各订阅同一个主题,每一组都共同收到所有消息,但在每一组内部,每条消息仅由单个节点处理。
确认与重传
由于消费者随时可能会崩溃,所以可能会存在这么一个场景:代理向消费者递送消息,但消费者没有处理,或者在消费者崩溃之前只进行了部分处理。为了确保消息不会丢失,消息代理引入了**确认(acknowledgments)**机制,即客户端必须显式告知代理消息处理完毕的时间,以便代理能将消息从队列中移除。
如果与客户端的连接关闭,或者代理超出一段时间未收到确认,代理则认为消息没有被处理,因此它将消息再递送给另一个消费者。
当结合上文提到的负载均衡时,这种重传行为会对消息的顺序产生影响。如下图,当某个消费者在处理消息时崩溃了,此时这个未处理的消息就会被重传到其他消费者手中,这也就可能导致消息的交付顺序(那个消费者可能正在处理别的消息)与生产者的发送顺序不一致。
即使消息代理试图保留消息的顺序,负载均衡与重传的组合也不可避免地导致消息被重新排序。为了避免此问题,可以让每个消费者使用单独的队列(即不使用负载均衡功能)。
分区日志
有没有一种方法既有数据库的持久存储,又能保证消息传递的低延迟?这时就需要提到基于日志的消息代理。
使用日志进行消息存储
基于日志的消息代理的实现原理如下:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。
为了伸缩超出单个磁盘所能提供的更高吞吐量,可以对日志进行分区。不同的分区可以托管在不同的机器上,使得每个分区都有一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。如下图所示:
在每个分区内,代理为每个消息分配一个单调递增的序列号或偏移量(因为是追加写入,保证了单分区的完全有序,但无法保证不同分区有序)。
日志 VS 消息传递
在消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况下,基于队列的消息代理是可取的。另一方面,在消息吞吐量很高,处理迅速,顺序很重要的情况下,基于日志的方法表现得非常好。
消费者偏移量
由于日志是追加写入的,因此仅仅需要通过偏移量就可以判断消息是否已被处理:**所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。**因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。
磁盘空间使用
为了避免数据不断写入而导致磁盘空间耗尽,通常日志会被分为多个段,并不定时将旧段删除或归档存储。为了避免生产者写入过快,导致消费者读取到被删除的数据,从而丢失数据,日志通常会实现一个循环缓冲区,当缓冲区填满后再丢弃数据。
重播旧消息
在基于队列的消息代理中,处理和确认消息后会导致该消息被删除。而在基于日志的代理中,则仅仅是读取日志,并不会做任何的修改操作。这也就使得我们能够重放数据,即从之前的偏移量开始重新读取。
数据库与流
保持系统同步
通常情况下,没有一个系统能够满足所有的存储需求,这就要求当一个数据发生变动时,这个变更应该同步到所有的相关系统中。
对于数据仓库而言,通常的作法是转储数据库——取得数据库的完整副本,然后执行 ETL 将数据加载到数据仓库中。但是由于这样做的效率过低,有时人们又会采取双写来进行替代——代码在写入数据库的同时写入到每个系统中。
但是双写在改善效率的同时,又带来了一系列的问题:
- 数据竞争:在并发写入的时候,一台机器可能会覆盖掉另一个机器的写入结果,并且双方都无法感知这个过程。
- 原子提交:双写需要保证写入操作是原子的,否则一个写入成功、一个写入失败时就会导致数据的不一致。
变更数据捕获
变更数据捕获(change data capture, CDC) 指的是记录是写入数据库的所有数据变更,将其提取并转换为可以复制到其他系统中的形式的过程。
如下图,我们可以捕获数据中的变更,并将变更日志以相同的顺序应用于其他系统中,则能够保证多个系统中的数据与数据库一致。
通常的实现方案有如下两种:
- 数据库触发器:注册观察所有变更的触发器,并将相应的变更项写入变更日志表中。
- 解析复制日志:解析数据库的日志,将解析出来的修改传递给下游。
事件溯源
事件溯源是一种强大的数据建模技术:从应用的角度来看,将用户的行为记录为不可变的事件更有意义,而不是在可变数据库中记录这些行为的影响。事件溯源使得应用随时间演化更为容易,通过更容易理解事情发生的原因来帮助调试的进行,并有利于防止应用 Bug。
事件溯源和变更数据捕获都将所有对应用状态的变更存储为变更事件日志,那么它们有什么区别呢?
- 在变更数据捕获中,应用以可变方式使用数据库,可以任意更新和删除记录。变更日志是从数据库的底层提取的(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配。
- 在事件溯源中,应用逻辑显式构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加写入的,更新与删除是不鼓励的或禁止的。事件被设计为旨在反映应用层面发生的事情,而不是底层的状态变更。
事件溯源的核心是区分事件(event)和命令(command)。当来自用户的请求刚到达时,它一开始是一个命令:在这个时间点上它仍然可能可能失败,比如,因为违反了一些完整性条件。应用必须首先验证它是否可以执行该命令。如果验证成功并且命令被接受,则它变为一个持久化且不可变的事件。
在事件生成的时刻,它就成为 事实(fact)。即使用户对其进行删除或者修改,也只是在后续单独添加了删除、修改事件。
流处理
应用场景
除了传统的监控(欺诈检测、机器状态检测、金融交易等),流处理还有以下这些应用场景:
- 复合事件处理(CEP)
- 数据分析
- 物化视图
- 搜索引擎
- 消息传递和 PRC
时间推理
事件时间和处理时间
在流处理中有事件时间和处理时间的概念:
- 事件时间:事件发生的时间。
- 处理时间:机器接收到事件时,对其进行处理的时间。
如果我们以处理时间作为标准,则可能会出现逻辑上的紊乱(与时间实际发生的顺序不同),而如果是事件时间作为标准,则考虑到延迟、排队、网络故障等因素,则可能会出现某些事件延迟很久才会到来,甚至丢失的情况。
知道什么时候准备好了
当我们使用事件时间定义窗口时,就会遇到上面说的问题,我们无法判断当前这个窗口的事件有没有完整的到来。
通常情况下,我们会设定一个超时时间,当到达这个时间后我们就认为当前窗口已经就绪,开始聚合计算。倘若后续有延迟的数据到来,此时可以采用两种方案进行处理:
- 丢弃数据:丢弃这些延迟的数据。为了防止丢失的数据过多,可以设定一个监控的阈值,当丢弃数据过多时发出警报,重放数据。
- 修正数据:将该数据放入旧窗口中,同时需要回撤以前的输出,确保数据的正确性。
时钟
考虑到各个机器的物理时钟可能会存在误差(客户端的时间可能因为用户的错误设置,出现明显的错误),通常需要记录三个时间戳来校准时间:
- 事件发生的时间(依赖于客户端时钟)。
- 事件发送给服务器的时间(依赖于客户端时钟)
- 服务器接收事件的时间(依赖于服务器时钟)。
通过从第三个时间戳中减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移(假设网络延迟与所需的时间戳精度相比可忽略不计)。然后可以将该偏移应用于事件时间戳,从而估计事件实际发生的真实时间(假设设备时钟偏移在事件发生时与送往服务器之间没有变化)。
窗口类型
常见的窗口类型有以下几种:
- 滚动窗口(Tumbling Window):滚动窗口有着固定的长度,每个事件都仅能属于一个窗口。例如,假设你有一个 1 分钟的滚动窗口,则所有时间戳在
10:03:00
和10:03:59
之间的事件会被分组到一个窗口中,10:04:00
和10:04:59
之间的事件被分组到下一个窗口,依此类推。 - 跳动窗口(Hopping Window):跳动窗口也有着固定的长度,但允许窗口重叠以提供一些平滑。例如,一个带有 1 分钟跳跃步长的 5 分钟窗口将包含
10:03:00
至10:07:59
之间的事件,而下一个窗口将覆盖10:04:00
至10:08:59
之间的事件,等等。通过首先计算 1 分钟的滚动窗口(tunmbling window),然后在几个相邻窗口上进行聚合,可以实现这种跳动窗口。 - 滑动窗口(Sliding Window):滑动窗口包含了彼此间距在特定时长内的所有事件。例如,一个 5 分钟的滑动窗口应当覆盖
10:03:39
和10:08:12
的事件,因为它们相距不超过 5 分钟。通过维护一个按时间排序的事件缓冲区,并不断从窗口中移除过期的旧事件,可以实现滑动窗口。 - 会话窗口(Session window):与其他窗口类型不同,会话窗口没有固定的持续时间,它将同一用户出现时间相近的所有事件分组在一起,而当用户一段时间没有活动时结束窗口。
流连接
在流处理中主要有以下三种连接类型:
-
流流连接(窗口连接):两个输入流都由活动事件组成,而连接算子在某个时间窗口内搜索相关的事件。如果你想要找出一个流内的相关事件,连接的两侧输入可能实际上都是同一个流(自连接)。
-
流表连接(流扩充):一个输入流由活动事件组成,另一个输入流是数据库变更日志。变更日志保证了数据库的本地副本是最新的。对于每个活动事件,连接算子将查询数据库,并输出一个扩展的活动事件。
-
表表连接(维护物化视图):两个输入流都是数据库变更日志。在这种情况下,一侧的每一个变化都与另一侧的最新状态相连接。结果是两表连接所得物化视图的变更流。
容错
微批量与存档点
考虑到流处理没有边界,数据永远不会停止,为了尽可能的减少延迟,Spark Streaming 将流分解为一个个小块,并像微型批处理一样处理每个块。这种方法被称为微批(microbatching)。
而 Flink 则采取了另一种方案,它会定期生成状态的滚动存档点并将其写入持久存储。如果流算子崩溃,它可以从最近的存档点重启,并丢弃从最近检查点到崩溃之间的所有输出。
在流处理中,这两种方法都满足于恰好一次语义(exactly-once semantics)。
原子提交
为了在出现故障时满足 exactly-once ,我们需要确保事件处理的所有输出和副作用当且仅当处理成功时才会生效。这些事情要么都原子地发生,要么都不发生,但是它们不应当失去同步。
幂等性
我们的目标是丢弃任何失败任务的部分输出,以便能安全地重试,而不会生效两次。分布式事务是实现这个目标的一种方式,而另一种方式是依 幂等性(idempotence)。
幂等操作指的是是多次重复执行与单次执行效果相同的操作。
那么如何实现幂等呢?可以参考 Kafka 的解决方案,即为每个消息附带一个持久的、单调递增的偏移量,通过这个偏移量就可以判断这个消息是否被执行过,从而避免重复执行。
失败后重建状态
任何需要状态的流处理以及任何用于连接的表和索引,都必须确保在失败之后能恢复其状态。通常会采用如下解决方案:
- 将状态保存在远程数据存储中,并进行复制。
- 在流处理器本地保存状态,并定期复制。当流处理器从故障中恢复时,新任务可以读取状态副本,恢复处理而不丢失数据。
- 不需要复制状态,直接从输入流中重建状态。
至于要选择哪个方案,需要根据业务场景、底层架构的性能来进行分析。