转换算子之map和distinct算子

任务描述

本关任务:输出每个元素及其长度并去重。

相关知识

为了完成本关任务,你需要掌握map算子以及distinct算子的的用法。

map 算子

map(func):将函数应用于RDD中的每个元素,将返回值构成新的RDD,示例如下:

  1. val list=List(1,2,3)
  2. val rdd=sc.parallelize(list)
  3. val rdd1=rdd.map(x=>x+1)
  4. rdd1.foreach(println)

结果: 2 3 4

说明:rdd.map(x=>x+1)表示将rdd中的每个元素x1得到新的rdd1(2,3,4)

distinct 算子

distinct():去重

  1. val list=List(1,2,3,2,3)
  2. val rdd=sc.parallelize(list)
  3. val rdd1=rdd.distinct()
  4. rdd1.foreach(println)

结果为: 1 3 2

说明:表示将rdd(1,2,3,2,3)通过rdd.distinct()对元素去重,生成新的rdd1(1,2,3)

编程要求

根据提示,在右侧编辑器begin-end处补充代码,输出每个元素及其长度并去重。

测试说明

平台会对你编写的代码进行测试:

预期输出:

(an,2) (dog,3) (cat,3)

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object EduCoder1 {

    def main(args: Array[String]): Unit = {

    val conf =new SparkConf().setAppName("educoder1").setMaster("local")

    val sc=new SparkContext(conf)

    val rdd = sc.parallelize(List("dog","an","cat","an","cat"))

    /********** Begin **********/

    //第一步:通过获取rdd中每个元素的长度创建新的rdd1

    val rdd1 = sc.parallelize(List(3,2,3,2,3))

    //第二步:通过zip把rdd1和rdd组合创建rdd2

    val rdd2=rdd.zip(rdd1)

    //第三步:去重

    val rdd3=rdd2.distinct()

    //第四步:输出结果

    rdd3.foreach(println)

    /********** End **********/

    sc.stop()

  }

}

转换算子之flatMap和filter算子

任务描述

本关任务:输出个数大于一的单词。

相关知识

为了完成本关任务,你需要掌握filter算子以及flatMap算子。

flatMap 算子

flatMap(func):将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,示例如下:

  1. val list=List(1,2,3)
  2. val rdd=sc.parallelize(list)
  3. val rdd1=rdd.flatMap(x=>x.to(3))
  4. rdd1.foreach(println)

结果为: 1 2 3 2 3 3

说明:flatMapMap的区别,map函数会对每一条输入进行指定操作,然后为每一条输入返回一个对象;而flatmap函数则是两个操作的集合,最后将所有对象合并为一个对象

filter 算子

filter(func):筛选出满足函数的元素,并返回一个新的RDD,示例如下:

  1. val list=List(1,2,3)
  2. val rdd=sc.parallelize(list)
  3. val rdd1=rdd.map(x=>x+1)
  4. val rdd2=rdd1.filter(x=>x>2)
  5. rdd2.foreach(println)

结果: 3 4

编程要求

根据提示,在右侧编辑器begin-end处补充代码,输出个数大于一的单词。

测试说明

平台会对你编写的代码进行测试:

所给文件内容如下:

  1. hello,world,hello,spark
  2. good,nice,good,do

预期输出: (hello,2) (good,2)

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object EduCoder2 {

    def main(args: Array[String]): Unit = {

    val conf =new SparkConf().setAppName("educoder2").setMaster("local")

    val sc=new SparkContext(conf)

    val rdd=sc.textFile("file:///root/step3_fils")

    /********** Begin **********/

    //第一步:对所给数据创建的rdd切割分词

    val rdd1=sc.parallelize(rdd.flatMap(line=>line.split(",")).collect)

    //第二步:每个单词计数为1

    val rdd2= rdd1.map(line=>(line,1))

    //第三步:对相同单词个数进行累加

    val rdd3=rdd2.reduceByKey(_+_)

    //第四步:过滤出单词个数大于一个的

    val rdd4= rdd3.filter(line=>line._2>1)

    //第五步:输出结果

    rdd4.foreach(println)

    /********** End **********/

    sc.stop()

  }

}

转换算子之reduceBykey和mapValues算子

任务描述

本关任务:求出这两本书这一天销售的平均价格。

相关知识

为了完成本关任务,你需要掌握reduceByKey算子和mapValues算子的使用。

reduceByKey算子

reduceByKey(func) :应用于(k,v)键值对RDD,对相同keyvalue进行运算,示例如下:

  1. val list=List((3,2),(1,4),(2,5),(2,8))
  2. val rdd=sc.parallelize(list)
  3. val rdd2= rdd.reduceByKey(_+_)
  4. rdd2.foreach(print)

结果为:

  1. (1,4)
  2. (3,2)
  3. (2,13)

说明:rdd.reduceByKey(_+_)表示将相同keyvalue进行累加,传入两个参数(例:58)返回一个值(13)。

mapValues算子

mapValues(func) :应用于(k,v)键值对RDD,对RDD中的value进行map操作,而不对key进行处理。

  1. val list=List((3,2),(1,4),(2,5),(2,8))
  2. val rdd=sc.parallelize(list)
  3. val rdd2= rdd.reduceByKey(_+_)
  4. val rdd3=rdd2.mapValues(x=>x*2)
  5. rdd3.foreach(print)

结果为:

  1. (1,8)
  2. (3,4)
  3. (2,26)
编程要求

根据提示,在右侧编辑器begin-end处补充代码,某商店上午卖出10本 spark 书籍,每本50元,4本 Hadoop 书籍,每本40元,下午卖出20本 spark 书籍,每本40元,10本 Hadoop 书籍,每本30元。

现要求求出这两本书这一天销售的平均价格。

数据如下:

  1. spark,10,50
  2. spark,40,25
  3. hadoop,5,40
  4. hadoop,10,25
测试说明

平台会对你编写的代码进行测试:

预期输出:

(spark,30) (hadoop,30)

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object EduCoder3 {

    def main(args: Array[String]): Unit = {

    val conf =new SparkConf().setAppName("educoder3").setMaster("local")

    val sc=new SparkContext(conf)

    /********** Begin **********/

    //第一步:通过给定数据通过集合并行化创建rdd

    val rdd = sc.parallelize(List(("spark",(10,50)),("hadoop",(5,40)),("hadoop",(10,25)),("spark",(40,25))))

    //第二步:求出不同书籍一天收入总和以及出售本数

    val rdd2 = rdd.reduceByKey((x,y) => ((x._1*x._2)+(y._1*y._2), x._1+y._1))

    //第三步:求出每本平均售价

    val rdd3 = rdd2.mapValues(x => x._1 / x._2)

    //输出结果

    rdd3.foreach(println)

    /********** End **********/

    sc.stop

}

}

转化算子之groupByKey和sortByKey

任务描述

本关任务:对每人所学书籍本数分组并排序输出。

相关知识

为了完成本关任务,你需要掌握groupByKey算子和sortByKey算子的使用。

groupByKey算子

groupByKey:应用于(k, v)键值对RDD,对具有相同keyvalue值进行分组。

示例如下:

  1. val list=List((3,2),(2,5),(2,8))
  2. val rdd=sc.parallelize(list)
  3. val rdd1=rdd.groupByKey()
  4. rdd1.foreach(println)

结果:

  1. (3,CompactBuffer(2))
  2. (2,CompactBuffer(5, 8))

说明:如果您分组是为了对每个键执行聚合(如求和或求平均值),使用reduceByKeyaggregateByKey会产生更好的性能。

sortByKey算子

sortByKey() :应用于(k, v)键值对 RDD,返回一个新的根据key排序的 RDD,示例如下:

  1. val list=List((3,2),(2,5),(2,8))
  2. val rdd=sc.parallelize(list)
  3. val rdd1=rdd.groupByKey()
  4. val rdd2= rdd1.sortByKey()
  5. rdd2.foreach(println)

结果:

  1. (2,CompactBuffer(5, 8))
  2. (3,CompactBuffer(2))
编程要求

根据提示,在右侧编辑器begin-end处补充代码,对每人所学书籍本数分组并排序输出。

测试说明

平台会对你编写的代码进行测试:

所给数据说明:("Bob","spark") Bob:人名 spark:所学书籍

预期输出:

  1. (Bob,3)
  2. (Candy,1)
  3. (Lily,1)

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object EduCoder4 {

    def main(args: Array[String]): Unit = {

    val conf =new SparkConf().setAppName("educoder4").setMaster("local")

    val sc=new SparkContext(conf)

   val rdd = sc.parallelize(List(("Bob","spark"),("Lily","hadoop"),("Candy","hive"),("Bob","hbase"),("Bob","hive")))

    /********** Begin **********/

     //第一步:根据姓名对所学书籍分组

    val rdd1= rdd.groupByKey()

    //第二步:求出每个人的书籍本数

    val rdd2= rdd1.mapValues(t=>t.toList.size)

    //第三步:根据项目排序

    val rdd3= rdd2.sortByKey()

    //第四步:输出结果

    rdd3.foreach(println)

    /********** End **********/

    sc.stop()

  }

}

常见行动算子

任务描述

本关任务:根据所学完成本关任务。

相关知识

为了完成本关任务,你需要掌握:常见行动算子的使用

count算子

count() :返回RDD中元素的个数。

  1. val list=List(3,4,5,8)
  2. val rdd=sc.parallelize(list)
  3. val r1=rdd.count();
  4. println(r1)

结果为: 4

take算子
  • ** take(n)** :以数组的形式返回RDD中的前n个元素。
  1. val list=List(3,4,5,8)
  2. val rdd=sc.parallelize(list)
  3. val rdd1= rdd.take(3)
  4. rdd1.foreach(println)

结果为: 3 4 5

  • reduce(func) :通过函数func(输入两个参数并返回一个值)聚合 RDD 中的元素。

    1. val list=List(3,4,5,8)
    2. val rdd=sc.parallelize(list)
    3. val rdd1=rdd.reduce(_*_)
    4. println(rdd1)

    结果为: 480

collect算子

collect() :以数组的形式返回 RDD 中的所有元素,收集分布在各个worker的数据到driver节点。

  1. val list=List(3,4,5,8)
  2. val rdd=sc.parallelize(list)
  3. val rdd1= rdd.collect()
  4. rdd1.foreach(println)

结果为: 3 4 5 8

saveAsTextFile算子

saveAsTextFile:将数据输出,存储到指定目录。

  1. val list=List(3,4,5,8)
  2. val rdd=sc.parallelize(list)
  3. rdd.saveAsTextFile("C:\\Users\\admin\\Desktop\\test")
foreach算子

foreach(func) :将RDD中的每个元素传递到函数func中运行,与map的区别是无返回值。

编程要求

根据提示,在右侧编辑器begin-end处补充代码输出正确答案。

测试说明

预期输出:

  1. 4
  2. dog
  3. sun
  4. an
  5. dogsunancat
  6. dog
  7. sun
  8. an
  9. cat

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object EduCoder5 {

    def main(args: Array[String]): Unit = {

    val conf =new SparkConf().setAppName("educoder5").setMaster("local")

    val sc=new SparkContext(conf)

    val rdd = sc.parallelize(List("dog","sun","an","cat"))

  /********** Begin **********/

  //第一步:返回所给rdd的元素个数并输出

     val r1=rdd.count()

     println(r1)

    //第二步:返回rdd的前三个元素并输出

    val rdd1=rdd.take(3)

    rdd1.foreach(println)

    //第三步:累加rdd的所有元素并输出结果

    val r2=rdd.reduce(_+_)

    println(r2)  

    //第四步:收集所有元素并且输出

    rdd.collect().foreach(println)  

  /********** End **********/

    sc.stop()

  }

}

算子的综合使用案例

任务描述

本关任务:编写Spark独立应用程序实现所给案例的需求。

相关知识

为了完成本关任务,你需要掌握:常见算子的使用。

编程要求

有一份数据格式如下的文档:

日期,姓名,app,下载渠道,地区,版本号

  1. 2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0
  2. 2017-08-14,Bob,Facebook,Amazon Appstore,NewYork,v1.2
  3. 2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2
  4. 2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0
  5. 2017-08-14,Candy,YouTube,app store,Chicago,v1.8
  6. 2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0
  7. 2017-08-14,Candy,YouTube,app store,Chicago,v1.9
  8. 2017-08-15,Candy,YouTube,app store,Chicago,v2.0
  9. 2017-08-15,Candy,YouTube,app store,Chicago,v2.3
  10. 2017-08-15,Lily,Facebook,360 Shop,NewYork,v2.0
  11. 2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.2
  12. 2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.5
  13. 2017-08-15,Candy,YouTube,app store,Chicago,v2.9

需求: 不考虑地区,列出版本升级情况。

结果格式: 日期,姓名,app,下载渠道,升级前版本,升级后版本。

例: 数据:

  1. 2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0
  2. 2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2
  3. 2017-08-14,Lily,Facebook,360 Shop,NewYork,v2.0

结果:

  1. (2017-08-14,Lily,Facebook,360 Shop,v1.0,v1.2)
  2. (2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0)

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

object EduCoder {

  def main(args: Array[String]): Unit = {

    val conf =new SparkConf().setAppName("educoder").setMaster("local")

    val sc=new SparkContext(conf)

    val line=sc.textFile("file:///root/step1_fils")

  /********** Begin **********/

    //根据需求,去除城市字段

    val rdd1 = line.map(t => {

    val arr = t.split(",")

    ((arr(0), arr(1), arr(2), arr(3)), arr(5))

    })

    //按key分组,key是除城市字段和版本号字段以外的所有字段,value是版本号

    val rdd2=rdd1.groupByKey()

    //过滤版本号重复的(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))

    val rdd3=rdd2.mapValues(t=>t.toList.distinct).filter(t=>t._2.length>1)

    //拆分重新组合(例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5))(key,(v2.5,v3.0)))

    val rdd4= rdd3.mapValues(t => {

    val tai = t.tail

    t.zip(tai)

    })

    //按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0))

    val rdd5= rdd4.flatMap(t => {

    t._2.map(tp => {

    (t._1._1, t._1._2, t._1._3, t._1._4, tp._1, tp._2)

    })

    })

    //执行foreach操作,打印出结果

    rdd5.foreach(println)

  /********** End **********/

    sc.stop()

  }

}

Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐