7 RDD常用算子(2)

sw
filter()
deffilter(f:T=Boolean):RDD[T]

函数说明

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

objectRDD_Transform_filter{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子filtervalrdd=(List(1,2,3,4))valfilterRDD=(num=num%2!=0)(println)//Step3:关闭环境()}}

输出结果:

13

应用场景,日志过滤。

objectRDD_Transform_filter_LogFilter{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子groupByvalrdd=("datas/")vartimeRDD=(line={valdata=("")valtime=data(3)("17/May/2023")}).collect().foreach(println)//Step3:关闭环境()}}

日志内容:

输出结果:

220.181.108.112--17/May/2023:02:04:48+0800"GET/HTTP/1.1"2009603/May/2023:01:04:58+0800"GET/HTTP/1.1"2009603/May/2023:12:04:58+0800"GET/HTTP/1.1"2009603
sample()

函数签名

defsample(withReplacement:Boolean,fraction:Double,seed:Long=):RDD[T])

参数说明:

抽取数据不放回(伯努利算法)伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。

具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不

withReplacement:boolean,抽取的数据不放回,false;

fraction:Double,抽取的几率,范围在[0,1]之间,0:全不取;1:全取;

seed:Long,随机数种子。

抽取数据放回(泊松算法)

withReplacement:boolean,抽取的数据是否放回,true:放回;false:不放回;

fraction:Double,重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数,但实际的抽取次数可能远大于期望值;

seed:Long,随机数种子。

defsample(withReplacement:Boolean,fraction:Double,seed:Long=):RDD[T])
defsample(withReplacement:Boolean,fraction:Double,seed:Long=):RDD[T]={require(fraction=0,s"Fractionmustbenonnegative,butgot${fraction}")withScope{require(fraction=0.0,"Negativefractionvalue:"+fraction)if(withReplacement){newPartitionwiseSampledRDD[T,T](this,newPoissonSampler[T](fraction),true,seed)}else{newPartitionwiseSampledRDD[T,T](this,newBernoulliSampler[T](fraction),true,seed)}}}

函数说明

根据指定的规则从数据集中抽取数据。

objectRDD_Transform_sample{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子filtervalrdd=(List(1,2,3,4,5,6,7,8,9,10))println((withReplacement=false,0.4,1).collect().mkString(","))//Step3:关闭环境()}}

输出结果:

2,5,6,8

sample()算子源码如下,

当withReplacement为false时,会使用BernoulliSampler()贝努力(伯努利)算法选取样本,贝努力算法相当于抛硬币,将符合人头或字的数字作为样本返回。

当withReplacement为true时,会使用PoissonSampler()离散概率分布算法生成样本。

应用场景:在产生数据倾斜时使用。在shuffle的情况下,数据可能出现倾斜,通过sample()算子获取一定数量的数据样本观察其中的某些key值是否重复的比例过高,如果反复取sample对应的key都有重复值较多的情况,就说明该key可能引起数据倾斜,需要加以处理。

distinct()

函数签名

defdistinct()(implicitord:Ordering[T]=null):RDD[T]defdistinct(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]

函数说明

将数据集中数据去重。

objectRDD_Transform_distinct{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子distinctvalrdd=(List(1,2,3,4,1,2,3,4))valdistinctRDD=()(println)//Step3:关闭环境()}}

输出结果:

1234

Scala集合的distinct方法,其中通过HashSet的方法识别重复值。

defaultReprdistinct(){booleanisImmutable=;if((1)=0){();}else{Builderb=();HashSetseen=newHashSet();Iteratorit=();booleandifferent=false;}

RDD的去重方式,主要看partitioner部分,由于partitioner默认为null,因此主要逻辑体现在下述代码行:

case_=map(x=(x,null)).reduceByKey((x,_)=x,numPartitions).map(_._1)

以数组List(1,2,3,4,1,2,3,4)为例:

map(x=(x,null)):将数组变为:(1,null),(2,null),(3,null),(4,null),(1,null),(2,null),(3,null),(4,null)的tuple数组;

reduceByKey((x,_)=x,numPartitions):对tuple数组做聚合(1,null),(1,null),聚合后,key不变,(null,null),取第一个值,即null,所以聚合后的结果:(1,null),(2,null),(3,null),(4,null);

map(_._1):只保留tuple中的第一个元素,(1,null),(2,null),(3,null),(4,null)变回1,2,3,4。

distinct()算子的完整源码如下:

defdistinct(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{defremoveDuplicatesInPartition(partition:Iterator[T]):Iterator[T]={//=newExternalAppOnlyMap[T,Null,Null](createCombiner=_=null,mergeValue=(a,b)=a,mergeCombiners=(a,b)=a)((_-null))(_._1)}partitionermatch{caseSome(_)ifnumPartitions===mapPartitions(removeDuplicatesInPartition,preservesPartitioning=true)case_=map(x=(x,null)).reduceByKey((x,_)=x,numPartitions).map(_._1)}}
coalesce()

函数签名

defcoalesce(numPartitions:Int,shuffle:Boolean=false,partitionCoalescer:Option[PartitionCoalescer]=)(implicitord:Ordering[T]=null):RDD[T]

函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本。

objectRDD_Transform_coalesce{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子distinctvalrdd=(List(1,2,3,4),numSlices=4)valcoalesceRDD=(2)("output")//Step3:关闭环境()}}

生成的分区文件:

[1,2][3,4]

如果原始数据与分区数据调整成下述的方式,数组有6个元素,变化前3个分区,变化后2个分区。

objectRDD_Transform_coalesce{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子coalescevalrdd=(List(1,2,3,4,5,6),numSlices=3)valcoalesceRDD=(2)("output")//Step3:关闭环境()}}

默认情况下,coalesce()算子不打乱原始分区,如下图所示,coalesce()采用的是整个分区合并的方式。

可以通过指定shuffle参数的方式,显式让shuffle执行。

valcoalesceRDD=(2,shuffle=true)

指定shuffle参数后,分区随机进行打散:

[1,2][3,4][5,6]=[1,4,5][2,3,6]

如果需要扩展分区,可以使用coalesce()进行分区扩展,如果不指定shuffle=true,将无效,只有指定shuffle=true才能实现分区扩大。

repartition()

为让coalesce()功能更明确,只作为分区缩减,专门增加repartition()算子,进行分区扩展。

函数签名

defrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]

函数说明

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

查看repartion()源代码,发现底层就是调用coalesce()算子,并默认指定shuffle=ture。

defrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{coalesce(numPartitions,shuffle=true)}
sortBy()

根据指定规则对数据进行排序。

函数签名

defsortBy[K](f:(T)=K,ascing:Boolean=true,numPartitions:Int=)(implicitord:Ordering[K],ctag:ClassTag[K]):RDD[T]

函数说明

该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程。

objectRDD_Transform_sortBy{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子sortByvalrdd=(List(6,2,4,5,3,1),numSlices=2)valsortRDD=(num=num)().foreach(println)//Step3:关闭环境()}}

数据的顺序:[6,2,4][5,3,1]=[1,2,3][4,5,6]

从结果知:sortBy()算子要进行shuffle。

应用场景,对tuple进行排序。

objectRDD_Transform_sortBy_tuple{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子sortByvalrdd=(List(("1",1),("11",2),("2",3)),numSlices=2)valsortRDD=(t=t._1)().foreach(println)//Step3:关闭环境()}}

默认会按照字典序进行排序:

(1,1)(11,2)(2,3)

若不想按字典序排列,可以进行数据类型转换:

valsortRDD=(t=t._1.toInt)

输出结果:

(1,1)(2,3)(11,2)

sortBy()默认为升序,第二个方式可以改变排序方式:

valsortRDD=(t=t._1,ascing=false)

输出结果,按字典序降序:

(2,3)(11,2)(1,1)
文章版权声明:除非注明,否则均为机床资讯库原创文章,转载或复制请以超链接形式并注明出处。

上一个 二手机床市场淘了台安阳鑫盛6163数控车床回来大修,越修越生气

下一个 特靠谱、有干劲、有才华……这些词形容咱朝阳群众,很合适!