不久前,我正在运行一个SparkETL,该ETL从AWSS3提取数据进行了一些转换和清理,并将转换后的数据以Parquet格式写回到AWSS3。JSONGzip格式的数据量约为350GB。我的工作节点总共有48个核心,内存为280GB。
运行该作业后,我注意到该作业持续进行了超过24小时,并且之间有多次失败。该错误消息曾经是—执行器在120000ms之后超时。这项工作正在处理大量数据,并且数据写入阶段停留在某个地方。
看到工作日志后,我看到工作人员被卡在乱堆的数据中,大量数据被溢出到磁盘上。
2018–06–2712:21:42,671INFO[UnsafeExternalSorter]—(0timesofar)
寻找根本原因我取消了工作,但在决定后决定检查正在生成的查询计划。
()
这是为这项工作制定的计划。
那里!!我终于看到光了。
问题似乎是正在发生的联接Join。我的原始数据与从Redshift中拉出的一个小表连接在一起,看起来好像都是在对数据进行改组(Exchange哈希分区),最后发生了SortMergeJoin,这非常昂贵,因为排序后的数据在内存中,从而很快填充了内存。磁盘溢出。因此前进的道路很明确。我必须消除导致磁盘溢出的混洗。
解决方案在阅读了有关联接如何工作以及Catalyst优化器如何优化查询的一些知识之后,我意识到redshift表非常小。我决定在加入时使用广播提示。这是使用广播提示后的查询计划-
看哪哈希交换完成后,用一个BroadcastExchange替换,如果表的大小很小,则该替换并不重要。最终,该连接以BroadcastHashJoin的形式进行。这比SortMergeJoin轻量得多。
最终,在将内核数增加到200之后,我运行了该作业,并看到该作业在2.3小时内完成,磁盘溢出最少,而以前则需要24个小时以上。当考虑到增加的资源时,将近300%的改善。
尤里卡!甜美成功的气味令人陶醉。:D
(本文翻译自ShitijGoyal的文章《ImprovingSparkjobperformancewhilewritingParquetby300%》,参考:)