最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:
java.lang.OutOfMemoryError: Java heap space
这种写法的代码一般如下:
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*")
//在驱动程序获取结果集
val datas=ArrayBuffer[String]()
//把所有数据,拉倒驱动端操作
rdd.collect.foreach(line=>{
datas += line.split('#')(1) //得到某个字段
})
sc.stop()
上面的这种写法,基本原理就是一次性把所有分区的数据,全部读取到driver节点上,然后开始做处理,所以数据量大的时候,经常会出现内存溢出情况。
(问题一)如何避免这种情况?
分而治之,每次只拉取一个分区的数据到驱动节点上,处理完之后,再处理下一个分数据的数据。
(问题二)如果单个分区的数据已经大到内存装不下怎么办?
给数据集增加更多的分区,让大分区变成多个小分区。
(问题三)如果结果集数据大于内存的大小怎么办?
要么增加驱动节点的内存,要么给每个分区的数据都持久化本地文件上,不再内存中维护
下面来看下关键问题,如何修改spark的rdd分区数量?
我们知道在spark里面RDD是数据源的抽象模型,RDD里面实际上是把一份大数据源切分成了多个分区数据,然后来并行处理这份大数据集。
默认情况下如果Spark从HDFS上加载数据,默认分区个数是按照HDFS的block size来切分的,当然我们在加载的时候可以指定的分区个数。
textFile(path,partitionNums)//第二个参数可以指定分区个数
如果在加载时不指定分区个数,spark里面还提供了两个函数来进行重分区:
(1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T]
(2)def repartition(numPartitions: Int):RDD[T]
接着我们来看下coalesce函数和repartition函数的区别:
通过查看源码得知repartition函数内部实际上是调用了coalesce函数第二个参数等于true时的封装。所以我们重点来关注下coalesce函数即可:
coalesce的第一个参数是修改后的分区个数
coalesce的第二个参数是控制是否需要shuffle
举一个例子:
当前我们RDD的分区个数是100:
(1)如果要变成10,应该使用
rdd.coalesce(10,false)
(2)如果要变成300,应该使用
rdd.coalesce(300,true)
(3)如果要变成1,应该使用
rdd.coalesce(1,true)
这里解释一下:
分区数从多变少,一般是不需要开启shuffle的,这样性能最高,因为不需要跨网络混洗数据,当然你也可以开启shuffle在特定场景下,如分区数据极其不均衡。但建议一般不要使用。
分区数从少变多,必须开启shuffle,如果不开启那么分区数据是不会改变的,由少变多必须得重新混洗数据才能变多,这里需要注意一点,如果数据量特别少,那么会有一些分区的数据是空。
最后的例子是一种极端场景,如果从多变成1,不开启shuffle,那么可能就个别节点计算压力特别大,集群资源不能充分利用,所以有必要开启shuffle,加速合并计算的流程。
明白了如何改变rdd的分区个数之后,我们就可以文章开头遇到的问题结合起来,拉取大量数据到驱动节点上,如果整体数据集太大,我们就可以增加分区个数,循环拉取,但这里面需要根据具体的场景来设置分区个数,因为分区个数越多,在spark里面生成的task数目就越多,task数目太多也会影响实际的拉取效率,在本案例中,从hdfs上读取的数据默认是144个分区,大约1G多点数据,没有修改分区个数的情况下处理时间大约10分钟,在调整分区个数为10的情况下,拉取时间大约在1-2分钟之间,所以要根据实际情况进行调整。
文章开始前的代码优化后的如下:
def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
if(seq==idx) ds else Iterator()
}
------------------------------
//加载HDFS数据
val rdd=sc.textFile("/data/logs/*")
//在驱动程序获取结果集
val datas=ArrayBuffer[String]()
//重分区并合理优化分区个数
val new_rdd=rdd.coalesce(10)
//得到所有的分区信息
val parts= new_rdd.partitions
//循环处理每个分区的数据,避免导致OOM
for(p<-parts){
//获取分区号
val idx=p.index
//第二个参数为true,避免重新shuffle,保留原始分区数据
val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
//读取结果数据
val data=parRdd.collect()
//循环处理数据
for(line<-data){
datas += line.split('#')(1) //得到某个字段
}
}
最后在看下,spark任务的提交命令:
spark-submit --class SparkHdfsDataAnalysis
--conf spark.driver.maxResultSize=1g
--master yarn
--executor-cores 5
--driver-memory 2g
--executor-memory 3g
--num-executors 10
--jars $jars spark-analysis.jar $1 $2
这里面主要关注参数:
spark.driver.maxResultSize=1g
driver-memory 2g
单次拉取数据结果集的最大字节数,以及驱动节点的内存,如果在进行大结果集下拉时,需要特别注意下这两个参数的设置。
参考文档:
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD