2 Reduce side tuning参数

2.1 ReduceTask运行内部原理

reduce的运行是分成三个阶段的。分别为copy->sort->reduce。由于job的每一个map都会根据 reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时 间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数 据。这个过程就是通常所说的shuffle,也就是copy过程。

Reduce task在做shuffle时,实际上就是从不同的已经完成的map上去下载属于自己这个reduce的部分数据,由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,这个并行度是可以调整的,调整参数为:mapred.reduce.parallel.copies(default 5)。默认情况下,每个只会有5个并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,那么reduce也最多只能 同时下载5个map的数据,所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。

reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,或者中间结果的文件丢失,或者网络瞬断 等等情况,这样reduce的下载就有可能失败,所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这 次下载,并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。所以reduce下载线程的这个最大的下载时间段是可以调整的,调整参数为:mapred.reduce.copy.backoff(default 300秒)。如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。不过在网络环境比较好的情况下,没有必要调整。通常来说专业的集群网络不应该有太大问题,所以这个参数需要调整的情况不多。

Reduce将map结果下载到本地时,同样也是需要进行merge的,所以io.sort.factor的配置选项同样会影响reduce进行 merge时的行为,该参数的详细介绍上文已经提到,当发现reduce在shuffle阶段iowait非常的高的时候,就有可能通过调大这个参数来加 大一次merge时的并发吞吐,优化reduce效率。

Reduce在shuffle阶段对下载来的map数据,并不是立刻就写入磁盘的,而是会先缓存在内存中,然后当使用内存达到一定量的时候才刷入磁盘。这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数来设置:mapred.job.shuffle.input.buffer.percent(default 0.7),这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。也就是说,如果该reduce task的最大heap使用量(通常通过mapred.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。 默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。如果reduce的heap由于业务原因调整的比较大,相应的缓存大小也 会变大,这也是为什么reduce用来做缓存的参数是一个百分比,而不是一个固定的值了。

假设mapred.job.shuffle.input.buffer.percent为0.7,reduce task的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右,这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷 的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷。这个限度阈值也是可以通过job参数来设定的,设定参数为:mapred.job.shuffle.merge.percent(default 0.66)。如果下载速度很快,很容易就把内存缓存撑大,那么调整一下这个参数有可能会对reduce的性能有所帮助。

当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段(中间有个sort阶段通常时间 非常短,几秒钟就完成了,因为整个下载阶段就已经是边下载边sort,然后边merge的)。当reduce task真正进入reduce函数的计算阶段的时候,有一个参数也是可以调整reduce的计算行为。也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,同样是需要内存作为buffer,这个参数是控制,需要多 少的内存百分比来作为reduce读已经sort好的数据的buffer百分比。默认情况下为0,也就是说,默认情况下,reduce是全部从磁盘开始读 处理数据。如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,当reduce计算逻辑消耗内存很小时,可以分一部分内存用来 缓存数据,反正reduce的内存闲着也是闲着。

2.2 Reduce side相关参数调优

选项类型默认值描述
mapred.reduce.parallel.copiesint5每个reduce并行下载map结果的最大线程数
mapred.reduce.copy.backoffint300reduce下载线程最大等待时间(in sec)
io.sort.factorint10同上
mapred.job.shuffle.input.buffer.percentfloat0.7用来缓存shuffle数据的reduce task heap百分比
mapred.job.shuffle.merge.percentfloat0.66缓存的内存中多少百分比后开始做merge操作
mapred.job.reduce.input.buffer.percentfloat0.0sort完成后reduce计算阶段用来缓存数据的百分比