定义现象

绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败,
这样的现象为数据倾斜现象。
任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长(木桶原理)。

Hive

单表数据

  • 对于某些不需要计算的数据可以优先过滤

  • 当任务重存在 group by 的聚合操作时,开启参数设置

是否在 Map 端进行聚合,默认为 True
set hive.map.aggr = true;
在 Map 端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
有数据倾斜的时候进行负载均衡(默认是 false)
set hive.groupby.skewindata = true;

  • 增加 reduce 数据

每个 Reduce 处理的数据量默认是 256MB
set hive.exec.reducers.bytes.per.reducer = 256000000
每个任务最大的 reduce 数,默认为 1009
set hive.exec.reducers.max = 1009
计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)(参数 2 指的是上面的 1009,参数 1 值得是 256M)

设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;

多表join数据

  • 使用参数

join 的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
如果是 join 过程出现倾斜应该设置为 true
set hive.optimize.skewjoin=false;

如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认 100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。通过hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认 10000。

set hive.skewjoin.mapjoin.map.tasks=10000;

  • MapJoin

设置自动选择 MapJoin
set hive.auto.convert.join=true; #默认为 true
大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize=25000000;

MapJoin 是将 Join 双方比较小的表直接分发到各个 Map 进程的内存中,在 Map 进程中进行 Join 操作,这样就不用进行 Reduce 步骤,从而提高了速度。

Spark

数据倾斜一般是发生在 shuffle 类的算子,比如 distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup 等,涉及到数据重分区,如果其中某一个 key 数量特别大,就发生了数据倾斜。

单表数据

  • 可以在 shuffle 之前的 map 端做预聚合操作。即在 shuffle 前将同一分区内所属同 key 的记录先进行一个预结算,再将结果进行 shuffle,发送到 reduce 端做一个汇总。
    比如 : reduceByKey 替代 groupByKey、两阶段聚合(加盐局部聚合+去盐全局聚合)等。

多表Join 数据

  • 大小表的Join,使用 广播机制。小表足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中。
  • 拆分大 key 打散大表,扩容小表。
  1. 将数据倾斜的 key 和没有数据倾斜的 key 分别为两个数据集;
  2. 对数据倾斜的 key 在其加上随机前缀,然后与另一个表的相同 key 也做随机前缀后 join。
  3. 对正常的数据做join,对倾斜的数据做join,然后 union。
方案简介
HiveETL预处理Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行 join,然后再 Spark作业中针对的数据就不是原来的表,而是预处理后的表,在Spark时就不需要进行原先的shuffle操作了
过滤少数导致数据倾斜的key如果少数几个数据量特别多的 key 对作业的执行和计算结果不重要,那么直接结果掉它们
提高shuffle 并行度提高shuffle类算子并行度
两阶段聚合先随机加前缀预聚合,第二次去掉前缀再聚合
将reduce join 转成 map join小表广播到大表所在 executor进行 mapjoin
采样倾斜key并分拆join对少数key的数据加上随机前缀,另一个表也膨胀为加上随机前缀,然后进行 join,再与正常join数据进行 union
使用随机前缀和扩容RDD进行join将有大量数据倾斜的 key,每条都打上一个随机前缀,将另一个RDD彭场,然后两个RDD进行join

Flink

  • 使用 LocalKeyBy
    通过在 KeyBy 之前积攒一定数量的数据,然后进行聚合,减少下游的数据量,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈
  • shuffle 之前发送数据倾斜
    由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些partition 的数据量较少。需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。
  • keyBy 后的窗口聚合操作存在数据倾斜
    因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
    ➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
    注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二
    阶段分组依据,避免不同窗口的结果聚合到一起)
    ➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合

总结

综上所述,对数据倾斜的问题,首先要判断该 key 是否会对结果产生影响,对其进行过滤或者打上随机 key。然后还可以通过随机前缀的两阶段处理 和 增加 reduce, map,减少 shuffle,重分区(Flink)等。

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐