Map 和 Reduce 操作是如何完成的
简介
这篇文章是描述Hadoop中的 Map和Reduce操作是如何完成的。如果你不熟悉的,建议先了解Google的MapReduce编程模型。
Map
出于Map操作是并行性,输入的文件集将被分割成多块,这里叫 Filesplit 。如果某个文件过大而影响查找速度,那么它也会被分割成多块。分割操作是不会关心文件的内部逻辑结构的,例如面向文本的的分割是以任意字节数作为分割边界。对应每一个文件分割块,生成一个map任务。
当一个独立的map任务启动,它将对每个已配置的reduce任务打开一个相应的 output writer 。然后,从指定的 InputFormat 中获得 RecordReader 来读取它所有的 FileSplit 。InputFormat 分析输入,生成键值对(key-value pairs)。有些记录在FileSplit的边界处分割,InputFormat 必须能处理这些特殊的记录。例如,使用 TextInputFormat 读取到 FileSplit 的最后一行时会越过 FileSplit边界,同时,如果不是第一个FileSplit ,TextInputFormat 会忽略到新的第一行的内容。
没有必要一定让 InputFormat 同时生成有意义的 keys 和 valus 。例如 TextInputFormat 缺省的输出是由 文本行 为值 ,行开始的文件偏移量为键-大多数的应用仅使用文本行,而忽略掉偏移值。
键值对从 Recordreader 中读取,然后被传递到 已配置好的 Mapper。用户使用键值对在自己提供的Mapper中做要做的事情,并调用 OutputCollector.collect。生成的输出必须使用一个key 类和一个 value 类。这是因为 Map 输出 将会被写到一个 SequenceFile(所有的记录必须是同一类型,如果你要保存不同的数据结构,可以用类继承)中。Map的输入和输出的键值对可以是没有具体的依据或者相关性的。
Mapper 输出同时也被分区,这表示它被写入到 Partitioner 指定的 output 。缺省的 HashPartitioner 对 key 的类 使用 hashcode 方法。这样 reduce 任务能达到一个平均的负载。
N 输入文件将生成 M map任务来运行,每个 map 任务生出的 output的数量等于系统已经配置好的 reduce 任务的数量。每一个输出文件将被面向一个指定好的reduce任务,并且map输出的同一键的键值对将被分配到同一个reduce 任务。
Combine
当map操作输出时,它的键值对保存在内存空间里。出于效率的理由,有时需要提供一个 combiner 类 来执行一个 reduce-type 方法。 如果 combiner 被用上了,map 输出的 key-value pairs 不是马上就被写到 output 。而是改为先收集到到列表中, 每一个键 对应 一个列表。 当一定数量的 key-value pairs 被写入后,这个缓存区被清空,其中每一个键对应的所有值都被传送到 combiner 的 reduce 方法,然后 combiner 再输出 key-valus pairs (如 原始的 map 操作输出一样)。
Reduce
当一个 reduce 任务启动, 它的输入 是分散在map任务运行的各个节点上的多个文件中。如果在分布模式下运行,这需要先拷贝到本地文件系统(copy 过程)。
一旦本地的所有数据有效,(append 过程) 将执行,所有的数据被追加到一个文件中。 这个文件是排序后合并的(sort 过程),保证同一个键的所有记录是连续的。这使得 reduce 操作非常简单:文件被连续的读取,而同一个键的所有值将以迭代器的形式传递到 reduce 方法,直到碰到下一个key。
最后,每一个执行的 reduce 任务将生成一个 output 文件。文件的格式 由 JobConf.setOutputFormat 这个方法指定。 如果使用 SequentialOutputFormat ,key类和value 类也必须指定。
ps: 产品在进行第二轮内测和意见收集,我把思维搞的发散一点,想点其他的事情。
