批处理
从高层次来看,存储和处理数据的系统可以分为两大类:
- 记录系统:其持有数据的权威版本。当新的数据进入时首先会记录在这里。每个记录在系统中只表示一次。如果其他系统和记录系统之间存在任何差异,那么此时将以记录系统中的值为准。
- 派生数据系统:派生数据系统中的数据通常是另一个系统中的现有数据以某种方式进行转换或处理的结果。如果丢失派生数据,可以从原始数据源中重新创建。典型的例子是缓存(cache):如果数据在缓存中,则可以从缓存中读取;如果缓存不包含所需数据,则降级由底层数据库提供。非规范化的值,索引和物化视图亦属此类。在推荐系统中,预测汇总数据通常衍生自用户日志。
派生数据是冗余的(redundant),因为它重复了已有的信息。但是派生数据对于获得良好的只读查询性能通常是至关重要的。它通常是非规范化的。可以从单个源头衍生出多个不同的数据集,使我们能从不同的视角观察数据。
根据响应时间的不同,数据处理系统通常分为以下三种类型:
- 在线服务(在线系统):服务等待客户的请求或指令到达。每收到一个,服务会试图尽快处理它,并发回一个响应。
- 批处理系统(离线系统):一个批处理系统有大量的输入数据,通过运行一个 job 来处理这些数据,并生成一些输出数据,这往往需要一段时间(从几分钟到几天),所以通常不会有用户等待 job 完成。相反,批量作业通常会定期运行。
- 流处理系统(准实时系统):处于在线和离线系统之间。像批处理系统一样,流处理消费输入并产生输出(并不需要响应请求)。但是,流式作业在事件发生后不久就会对事件进行操作,而批处理作业则需等待固定的一组输入数据。这种差异使流处理系统比起批处理系统具有更低的延迟。
基于 UNIX 工具的批处理
最简便的批处理方案是基于 UNIX 工具实现的,例如我们需要在一个网站中找出访问最高频的五个网页:
|
|
cat
读取日志文件。awk
每行只获取第七个字段,恰好是 URL 的那个。sort
按照字段序排列 URL。uniq -c
计算出每一个 URL 重复的次数。sort -r -n
按照每一个 URL 出现的次数降序排序。head -n 5
取出现次数最高的五个 URL。
使用 awk
、sed
、grep
、sort
、uniq
和 xargs
的组合,可以在几分钟内完成许多数据分析,并且它们的性能也相当不错。
然而这些 Unix 工具有一个致命的缺陷,它们只能在一台机器上运行。为了解决这个问题,MapReduce 诞生了。
MapReduce
MapReduce 作业执行
MapReduce 是一个编程框架,你可以使用它编写代码来处理 HDFS 等分布式文件系统中的大型数据集。其数据处理流程如下:
- 读取一组输入文件,并将其分解成记录(records)。
- 调用
Mapper
函数,从每条输入记录中提取一对键值。 - 按键排序所有的键值对。
- 调用
Reducer
函数遍历排序后的键值对。
这四个步骤可以作为一个 MapReduce 作业执行。由于步骤 1(将文件分解成记录)由输入格式解析器处理,而步骤 3 中的排序步骤隐含在 MapReduce,因此我们去需要自己去实现 Mapper
和 Reducer
函数即可:
- Mapper:Mapper 会在每条输入记录上调用一次,其工作是从输入记录中提取键值。对于每个输入,它可以生成任意数量的键值对。它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的。
- Reducer:MapReduce 框架拉取由 Mapper 生成的键值对,收集属于同一个键的所有值,并在这组值上迭代调用 Reducer。 Reducer 可以产生输出记录。
分布式执行 MapReduce
MapReduce 与 Unix 命令管道的主要区别在于:MapReduce 可以在多台机器上并行执行计算,而无需编写代码来显式处理并行问题。
下图即为 Hadoop MapReduce 作业中的数据流,其并行化基于分区实现:作业的输入通常是 HDFS 中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独处理 map
任务。
只要有足够的空闲内存和 CPU 资源,MapReduce 调度器就会尝试在其中一台存储输入文件副本的机器上运行 Mapper 任务。这个原则被称为将计算放在数据附近:它节省了通过网络复制输入文件的开销,减少网络负载并增加局部性。
如上图,MapReduce 的完整执行流程如下:
- MapReduce 框架首先将代码复制到适当的机器。然后启动
Map
任务并开始读取输入文件,一次将一条记录传入Mapper
回调函数。 - 键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。因此每个
Map
任务都按照Reducer
对输出进行分区。每个分区都被写入Mapper
程序的本地磁盘 - 当
Mapper
读取完输入文件,并写完排序后的输出文件后,MapReduce 调度器就会通知Reducer
可以从该Mapper
中获取输出文件。 Reducer
连接到每个Mapper
,并下载自己相应分区的有序键值对文件。按Reducer
分区,排序,从Mapper
向Reducer
复制分区数据,这一整个过程被称为shuffle
。Reduce
任务从Mapper
获取文件,并将它们merge
在一起,并保留有序特性。因此,如果不同的Mapper
生成了键相同的记录,则在Reducer
的输入中,这些记录将会相邻。Reducer
调用时会收到一个键,和一个迭代器作为参数,迭代器会顺序地扫过所有具有该键的记录。Reducer
可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录会写入分布式文件系统上的文件中。
考虑到单个 MapReduce 作业可以解决的问题范围很有限,因此我们可以将 MapReduce 作业链接成一个工作流,即一个作业的输出成为下一个作业的输入。
只有当作业成功完成后,批处理作业的输出才会被视为有效的。因此,工作流中的一项作业只有在先前的作业成功完成后才能开始。
Reduce 侧 join 与分组
在许多数据集中,一条记录与另一条记录存在关联是很常见的,例如关系模型中的外键、文档模型中的文档引用、图模型中的边。当你需要同时访问这一关联的两侧时,就必须进行 join。
排序合并 join
在 Reducer 中执行实际的 join 逻辑,被称为 Reduce 侧 join 。
通常我们会采用排序合并 join,其原理如下:每个参与 join 的输入都会由一个提取 join 键的 Mapper 进行处理。通过分区、排序和合并,具有相同键的所有记录最终都会进入相同的 Reducer 调用,然后这个函数输出 join 好的记录。
分组
除了 join 之外,还有另一种方法能将相关数据放在一起,即按某个键对记录分组(如 SQL 中的 GROUP BY
)。
使用 MapReduce 实现这种分组操作的最简单方法是设置 Mapper,使它们生成的键值对使用所需的分组键。然后在分区和排序过程将所有具有相同分区键的记录导向同一个 Reducer。(因此在 MapReduce 之上实现分组和 join 看上去非常相似。)
数据倾斜
如果存在与单个键关联的大量数据,则将具有相同键的所有记录放到相同的位置这种模式就会产生问题:大量的数据放到一台机器上,从而导致负载不均衡。这种情况也被称为数据倾斜,而这种数据被称为热点数据(热键)。
为了处理这种情况,当 join 的输入存在热键的时候,可以采取一些补偿机制,例如下面几种方法:
- Pig 的解决方案:首先运行一个抽样作业来确定哪些键是热键。join 实际执行时,Mapper 会将热键的关联记录随机发送到几个 Reducer 之一。对于另外一侧的 join 输入,与热键相关的记录需要被复制所有处理该键的 Reducer 上。
- Hive 的解决方案:在表格元数据中显式指定热键,并将与这些键相关的记录单独存放,与其它文件分开。当在该表上执行连接时,对于热键,它会使用 Map 端 join。
Map 侧 join
Reduce 侧 join 的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper 都可以对其预处理以备连接。然而不利的一面是,排序,复制至 Reducer,以及合并 Reducer 输入,所有这些操作可能开销巨大。当数据通过 MapReduce 阶段时,数据可能需要落盘好几次(次数取决于可用的内存缓冲区)。
倘若我们能够对输入数据做一些假设,我们就可以使用 Map 侧 join 来加快我们的 join 速度。
广播哈希 join
适用于执行 Map 端连接的最简单场景是大数据集与小数据集连接的情况。
其要求小数据集需要足够小,不需要进行分区,以便可以将其完全加载进一个哈希表中。因此,你可以为连接输入大端的每个分区启动一个 Mapper,将输入小端的哈希表加载到每个 Mapper 中,然后扫描大端,一次一条记录,并为每条记录查询哈希表。
除了将较小的连接输入加载到内存哈希表中,另一种方法是将较小输入存储在本地磁盘上的只读索引中。索引中经常使用的部分将保留在操作系统的页面缓存中,因而这种方法可以提供与内存哈希表几乎一样快的随机查找性能。
分区哈希 join
如果两个连接输入以相同的方式分区(使用相同的键,相同的哈希函数和相同数量的分区),则可以独立地对每个分区应用哈希表方法。
Map 侧合并 join
如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,则可适用另一种 Map 侧连接的变体。
在这种情况下,输入是否小到能放入内存并不重要,因为这时候 Mapper 同样可以执行归并操作(通常由 Reducer 执行):按键递增的顺序依次读取两个输入文件,将具有相同键的记录配对。
Mapreduce 工作流与 Map 侧 join
当下游作业使用 MapReduce join 的输出时,选择 Map 侧 join 或 Reduce 侧 join 会影响输出的结构。Reduce 侧 join 的输出是按照 join 键进行分区和排序的,而 Map 端 join 的输出则按照与较大输入相同的方式进行分区和排序。
批处理的应用场景
批处理有以下几种常见的使用场景:
- 构建搜索引擎。
- 构建机器学习系统,例如分类器(比如垃圾邮件过滤器,异常检测,图像识别)与推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关的搜索)。
- ……
MapReduce 之后
物化中间状态
在很多情况下,一个作业的输出只能用作另一个作业的输入。在这种情况下,分布式文件系统上的文件只是简单的中间状态(intermediate state):一种将数据从一个作业传递到下一个作业的方式。将这个中间状态写入文件的过程称为物化(materialization)。
在之前的例子中,Unix 利用管道将一个命令的输出与另一个命令的输入连接起来。管道并没有完全物化中间状态,而是只使用一个小的内存缓冲区,将输出增量地流(stream) 向输入。与 Unix 管道相比,MapReduce 完全物化中间状态的方法存在以下不足之处:
- MapReduce 作业只有在前驱作业中的所有任务都完成时才能启动,而由 Unix 管道连接的进程会同时启动,输出一旦生成就会被消费。不同机器上的数据偏斜或负载不均意味着一个作业往往会有一些掉队的任务,比其他任务要慢得多才能完成,拖慢了整个工作流程的执行。
- Mapper 通常是多余的:它们仅仅是读取刚刚由 Reducer 写入的同样文件,为下一个阶段的分区和排序做准备。在许多情况下,Mapper 代码可能是前驱 Reducer 的一部分:如果 Reducer 和 Mapper 的输出有着相同的分区与排序方式,那么 Reducer 就可以直接串在一起,而不用与 Mapper 相互交织。
- 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,对这些临时数据来说显得有些浪费。
数据流引擎
为了解决 MapReduce 的这些问题,几种用于分布式批处理的新执行引擎被开发出来(如 Spark、Flink 等)。它们的设计方式有一个共同点:把整个工作流作为单个作业来处理,而不是把它分解为独立的子作业。
由于它们将工作流显式建模为数据从几个处理阶段穿过,所以这些系统被称为数据流引擎(dataflow engines)。像 MapReduce 一样,它们在一条线上通过反复调用用户定义的函数来一次处理一条记录,它们通过输入分区来并行化载荷,它们通过网络将一个函数的输出复制到另一个函数的输入。
与 MapReduce 不同,这些函数不需要严格扮演交织的 Map 与 Reduce 的角色,而是可以以更灵活的方式进行组合。我们称这些函数为算子(operators),数据流引擎提供了几种不同的选项来将一个算子的输出连接到另一个算子的输入:
- 对记录按键重新分区并排序,就像在 MapReduce 的 shuffle 阶段一样。
- 接受多个输入,并以相同的方式进行分区,但跳过排序。
- 将一个算子的输出,发送到连接算子的所有分区。
与 MapReduce 模型相比,它有几个优点:
- 排序等昂贵的工作只需要在实际需要的地方执行,而不是默认地在每个 Map 和 Reduce 阶段之间出现。
- 没有不必要的 Map 任务,因为 Mapper 所做的工作通常可以合并到前面的 Reduce 算子中。
- 由于工作流中的所有连接和数据依赖都是显式声明的,因此调度程序能够总览全局,知道哪里需要哪些数据,因而能够利用局部性进行优化。
- 算子间的中间状态足以保存在内存中或写入本地磁盘,这比写入 HDFS 需要更少的 I/O(必须将其复制到多台机器,并将每个副本写入磁盘)。
- 算子可以在输入就绪后立即开始执行;后续阶段无需等待前驱阶段整个完成后再开始。
- 与 MapReduce(为每个任务启动一个新的 JVM)相比,现有JVM 进程可以重用来运行新算子,从而减少启动开销。
容错
完全物化中间状态至 HDFS 的一个优点是,它具有持久性,这使得 MapReduce 中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。
Spark、Flink 和 Tez 避免将中间状态写入 HDFS,因此它们采取了不同的方法来容错:如果一台机器发生故障,并且该机器上的中间状态丢失,则它会从其他仍然可用的数据重新计算(在可行的情况下是先前的中间状态,要么就只能是原始输入数据,通常在 HDFS 上)。
为了实现重新计算,框架需要获取一个数据的计算信息(输入分区、算子等)。 Spark 使用**弹性分布式数据集(RDD,Resilient Distributed Dataset)**的抽象来跟踪数据的谱系,而 Flink 对算子状态存档,允许恢复运行在执行过程中遇到错误的算子。
图与迭代处理
像 Spark、Flink 和 Tez 这样的数据流引擎通常将算子作为**有向无环图(DAG)**的一部分安排在作业中。这与图处理不一样:在数据流引擎中,从一个算子到另一个算子的数据流被构造成一个图,而数据本身通常由关系型元组构成。在图处理中,数据本身具有图的形式。
许多图算法是通过一次遍历一条边来表示的,将一个顶点与近邻的顶点连接起来,以传播一些信息,并不断重复,直到满足一些条件为止(例如,直到没有更多的边要跟进,或直到一些指标收敛)。
可以在分布式文件系统中存储图(包含顶点和边的列表的文件),但是这种重复至完成的想法不能用普通的 MapReduce 来表示,因为它只扫过一趟数据。这种算法因此经常以迭代的风格实现:
- 外部调度程序运行批处理来计算算法的一个步骤。
- 当批处理过程完成时,调度器检查它是否完成(基于完成条件 —— 例如,没有更多的边要跟进,或者与上次迭代相比的变化低于某个阈值)。
- 如果尚未完成,则调度程序返回到步骤 1 并运行另一轮批处理。