侧边栏壁纸
  • 累计撰写 6 篇文章
  • 累计创建 6 个标签
  • 累计收到 0 条评论
标签搜索

sparksql小文件合并

jackknife007
2022-04-28 / 0 评论 / 0 点赞 / 227 阅读 / 1,792 字
温馨提示:
本文最后更新于 2022-04-28,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

0. 前言

过多小文件的存在会影响计算性能,同时会增加HDFS的元数据管理的开销,严重时会达到瓶颈导致服务不可用。所以需要在文件写入时进行小文件合并,或者配置定期合并小文件的任务。

下面分析下在处理小文件问题时参考的方法和踩过的坑。

1. 小文件产生原因

  1. 上游数据的源文件很小,或者分布不均匀。比如上游是实时任务写入的数据,或者在不同时间的数据量差异较大。

  2. 当前sql存在where过滤条件,过滤后当前的分区数据量很小,导致落盘后生成小文件。

  3. shuffle阶段产生数据倾斜或分组过多,也会导致小文件的产生。

2. 查询小文件情况

可以使用hdfs命令

查询某个分区文件数量情况:

hdfs dfs -count -h hdfs_path

查询某个分区文件详情:

hdfs dfs -du -h hdfs_path

3. 处理小文件的方式

sparksql提供了hint,可以在sql中写入hint,强制改变生成文件的数量

3.1 使用coalesce(n)

insert overwrite table  xxx partition(date='xxx')
select /*+ COALESCE(10) */   col1, col2  from table 

则最多会产生10个文件。

使用coalesce(n) hint后,数据不会产生shuffle。会起n个任务,将全部分区的数据分配给这n个任务处理,处理完后将每个任务的数据落盘。

这就会产生一个问题,如果sql中有过滤条件,单个task过滤后的数据量很小,还是会有小文件的问题。

coalesce可以说是合并文件,但不一定是将小文件合并成一个大文件,也可能多个很小的文件合并后还是小文件。

3.2 使用repartition(n)

insert overwrite table  xxx partition(date='xxx')
select /*+ REPARTITION(10) */   col1, col2  from table 

repartition(n)本质上就是shuffle为true时的coalesce。

但是在测试repartition时遇到一个问题。源数据大小是40G,但是经过repartition后的所有分区数据大小总和居然达到了600G!。虽然文件大小分布均匀,文件数量得到了控制,但是空间存储翻了10倍!这样不稳定在生产上也是无法使用的。

查询相关资料后,果然是压缩的问题导致的。因为文件一般都是使用orc或parquet格式存储,源分区其实是通过HashPartition的方式分布的,这样的数据分布可以让编码压缩得更加极致。

但是而repartition完全打乱后导致本来在一个文件的相同记录分布到n个文件,那就是每个文件都有该记录的编码索引,那么最终文件就变大了。

也可以参考这篇stackverflow这篇文章

3.3 使用AQE配合REPARTITION_BY_RANGE

先使用REPARTITION_BY_RANGE强制根据某个分散度较高的字段进行重分区,产生shuffle

insert overwrite table  xxx partition(date='xxx')
select /*+ REPARTITION_BY_RANGE(date, col1) */   col1, col2  from table 

并在参数中配置

--conf spark.sql.adaptive.enabled=true  \
--conf spark.sql.adaptive.forceApply=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=256MB \  
--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=100 \

开启aqe后,会在运行时动态优化执行计划,在shuffle阶段后,会自动处理数据倾斜、合并分区等。

各项参数含义可以参考这里

3.4 采用方案

由于coalesce的文件大小差距过大和repartition的文件存储空间翻倍的问题,最终采用了AQE配合REPARTITION_BY_RANGE的方案来解决小文件问题。

4. 在hive中处理小文件

可以参考这篇文章,还是离不开配置hive参数和使用distribute by

0

评论区