Spark算子--Scala版本
/拆分重新组合(例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5))(key,(v2.5,v3.0)))//按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0))//过滤版本号重复的(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))//按key分组,key是除城市字段和版本号字段以外
转换算子之map和distinct算子
任务描述
本关任务:输出每个元素及其长度并去重。
相关知识
为了完成本关任务,你需要掌握map
算子以及distinct
算子的的用法。
map 算子
map(func)
:将函数应用于RDD
中的每个元素,将返回值构成新的RDD
,示例如下:
val list=List(1,2,3)
val rdd=sc.parallelize(list)
val rdd1=rdd.map(x=>x+1)
rdd1.foreach(println)
结果: 2
3
4
说明:rdd.map(x=>x+1)
表示将rdd
中的每个元素x
加1
得到新的rdd1(2,3,4)
distinct 算子
distinct()
:去重
val list=List(1,2,3,2,3)
val rdd=sc.parallelize(list)
val rdd1=rdd.distinct()
rdd1.foreach(println)
结果为: 1
3
2
说明:表示将rdd(1,2,3,2,3)
通过rdd.distinct()
对元素去重,生成新的rdd1(1,2,3)
。
编程要求
根据提示,在右侧编辑器begin-end
处补充代码,输出每个元素及其长度并去重。
测试说明
平台会对你编写的代码进行测试:
预期输出:
(an,2)
(dog,3)
(cat,3)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder1 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder1").setMaster("local")
val sc=new SparkContext(conf)
val rdd = sc.parallelize(List("dog","an","cat","an","cat"))
/********** Begin **********/
//第一步:通过获取rdd中每个元素的长度创建新的rdd1
val rdd1 = sc.parallelize(List(3,2,3,2,3))
//第二步:通过zip把rdd1和rdd组合创建rdd2
val rdd2=rdd.zip(rdd1)
//第三步:去重
val rdd3=rdd2.distinct()
//第四步:输出结果
rdd3.foreach(println)
/********** End **********/
sc.stop()
}
}
转换算子之flatMap和filter算子
任务描述
本关任务:输出个数大于一的单词。
相关知识
为了完成本关任务,你需要掌握filter
算子以及flatMap
算子。
flatMap 算子
flatMap(func)
:将函数应用于RDD
中的每个元素,将返回的迭代器的所有内容构成新的RDD
,示例如下:
val list=List(1,2,3)
val rdd=sc.parallelize(list)
val rdd1=rdd.flatMap(x=>x.to(3))
rdd1.foreach(println)
结果为: 1
2
3
2
3
3
说明:flatMap
与Map
的区别,map
函数会对每一条输入进行指定操作,然后为每一条输入返回一个对象;而flatmap
函数则是两个操作的集合,最后将所有对象合并为一个对象
filter 算子
filter(func)
:筛选出满足函数的元素,并返回一个新的RDD
,示例如下:
val list=List(1,2,3)
val rdd=sc.parallelize(list)
val rdd1=rdd.map(x=>x+1)
val rdd2=rdd1.filter(x=>x>2)
rdd2.foreach(println)
结果: 3
4
编程要求
根据提示,在右侧编辑器begin-end
处补充代码,输出个数大于一的单词。
测试说明
平台会对你编写的代码进行测试:
所给文件内容如下:
hello,world,hello,spark
good,nice,good,do
预期输出: (hello,2)
(good,2)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder2 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder2").setMaster("local")
val sc=new SparkContext(conf)
val rdd=sc.textFile("file:///root/step3_fils")
/********** Begin **********/
//第一步:对所给数据创建的rdd切割分词
val rdd1=sc.parallelize(rdd.flatMap(line=>line.split(",")).collect)
//第二步:每个单词计数为1
val rdd2= rdd1.map(line=>(line,1))
//第三步:对相同单词个数进行累加
val rdd3=rdd2.reduceByKey(_+_)
//第四步:过滤出单词个数大于一个的
val rdd4= rdd3.filter(line=>line._2>1)
//第五步:输出结果
rdd4.foreach(println)
/********** End **********/
sc.stop()
}
}
转换算子之reduceBykey和mapValues算子
任务描述
本关任务:求出这两本书这一天销售的平均价格。
相关知识
为了完成本关任务,你需要掌握reduceByKey
算子和mapValues
算子的使用。
reduceByKey算子
reduceByKey(func) :应用于(k,v)
键值对RDD
,对相同key
的value
进行运算,示例如下:
val list=List((3,2),(1,4),(2,5),(2,8))
val rdd=sc.parallelize(list)
val rdd2= rdd.reduceByKey(_+_)
rdd2.foreach(print)
结果为:
(1,4)
(3,2)
(2,13)
说明:rdd.reduceByKey(_+_)
表示将相同key
的value
进行累加,传入两个参数(例:5
和8
)返回一个值(13
)。
mapValues算子
mapValues(func) :应用于(k,v)
键值对RDD
,对RDD
中的value
进行map
操作,而不对key
进行处理。
val list=List((3,2),(1,4),(2,5),(2,8))
val rdd=sc.parallelize(list)
val rdd2= rdd.reduceByKey(_+_)
val rdd3=rdd2.mapValues(x=>x*2)
rdd3.foreach(print)
结果为:
(1,8)
(3,4)
(2,26)
编程要求
根据提示,在右侧编辑器begin-end
处补充代码,某商店上午卖出10
本 spark 书籍,每本50
元,4
本 Hadoop 书籍,每本40
元,下午卖出20
本 spark 书籍,每本40
元,10
本 Hadoop 书籍,每本30
元。
现要求求出这两本书这一天销售的平均价格。
数据如下:
spark,10,50
spark,40,25
hadoop,5,40
hadoop,10,25
测试说明
平台会对你编写的代码进行测试:
预期输出:
(spark,30)
(hadoop,30)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder3 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder3").setMaster("local")
val sc=new SparkContext(conf)
/********** Begin **********/
//第一步:通过给定数据通过集合并行化创建rdd
val rdd = sc.parallelize(List(("spark",(10,50)),("hadoop",(5,40)),("hadoop",(10,25)),("spark",(40,25))))
//第二步:求出不同书籍一天收入总和以及出售本数
val rdd2 = rdd.reduceByKey((x,y) => ((x._1*x._2)+(y._1*y._2), x._1+y._1))
//第三步:求出每本平均售价
val rdd3 = rdd2.mapValues(x => x._1 / x._2)
//输出结果
rdd3.foreach(println)
/********** End **********/
sc.stop
}
}
转化算子之groupByKey和sortByKey
任务描述
本关任务:对每人所学书籍本数分组并排序输出。
相关知识
为了完成本关任务,你需要掌握groupByKey
算子和sortByKey
算子的使用。
groupByKey算子
groupByKey:应用于(k, v)
键值对RDD
,对具有相同key
的value
值进行分组。
示例如下:
val list=List((3,2),(2,5),(2,8))
val rdd=sc.parallelize(list)
val rdd1=rdd.groupByKey()
rdd1.foreach(println)
结果:
(3,CompactBuffer(2))
(2,CompactBuffer(5, 8))
说明:如果您分组是为了对每个键执行聚合(如求和或求平均值),使用reduceByKey
或aggregateByKey
会产生更好的性能。
sortByKey算子
sortByKey() :应用于(k, v)
键值对 RDD,返回一个新的根据key
排序的 RDD,示例如下:
val list=List((3,2),(2,5),(2,8))
val rdd=sc.parallelize(list)
val rdd1=rdd.groupByKey()
val rdd2= rdd1.sortByKey()
rdd2.foreach(println)
结果:
(2,CompactBuffer(5, 8))
(3,CompactBuffer(2))
编程要求
根据提示,在右侧编辑器begin-end
处补充代码,对每人所学书籍本数分组并排序输出。
测试说明
平台会对你编写的代码进行测试:
所给数据说明:("Bob","spark")
Bob
:人名 spark
:所学书籍
预期输出:
(Bob,3)
(Candy,1)
(Lily,1)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder4 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder4").setMaster("local")
val sc=new SparkContext(conf)
val rdd = sc.parallelize(List(("Bob","spark"),("Lily","hadoop"),("Candy","hive"),("Bob","hbase"),("Bob","hive")))
/********** Begin **********/
//第一步:根据姓名对所学书籍分组
val rdd1= rdd.groupByKey()
//第二步:求出每个人的书籍本数
val rdd2= rdd1.mapValues(t=>t.toList.size)
//第三步:根据项目排序
val rdd3= rdd2.sortByKey()
//第四步:输出结果
rdd3.foreach(println)
/********** End **********/
sc.stop()
}
}
常见行动算子
任务描述
本关任务:根据所学完成本关任务。
相关知识
为了完成本关任务,你需要掌握:常见行动算子的使用
count算子
count() :返回RDD
中元素的个数。
val list=List(3,4,5,8)
val rdd=sc.parallelize(list)
val r1=rdd.count();
println(r1)
结果为: 4
take算子
- ** take(n)** :以数组的形式返回
RDD
中的前n
个元素。
val list=List(3,4,5,8)
val rdd=sc.parallelize(list)
val rdd1= rdd.take(3)
rdd1.foreach(println)
结果为: 3
4
5
-
reduce(func) :通过函数
func
(输入两个参数并返回一个值)聚合 RDD 中的元素。val list=List(3,4,5,8)
val rdd=sc.parallelize(list)
val rdd1=rdd.reduce(_*_)
println(rdd1)
结果为:
480
collect算子
collect() :以数组的形式返回 RDD 中的所有元素,收集分布在各个worker
的数据到driver
节点。
val list=List(3,4,5,8)
val rdd=sc.parallelize(list)
val rdd1= rdd.collect()
rdd1.foreach(println)
结果为: 3
4
5
8
saveAsTextFile算子
saveAsTextFile:将数据输出,存储到指定目录。
val list=List(3,4,5,8)
val rdd=sc.parallelize(list)
rdd.saveAsTextFile("C:\\Users\\admin\\Desktop\\test")
foreach算子
foreach(func) :将RDD
中的每个元素传递到函数func
中运行,与map
的区别是无返回值。
编程要求
根据提示,在右侧编辑器begin-end
处补充代码输出正确答案。
测试说明
预期输出:
4
dog
sun
an
dogsunancat
dog
sun
an
cat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder5 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder5").setMaster("local")
val sc=new SparkContext(conf)
val rdd = sc.parallelize(List("dog","sun","an","cat"))
/********** Begin **********/
//第一步:返回所给rdd的元素个数并输出
val r1=rdd.count()
println(r1)
//第二步:返回rdd的前三个元素并输出
val rdd1=rdd.take(3)
rdd1.foreach(println)
//第三步:累加rdd的所有元素并输出结果
val r2=rdd.reduce(_+_)
println(r2)
//第四步:收集所有元素并且输出
rdd.collect().foreach(println)
/********** End **********/
sc.stop()
}
}
算子的综合使用案例
任务描述
本关任务:编写Spark
独立应用程序实现所给案例的需求。
相关知识
为了完成本关任务,你需要掌握:常见算子的使用。
编程要求
有一份数据格式如下的文档:
日期,姓名,app,下载渠道,地区,版本号
2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0
2017-08-14,Bob,Facebook,Amazon Appstore,NewYork,v1.2
2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2
2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0
2017-08-14,Candy,YouTube,app store,Chicago,v1.8
2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0
2017-08-14,Candy,YouTube,app store,Chicago,v1.9
2017-08-15,Candy,YouTube,app store,Chicago,v2.0
2017-08-15,Candy,YouTube,app store,Chicago,v2.3
2017-08-15,Lily,Facebook,360 Shop,NewYork,v2.0
2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.2
2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.5
2017-08-15,Candy,YouTube,app store,Chicago,v2.9
需求: 不考虑地区,列出版本升级情况。
结果格式: 日期,姓名,app,下载渠道,升级前版本,升级后版本。
例: 数据:
2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0
2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2
2017-08-14,Lily,Facebook,360 Shop,NewYork,v2.0
结果:
(2017-08-14,Lily,Facebook,360 Shop,v1.0,v1.2)
(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder").setMaster("local")
val sc=new SparkContext(conf)
val line=sc.textFile("file:///root/step1_fils")
/********** Begin **********/
//根据需求,去除城市字段
val rdd1 = line.map(t => {
val arr = t.split(",")
((arr(0), arr(1), arr(2), arr(3)), arr(5))
})
//按key分组,key是除城市字段和版本号字段以外的所有字段,value是版本号
val rdd2=rdd1.groupByKey()
//过滤版本号重复的(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))
val rdd3=rdd2.mapValues(t=>t.toList.distinct).filter(t=>t._2.length>1)
//拆分重新组合(例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5))(key,(v2.5,v3.0)))
val rdd4= rdd3.mapValues(t => {
val tai = t.tail
t.zip(tai)
})
//按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0))
val rdd5= rdd4.flatMap(t => {
t._2.map(tp => {
(t._1._1, t._1._2, t._1._3, t._1._4, tp._1, tp._2)
})
})
//执行foreach操作,打印出结果
rdd5.foreach(println)
/********** End **********/
sc.stop()
}
}
更多推荐
所有评论(0)