deffilter(f:T=Boolean):RDD[T]
函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
objectRDD_Transform_filter{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子filtervalrdd=(List(1,2,3,4))valfilterRDD=(num=num%2!=0)(println)//Step3:关闭环境()}}输出结果:
13
应用场景,日志过滤。
objectRDD_Transform_filter_LogFilter{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子groupByvalrdd=("datas/")vartimeRDD=(line={valdata=("")valtime=data(3)("17/May/2023")}).collect().foreach(println)//Step3:关闭环境()}}日志内容:
输出结果:
220.181.108.112--17/May/2023:02:04:48+0800"GET/HTTP/1.1"2009603/May/2023:01:04:58+0800"GET/HTTP/1.1"2009603/May/2023:12:04:58+0800"GET/HTTP/1.1"2009603sample()
函数签名
defsample(withReplacement:Boolean,fraction:Double,seed:Long=):RDD[T])
参数说明:
抽取数据不放回(伯努利算法)伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
withReplacement:boolean,抽取的数据不放回,false;
fraction:Double,抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
seed:Long,随机数种子。
抽取数据放回(泊松算法)
withReplacement:boolean,抽取的数据是否放回,true:放回;false:不放回;
fraction:Double,重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数,但实际的抽取次数可能远大于期望值;
seed:Long,随机数种子。
defsample(withReplacement:Boolean,fraction:Double,seed:Long=):RDD[T])
defsample(withReplacement:Boolean,fraction:Double,seed:Long=):RDD[T]={require(fraction=0,s"Fractionmustbenonnegative,butgot${fraction}")withScope{require(fraction=0.0,"Negativefractionvalue:"+fraction)if(withReplacement){newPartitionwiseSampledRDD[T,T](this,newPoissonSampler[T](fraction),true,seed)}else{newPartitionwiseSampledRDD[T,T](this,newBernoulliSampler[T](fraction),true,seed)}}}函数说明
根据指定的规则从数据集中抽取数据。
objectRDD_Transform_sample{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子filtervalrdd=(List(1,2,3,4,5,6,7,8,9,10))println((withReplacement=false,0.4,1).collect().mkString(","))//Step3:关闭环境()}}输出结果:
2,5,6,8
sample()算子源码如下,
当withReplacement为false时,会使用BernoulliSampler()贝努力(伯努利)算法选取样本,贝努力算法相当于抛硬币,将符合人头或字的数字作为样本返回。
当withReplacement为true时,会使用PoissonSampler()离散概率分布算法生成样本。
应用场景:在产生数据倾斜时使用。在shuffle的情况下,数据可能出现倾斜,通过sample()算子获取一定数量的数据样本观察其中的某些key值是否重复的比例过高,如果反复取sample对应的key都有重复值较多的情况,就说明该key可能引起数据倾斜,需要加以处理。
distinct()函数签名
defdistinct()(implicitord:Ordering[T]=null):RDD[T]defdistinct(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]
函数说明
将数据集中数据去重。
objectRDD_Transform_distinct{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子distinctvalrdd=(List(1,2,3,4,1,2,3,4))valdistinctRDD=()(println)//Step3:关闭环境()}}输出结果:
1234
Scala集合的distinct方法,其中通过HashSet的方法识别重复值。
defaultReprdistinct(){booleanisImmutable=;if((1)=0){();}else{Builderb=();HashSetseen=newHashSet();Iteratorit=();booleandifferent=false;}RDD的去重方式,主要看partitioner部分,由于partitioner默认为null,因此主要逻辑体现在下述代码行:
case_=map(x=(x,null)).reduceByKey((x,_)=x,numPartitions).map(_._1)
以数组List(1,2,3,4,1,2,3,4)为例:
map(x=(x,null)):将数组变为:(1,null),(2,null),(3,null),(4,null),(1,null),(2,null),(3,null),(4,null)的tuple数组;
reduceByKey((x,_)=x,numPartitions):对tuple数组做聚合(1,null),(1,null),聚合后,key不变,(null,null),取第一个值,即null,所以聚合后的结果:(1,null),(2,null),(3,null),(4,null);
map(_._1):只保留tuple中的第一个元素,(1,null),(2,null),(3,null),(4,null)变回1,2,3,4。
distinct()算子的完整源码如下:
defdistinct(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{defremoveDuplicatesInPartition(partition:Iterator[T]):Iterator[T]={//=newExternalAppOnlyMap[T,Null,Null](createCombiner=_=null,mergeValue=(a,b)=a,mergeCombiners=(a,b)=a)((_-null))(_._1)}partitionermatch{caseSome(_)ifnumPartitions===mapPartitions(removeDuplicatesInPartition,preservesPartitioning=true)case_=map(x=(x,null)).reduceByKey((x,_)=x,numPartitions).map(_._1)}}coalesce()函数签名
defcoalesce(numPartitions:Int,shuffle:Boolean=false,partitionCoalescer:Option[PartitionCoalescer]=)(implicitord:Ordering[T]=null):RDD[T]
函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本。
objectRDD_Transform_coalesce{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子distinctvalrdd=(List(1,2,3,4),numSlices=4)valcoalesceRDD=(2)("output")//Step3:关闭环境()}}生成的分区文件:
[1,2][3,4]
如果原始数据与分区数据调整成下述的方式,数组有6个元素,变化前3个分区,变化后2个分区。
objectRDD_Transform_coalesce{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子coalescevalrdd=(List(1,2,3,4,5,6),numSlices=3)valcoalesceRDD=(2)("output")//Step3:关闭环境()}}默认情况下,coalesce()算子不打乱原始分区,如下图所示,coalesce()采用的是整个分区合并的方式。
可以通过指定shuffle参数的方式,显式让shuffle执行。
valcoalesceRDD=(2,shuffle=true)
指定shuffle参数后,分区随机进行打散:
[1,2][3,4][5,6]=[1,4,5][2,3,6]
如果需要扩展分区,可以使用coalesce()进行分区扩展,如果不指定shuffle=true,将无效,只有指定shuffle=true才能实现分区扩大。
repartition()为让coalesce()功能更明确,只作为分区缩减,专门增加repartition()算子,进行分区扩展。
函数签名
defrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]
函数说明
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
查看repartion()源代码,发现底层就是调用coalesce()算子,并默认指定shuffle=ture。
defrepartition(numPartitions:Int)(implicitord:Ordering[T]=null):RDD[T]=withScope{coalesce(numPartitions,shuffle=true)}sortBy()根据指定规则对数据进行排序。
函数签名
defsortBy[K](f:(T)=K,ascing:Boolean=true,numPartitions:Int=)(implicitord:Ordering[K],ctag:ClassTag[K]):RDD[T]
函数说明
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程。
objectRDD_Transform_sortBy{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子sortByvalrdd=(List(6,2,4,5,3,1),numSlices=2)valsortRDD=(num=num)().foreach(println)//Step3:关闭环境()}}数据的顺序:[6,2,4][5,3,1]=[1,2,3][4,5,6]
从结果知:sortBy()算子要进行shuffle。
应用场景,对tuple进行排序。
objectRDD_Transform_sortBy_tuple{defmain(args:Array[String]):Unit={//Step1:准备环境valsparkConf=newSparkConf().setMaster("local[*]").setAppName("Operator")valsc=newSparkContext(sparkConf)//Step2:算子sortByvalrdd=(List(("1",1),("11",2),("2",3)),numSlices=2)valsortRDD=(t=t._1)().foreach(println)//Step3:关闭环境()}}默认会按照字典序进行排序:
(1,1)(11,2)(2,3)
若不想按字典序排列,可以进行数据类型转换:
valsortRDD=(t=t._1.toInt)
输出结果:
(1,1)(2,3)(11,2)
sortBy()默认为升序,第二个方式可以改变排序方式:
valsortRDD=(t=t._1,ascing=false)
输出结果,按字典序降序:
(2,3)(11,2)(1,1)