Writing to HDFS requires adding the following two statements (a sketch of where they go follows the list):

  1. System.setProperty("HADOOP_USER_NAME", "root")
  2. res.partitionBy(new myPartitioner(keyArray)).saveAsTextFile("hdfs://host01:9000/wordCount")
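
A minimal sketch of where those two statements sit in a driver program. It assumes the host01:9000 NameNode address and the myPartitioner class shown later in this post, and is only one possible layout:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch (assumed setup): write word-count results to HDFS as the root user.
object HdfsWriteSketch {
    def main(args: Array[String]): Unit = {
        // 1. Set the HDFS user before any filesystem access happens
        System.setProperty("HADOOP_USER_NAME", "root")

        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("hdfsWrite"))
        val res: RDD[(String, Int)] = sc.textFile("hdfs://host01:9000/input")
            .flatMap(_.split(" "))
            .map((_, 1))
            .reduceByKey(_ + _)

        val keyArray: Array[String] = res.keys.collect()
        // 2. Repartition with the custom partitioner and save the output directory to HDFS
        res.partitionBy(new myPartitioner(keyArray)).saveAsTextFile("hdfs://host01:9000/wordCount")
    }
}
```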
  1. Create a file on the desktop with the following content

    hello world java hadoop java spark hadoop SPARK HADOOP
    JAVA LINUX SPARK JAVA HELLO HBASE CENTOS LINUX hbase
    Spark Java Hello Spark Scala Hadoop MapReduce mapreduce
    123 456 789 123 12
    
    1. Count how many times each word occurs. Do not count numbers, and ignore case (spark, Spark, and SPARK are the same word; report the final counts in all-lowercase form)

    2. Write the final counts to HDFS, sorted by count in descending order; when counts are equal, sort the words in ascending dictionary order. Words starting with a-g go into one file, h-n into another, o-t into another, u-z into another, and everything else into a fifth file

      package day06
      
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{Partitioner, SparkConf, SparkContext}
      
      import scala.collection.mutable
      import scala.collection.mutable.ArrayBuffer
      
      /**
       * 2.1
       * Count how many times each word occurs. Do not count numbers,
       * and ignore case (spark, Spark, and SPARK are the same word; report the final counts in all-lowercase form).
       *
       * Write the final counts to HDFS, sorted by count descending; ties are sorted by word in ascending dictionary order.
       * Words starting with a-g, h-n, o-t, and u-z each go into their own file, and everything else into a fifth file.
       *
       */
      
      object Test2 {
          def main(args: Array[String]): Unit = {
      
              val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("test2"))
      
              val rdd1: RDD[String] = sc.textFile("C:\\Users\\18870\\OneDrive\\桌面\\TestFile\\question2.txt",5)
      
              // 1. Process each line, wrapping it as an ArrayBuffer[(String, Int)]
              val rdd2: RDD[ArrayBuffer[(String, Int)]] = rdd1.map(line => {
                  val info: Array[String] = line.split(" ")
                  val buffer: ArrayBuffer[(String, Int)] = mutable.ArrayBuffer[(String, Int)]()
                  // Regex used to filter out numbers
                  val regex: String = "[0-9]+"
      
                  for (elem <- info) {
                      // Keep elem only if it is not a number
                      if(!elem.matches(regex))
                          buffer += ((elem.toLowerCase(), 1))
                  }
                  buffer
              })
      
              // 2. Flatten rdd2
              val rdd3: RDD[(String, Int)] = rdd2.flatMap(_.iterator)
      
              // Aggregate counts, then sort by count descending, breaking ties by ascending dictionary order
              val res: RDD[(String, Int)] = rdd3.reduceByKey(_ + _).sortBy(t => (-t._2, t._1))
      
              res.foreach(println)
      
              val keyArray: Array[String] = res.keys.collect()
      
              res.partitionBy(new myPartitioner(keyArray)).saveAsTextFile("C:\\Users\\18870\\OneDrive\\桌面\\TestFile\\question2Output")
      
      //        res.partitionBy(new myPartitioner(keyArray)).saveAsTextFile("hdfs://host01:9000/wordCount")
      
          }
      }
      
      class myPartitioner extends Partitioner{
      
          // Maps each word to its target partition index
          private val _partitionMap: mutable.Map[String, Int] = mutable.HashMap[String, Int]()
          private var index: Int = 0
          def this(array: Array[String]){
              this()
              for (elem <- array) {
                  if (elem.head < 'a' || elem.head > 'z')
                      index = 4
                  else if (elem.head <= 'g')
                      index = 0
                  else if (elem.head <= 'n')
                      index = 1
                  else if (elem.head <= 't')
                      index = 2
                  else
                      index = 3
                  _partitionMap.put(elem, index)
              }
          }
      
          override def numPartitions: Int = 5
      
          override def getPartition(key: Any): Int = _partitionMap.getOrElse(key.toString,4)
      
      }

An alternative implementation

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
 * 1. Count how many times each word occurs. Do not count numbers, and ignore case (spark, Spark, and SPARK are the same word; report the final counts in all-lowercase form)
 * 2. Write the final counts to HDFS, sorted by count descending; ties are sorted by word in ascending dictionary order. Words starting with a-g, h-n, o-t, and u-z each go into their own file, and everything else into a fifth file
 *
 * saveAsTextFile creates a new directory; if that directory already exists, it throws an exception.
 * Extra logic: check whether the directory exists and, if it does, delete it first.
 *
 */
object MainApplication {

    // Final output location on HDFS
    private val path: String = "hdfs://qianfeng01:8020/exam02/out02"

    def main(args: Array[String]): Unit = {
        hadoopConfiguration()
        // 1. Create the SparkContext
        val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("wc"))
        // 2. Read the input file
        val rdd: RDD[String] = sc.textFile("C:\\Users\\luds\\Desktop\\SZ2002-Scala\\src\\main\\scala\\exam02_sparkCore\\exam2\\file")
        // 3. Processing logic
        rdd.flatMap(_.split("[ \t]+"))                  // Split out each word, yielding RDD[String]
            .filter(!_.matches("\\d+"))                 // Filter out purely numeric tokens
            .map(word => (word.toLowerCase(), 1))                // Lowercase each word and pair it with 1, yielding RDD[(String, Int)]
            .reduceByKey(_+_)                                    // Sum the occurrences of each word
            .sortBy(t => (-t._2, t._1))                          // Sort by count descending, then word ascending
            .partitionBy(new WordPartitioner)                    // Repartition the data
            .saveAsTextFile(path)                                // Write to HDFS
    }

    // Check whether the given directory exists on HDFS and delete it if it does
    def hadoopConfiguration(): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        val fs: FileSystem = {
            val configuration: Configuration = new Configuration()
            configuration.set("fs.defaultFS", "hdfs://qianfeng01:8020") // must match the authority used in path above
            FileSystem.get(configuration)
        }
        val p: Path = new Path(path)
        // Check whether it exists
        if (fs.exists(p)) {
            fs.delete(p, true)
        }
    }
}

class WordPartitioner extends Partitioner {
    override def numPartitions: Int = 5

    override def getPartition(key: Any): Int = {
        val firstLetter: Char = key.toString.charAt(0)
        if (firstLetter > 'z' || firstLetter < 'a') 4
        else if (firstLetter <= 'g') 0
        else if (firstLetter <= 'n') 1
        else if (firstLetter <= 't') 2
        else 3
    }
}
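
A quick sanity check of the partition boundaries, e.g. in the spark-shell or Scala REPL (an assumed example, not part of the original post):

```scala
// Assumed example: verify which bucket a few sample keys land in.
val p = new WordPartitioner
p.getPartition("apache")   // 0  (a-g)
p.getPartition("hadoop")   // 1  (h-n)
p.getPartition("spark")    // 2  (o-t)
p.getPartition("yarn")     // 3  (u-z)
p.getPartition("123")      // 4  (everything else)
```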

  1. Given the file access.txt with the following content
    access.txt
20161123101526	http://java.learn.com/java/javaee.shtml
20161123101526	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101526	http://java.learn.com/java/javaee.shtml
20161123101526	http://bigdata.learn.com/bigdata/course.shtml
20161123101526	http://bigdata.learn.com/bigdata/video.shtml
20161123101526	http://java.learn.com/java/teacher.shtml
20161123101526	http://ui.learn.com/ui/video.shtml
20161123101526	http://bigdata.learn.com/bigdata/video.shtml
20161123101526	http://ui.learn.com/ui/video.shtml
20161123101526	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101526	http://bigdata.learn.com/bigdata/course.shtml
20161123101526	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101526	http://ui.learn.com/ui/teacher.shtml
20161123101526	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101526	http://ui.learn.com/ui/course.shtml
20161123101526	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101526	http://ui.learn.com/ui/course.shtml
20161123101526	http://bigdata.learn.com/bigdata/video.shtml
20161123101526	http://ui.learn.com/ui/course.shtml
20161123101527	http://h5.learn.com/h5/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://java.learn.com/java/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://ui.learn.com/ui/course.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://ui.learn.com/ui/teacher.shtml
20161123101527	http://ui.learn.com/ui/video.shtml
20161123101527	http://java.learn.com/java/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
20161123101527	http://ui.learn.com/ui/video.shtml
20161123101527	http://ui.learn.com/ui/course.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://h5.learn.com/h5/course.shtml
20161123101527	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101527	http://h5.learn.com/h5/video.shtml
20161123101527	http://ui.learn.com/ui/teacher.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://ui.learn.com/ui/video.shtml
20161123101527	http://java.learn.com/java/javaee.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://ui.learn.com/ui/course.shtml
20161123101527	http://h5.learn.com/h5/course.shtml
20161123101527	http://java.learn.com/java/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101527	http://ui.learn.com/ui/teacher.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://h5.learn.com/h5/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://ui.learn.com/ui/video.shtml
20161123101527	http://ui.learn.com/ui/teacher.shtml
20161123101527	http://java.learn.com/java/javaee.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://java.learn.com/java/javaee.shtml
20161123101527	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101527	http://ui.learn.com/ui/video.shtml
20161123101527	http://h5.learn.com/h5/course.shtml
20161123101527	http://bigdata.learn.com/bigdata/teacher.shtml
20161123101527	http://java.learn.com/java/teacher.shtml
20161123101527	http://bigdata.learn.com/bigdata/course.shtml
20161123101527	http://bigdata.learn.com/bigdata/video.shtml
  20161123101523 http://java.learn.com/java/javaee.shtml
  Explanation:
  The first column is the access time: 2016-11-23 10:15:23
  The second column is the URL that was accessed
  1. Read this file and compute the Top-3 subjects by access count for each second (the subject is taken from the URL host, as sketched below)
  2. Store the results in separate files, one per timestamp
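
In the solutions below, the "subject" is taken to be the host part of the URL. A minimal sketch of that extraction with java.net.URL, e.g. in the REPL (the sample line is taken from the data above):

```scala
import java.net.URL

// Assumed illustration: split a log line on the tab and take the URL host as the subject.
val line = "20161123101526\thttp://java.learn.com/java/javaee.shtml"
val host = new URL(line.split("\t")(1)).getHost   // "java.learn.com"
```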
   package day06
   
   import java.net.URL
   
   import org.apache.spark.{Partitioner, SparkConf, SparkContext}
   import org.apache.spark.rdd.RDD
   
   import scala.collection.mutable
   
   /**
    * 20161123101523 http://java.learn.com/java/javaee.shtml
    * Explanation:
    * The first column is the access time: 2016-11-23 10:15:23
    * The second column is the URL that was accessed
    *
    * 1. Read this file and compute the Top-3 subjects by access count for each second
    * 2. Store the results in separate files, one per timestamp
    */
   object Test3 {
       def main(args: Array[String]): Unit = {
           val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("test3"))
   
           val rdd1: RDD[String] = sc.textFile("C:\\Users\\18870\\OneDrive\\桌面\\TestFile\\access.txt")
   
           val rdd2: RDD[((String, String), Int)] = rdd1.map(line => {
               val info: Array[String] = line.split("\t")
               val url: URL = new URL(info(1))
               val host: String = url.getHost
               ((info(0), host), 1)
           })
           // 1. Count accesses for each (time, subject) pair
           val rdd3: RDD[((String, String), Int)] = rdd2.reduceByKey(_ + _)
           // Restructure the elements into (time, (subject, count))
           val rdd4: RDD[(String, (String, Int))] = rdd3.map(t => (t._1._1, (t._1._2, t._2)))
           // Group by time
           val rdd5: RDD[(String, Iterable[(String, Int)])] = rdd4.groupByKey()
           // Sort each group by count descending and take the top 3
           val res: RDD[(String, List[(String, Int)])] = rdd5.mapValues(t => t.toList.sortWith(_._2 > _._2).take(3))
   
           res.foreach(println)
   
           val keyArray: Array[String] = res.keys.collect()
   
           res.partitionBy(new MyPartitioner(keyArray)).saveAsTextFile("C:\\Users\\18870\\OneDrive\\桌面\\TestFile\\subjectOutput")
       }
   }
   
   
   class MyPartitioner extends Partitioner{
   
       private val _map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()
       // Partition indices must start from 0, otherwise Spark throws an error
       def this(array: Array[String]){
           this()
           var count: Int = -1
           for (elem <- array) {
               count += 1
               this._map.put(elem, count)
           }
       }
   
       override def numPartitions: Int = _map.size
   
       override def getPartition(key: Any): Int = {
   
           _map.getOrElse(key.toString, 0)
       }
   }

An alternative implementation

import java.net.URL

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
 * 1. Read this file and compute the Top-3 subjects by access count for each second
 * 2. Store the results in separate files, one per timestamp
 *
 */
object MainApplication extends App {
    // 1. Create the SparkContext
    private val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("exam03")
    // conf.set("spark.shuffle.manager", "hash")

    private val sc: SparkContext = new SparkContext(conf)


    // 2. Read the file
    private val rdd: RDD[String] = sc.textFile("src\\main\\scala\\exam02_sparkCore\\exam3\\access.txt")
    // 3. Processing pipeline
    private val groupedRDD: RDD[(String, List[(String, Int)])] = rdd.map(line => {
        val parts: Array[String] = line.split("\t")
        val host: String = new URL(parts(1)).getHost
        ((parts(0), host), 1)
    }) // Parse each line into ((time, subject), 1)
        .reduceByKey(_ + _) // Total access count per second, per subject
        .map(t => (t._1._1, (t._1._2, t._2))) // Reshape the data
        .groupByKey() // Group by time: (time, Iterable[(subject, count)])
        .mapValues(_.toList.sortWith(_._2 > _._2).take(3)).cache()  // Sort each value by count descending and take the top 3

    // 4. Collect all the keys to build the custom partitioner
    private val partitioner: TimePartitioner = new TimePartitioner(groupedRDD.keys.collect())

    // 5. Repartition the data and save
    groupedRDD.partitionBy(partitioner).saveAsTextFile("C:\\Users\\luds\\Desktop\\out3")
}

class TimePartitioner(timeArray: Array[String]) extends Partitioner {
    override def numPartitions: Int = timeArray.length
    override def getPartition(key: Any): Int = timeArray.indexOf(key)
}
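
A small usage sketch for TimePartitioner, e.g. in the REPL (an assumed example; the timestamps come from the sample data above): every distinct key collected from the RDD gets its own partition.

```scala
// Assumed example: two distinct timestamps, so two partitions.
val tp = new TimePartitioner(Array("20161123101526", "20161123101527"))
tp.numPartitions                     // 2
tp.getPartition("20161123101526")    // 0
tp.getPartition("20161123101527")    // 1
```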

  1. Given the file test.txt with the following content

Get Spark from the downloads page of the project website This documentation is for Spark version Spark uses Hadoop s client libraries for HDFS and YARN.Downloads are pre packaged for a handful of popular Hadoop versions Users can also download a Hadoop free binary and run Spark with any Hadoop version by augmenting Spark s classpath Scala and Java users can include Spark in their projects using its Maven coordinates and in the future Python users can also install Spark from PyPI


   1. Count how many times the word Spark appears
   2. Which word appears the most times?

   package day06
   
   import org.apache.spark.{SparkConf, SparkContext}
   import org.apache.spark.rdd.RDD
   
   import scala.collection.mutable
   import scala.collection.mutable.ArrayBuffer
   
   /**
    * 1. Count how many times the word Spark appears
    * 2. Which word appears the most times
    */
   
   object Test4 {
   
       def main(args: Array[String]): Unit = {
   
           val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local").setAppName("test4"))
   
           val rdd1: RDD[String] = sc.textFile("C:\\Users\\18870\\OneDrive\\桌面\\TestFile\\question4.txt")
   
           val rdd2: RDD[ArrayBuffer[(String, Int)]] = rdd1.map(line => {
               // Split each line on non-word characters
               val info: Array[String] = line.split("\\W")
               val buffer: ArrayBuffer[(String, Int)] = mutable.ArrayBuffer[(String, Int)]()
   
               // Splitting on \W produces empty strings wherever two delimiters are adjacent
               // (or a line starts with one), so filter those out and collect the remaining
               // (word, 1) pairs into a mutable.ArrayBuffer[(String, Int)]
               for (elem <- info if !elem.isEmpty) {
                   buffer += ((elem, 1))
               }
               // Return the ArrayBuffer[(String, Int)]
               buffer
           })
           // Flatten rdd2
           val rdd3: RDD[(String, Int)] = rdd2.flatMap(_.iterator)
           val rdd4: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)
   
           // 1. Count how many times Spark appears (case-insensitive here)
           val res1: RDD[(String, Int)] = rdd4.filter(_._1.toLowerCase().equals("spark"))
   
           res1.foreach(println)
           // 2. Which word appears the most times
           val rdd5: RDD[(String, Int)] = rdd4.sortBy(_._2, false)
           val res2: Array[(String, Int)] = rdd5.take(1)
           res2.foreach(println)
       }
   
   }
   

An alternative implementation

import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1. Count how many times the word Spark appears
 * 2. Which word appears the most times
 *
 */
object MainApplication {
    def main(args: Array[String]): Unit = {
        val sc: SparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("exam03"))

        // Collect an array of (word, count) pairs for all words, sorted by count descending
        val times: Array[(String, Int)] = sc.textFile("src\\main\\scala\\exam02_sparkCore\\exam4\\file")
            .flatMap(_.split("\\W+"))
            .map((_, 1))
            .reduceByKey(_ + _)
            .sortBy(-_._2)
            .collect()

        // Print the word that appears most often
        println(s"Most frequent word: ${times(0)._1}")
        // Print how many times Spark appears
        println(s"Spark appears ${times.find(_._1.equals("Spark")).get._2} times")

    }
}
