I. Spark Installation Modes

local (local mode): typically used for local development and testing; it is further split into local (single-threaded) and local-cluster (multi-threaded).
standalone (cluster mode): the classic Master/Slave architecture. The Master is clearly a single point of failure; Spark supports ZooKeeper-based HA to address this.
on YARN (cluster mode): runs on top of the YARN resource manager; YARN handles resource management while Spark handles task scheduling and computation.
on Mesos (cluster mode): runs on top of the Mesos resource manager; Mesos handles resource management while Spark handles task scheduling and computation.
on cloud (cluster mode): for example AWS EC2; this mode makes it easy to access Amazon S3. Spark supports multiple distributed storage systems, such as HDFS and S3.
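
A quick sketch of how these modes look from application code (my own illustration; the standalone URL below just reuses this article's example host and port):

// Minimal sketch (Spark 1.x API): the same driver code can target any of the
// modes above simply by changing the master URL passed to SparkConf.
import org.apache.spark.{SparkConf, SparkContext}

object MasterUrlDemo {
  def main(args: Array[String]): Unit = {
    // "local[2]"          -> local mode with 2 threads
    // "spark://h40:7077"  -> standalone cluster (matches the setup below)
    // "yarn-client"       -> on YARN (Spark 1.x syntax; Spark 2.x uses "yarn")
    // "mesos://host:5050" -> on Mesos
    val conf = new SparkConf().setAppName("master-url-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).sum())  // trivial job to confirm the context works
    sc.stop()
  }
}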

1. Installation:

(1) Spark standalone mode:

  Requires Hadoop's HDFS as the persistence layer and JDK 1.6 or later.
  For setting up the Hadoop cluster, see the hadoop-2.6.0 installation guide.

(2) Install Scala (on all three nodes):

[hadoop@h40 ~]$ tar -zxvf scala-2.10.6.tgz
[hadoop@h41 ~]$ tar -zxvf scala-2.10.6.tgz
[hadoop@h42 ~]$ tar -zxvf scala-2.10.6.tgz

(3) Install Spark:

[hadoop@h40 ~]$ tar -zxvf spark-1.3.1-bin-hadoop2.6.tgz
[hadoop@h40 ~]$ vi .bash_profile 
export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6
export SCALA_HOME=/home/hadoop/scala-2.10.6
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SCALA_HOME/bin
[hadoop@h40 ~]$ source .bash_profile

(4) Edit Spark's configuration files:

[hadoop@h40 ~]$ cd spark-1.3.1-bin-hadoop2.6/conf
[hadoop@h40 conf]$ cp spark-env.sh.template spark-env.sh
[hadoop@h40 conf]$ vi spark-env.sh
# add the following:
export JAVA_HOME=/usr/jdk1.7.0_25/
export SPARK_MASTER_IP=h40
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
# In spark-1.6.0-bin-hadoop2.6 and spark-1.5.0-cdh5.5.2, use export SPARK_EXECUTOR_INSTANCES=1 instead
export SPARK_WORKER_MEMORY=1g

(5) Configure slaves:

[hadoop@h40 conf]$ vi slaves 
h41
h42

(6) Sync to the other nodes:

[hadoop@h40 ~]$ scp -r spark-1.3.1-bin-hadoop2.6 h41:/home/hadoop
[hadoop@h40 ~]$ scp -r spark-1.3.1-bin-hadoop2.6 h42:/home/hadoop

(7) Start Spark:

[hadoop@h40 spark-1.3.1-bin-hadoop2.6]$ sbin/start-all.sh

(8) Verification:

[hadoop@h40 ~]$ jps
# the master node should show a Master process
8861 Master

[hadoop@h41 ~]$ jps
[hadoop@h42 ~]$ jps
# the worker nodes should show a Worker process
8993 Worker

(9) Check the Spark version by running spark.version in the Spark shell:

# spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-cdh6.3.2
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.version
res0: String = 2.4.0-cdh6.3.2
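
One caveat: the spark object used above is a SparkSession, which only exists in Spark 2.x shells (the transcript above happens to come from a 2.4.0-cdh6.3.2 shell). On the 1.3.1 build installed in this article, the equivalent check is sc.version:

scala> sc.version
res0: String = 1.3.1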

II. Developing and Debugging Spark Operator Code

References:
RDD Basic Operations (Part 1)
RDD Basic Operations (Part 2)

  Developing and debugging Spark locally is very simple and requires no installed Spark cluster at all. Just create a project (Java or Scala), put a jar such as spark-assembly-1.6.1-hadoop2.6.0.jar on the project's classpath, and you can develop and debug Spark programs (in Scala) locally.
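
(As an aside: instead of dropping the assembly jar on the classpath, a dependency declaration along these lines should also work. This is a sketch of my own, with coordinates chosen to match the spark-assembly-1.6.1-hadoop2.6.0.jar named above.)

// build.sbt -- a minimal sketch; assumes Spark 1.6.1 on Scala 2.10
name := "sparktest"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

With either setup in place, the full local example looks like this: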

package cn.com.sparktest
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
 
object SparkTest {
  val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)
   
  /**
   * Ways to create data -- building it in memory (the basics)
   */
  def createDataMethod():Unit = {
    /* Create RDDs with makeRDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println("===================createDataMethod:makeRDD:List=====================")
    println(r01.collect().mkString(","))
    println("===================createDataMethod:makeRDD:List=====================")
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println("===================createDataMethod:makeRDD:Array=====================")
    println(r02.collect().mkString(","))
    println("===================createDataMethod:makeRDD:Array=====================")
     
    /* Create RDDs with parallelize */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println("===================createDataMethod:parallelize:List=====================")
    println(r03.collect().mkString(","))
    println("===================createDataMethod:parallelize:List=====================")
    /* Array */
    val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println("===================createDataMethod:parallelize:Array=====================")
    println(r04.collect().mkString(","))
    println("===================createDataMethod:parallelize:Array=====================")
  }
   
  /**
   * Create a pair RDD
   */
  def createPairRDD():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
    val r:RDD[String] = rdd.keys
    println("===========================createPairRDD=================================")
    println(r.collect().mkString(","))
    println("===========================createPairRDD=================================")
  }
   
  /**
   * Create an RDD from a file
   * File contents:
   *   key01,1,2.3
   *   key02,5,3.7
   *   key03,23,4.8
   *   key04,12,3.9
   *   key05,7,1.3
   */
  def createDataFromFile(path:String):Unit = {
    val rdd:RDD[String] = sc.textFile(path, 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println("=========================createDataFromFile==================================")
    println(r.collect().mkString(","))
    println("=========================createDataFromFile==================================")
  }
   
  /**
   * Basic RDD transformations
   */
  def basicTransformRDD(path:String):Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
     
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
 
    /* map */
    println("======map operation======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map operation======")
    /* filter */
    println("======filter operation======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter operation======")
    /* flatMap */
    println("======flatMap operation======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap operation======")
    /* distinct (dedup) */
    println("======distinct (dedup)======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct (dedup)======")
    /* union */
    println("======union operation======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union operation======")
    /* intersection */
    println("======intersection operation======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection operation======")
    /* subtract */
    println("======subtract operation======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract operation======")
    /* cartesian */
    println("======cartesian operation======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian operation======")
  }
   
  /**
   * Basic RDD actions
   */
  def basicActionRDD():Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
     
    /* count */
    println("======count operation======")
    println(rddInt.count())
    println("======count operation======")
    /* countByValue */
    println("======countByValue operation======")
    println(rddInt.countByValue())
    println("======countByValue operation======")
    /* reduce */
    println("======reduce operation======")
    println(rddInt.reduce((x ,y) => x + y))
    println("======reduce operation======")
    /* fold */
    println("======fold operation======")
    println(rddInt.fold(0)((x ,y) => x + y))
    println("======fold operation======")
    /* aggregate */
    println("======aggregate operation======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate operation======")
    /* foreach */
    println("======foreach operation======")
    rddStr.foreach { x => println(x) }
    println("======foreach operation======")
  }
   
  def main(args: Array[String]): Unit = {
    println(System.getenv("HADOOP_HOME"))
    createDataMethod()
    createPairRDD()
    createDataFromFile("file:///D:/sparkdata.txt")
    basicTransformRDD("file:///D:/sparkdata.txt")
    basicActionRDD()
    /* Printed output: */
    /*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
2,3,4,5,6,7,3,6,2
======map operation======
======filter operation======
5,6,5
======filter operation======
======flatMap operation======
key01
======flatMap operation======
======distinct (dedup)======
4,6,2,1,3,5
======distinct (dedup)======
======union operation======
1,3,5,3,2,4,5,1
======union operation======
======intersection operation======
1,5
======intersection operation======
======subtract operation======
3,3
======subtract operation======
======cartesian operation======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
======cartesian operation======
======count operation======
9
======count operation======
======countByValue operation======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
======countByValue operation======
======reduce operation======
29
======reduce operation======
======fold operation======
29
======fold operation======
======aggregate operation======
19,10
======aggregate operation======
======foreach operation======
a
b
c
d
b
a
======foreach operation======*/
  }
}

The code above essentially treats an RDD as an array; the code below focuses on how an RDD handles map-style (key/value) data:

package cn.com.sparktest
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
 
object SparkPairMap {
   
  val conf:SparkConf = new SparkConf().setAppName("spark pair map").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)
  
  /**
   * Build a pair RDD
   */
  def createPairMap():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
    val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
    println("=========createPairMap=========")
    println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
    println("=========createPairMap=========")
     
    /*
     * Test file contents:
     *   x01,1,4
     *   x02,11,1
     *   x01,3,9
     *   x01,2,6
     *   x02,18,12
     *   x03,7,9
     */
    val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }
    val rFile:RDD[String] = rddFile.keys
    println("=========createPairMap File=========")
    println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03
    println("=========createPairMap File=========")
  }
   
  /**
   * Transformations and actions on pair RDDs
   */
  def pairMapRDD(path:String):Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
    val other:RDD[(String,Int)] = sc.parallelize(List(("k01",29)), 1)
     
    // Transformations
    val rddReduce:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
    println("====reduceByKey===:" + rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
    val rddGroup:RDD[(String,Iterable[Int])] = rdd.groupByKey()
    println("====groupByKey===:" + rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6))
    val rddKeys:RDD[String] = rdd.keys
    println("====keys=====:" + rddKeys.collect().mkString(","))// k01,k02,k03,k01
    val rddVals:RDD[Int] = rdd.values
    println("======values===:" + rddVals.collect().mkString(","))// 3,6,2,26
    val rddSortAsc:RDD[(String,Int)] = rdd.sortByKey(true, 1)
    val rddSortDes:RDD[(String,Int)] = rdd.sortByKey(false, 1)
    println("====rddSortAsc=====:" + rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2)
    println("======rddSortDes=====:" + rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26)
    val rddFmVal:RDD[(String,Int)] = rdd.flatMapValues { x => List(x + 10) }
    println("====flatMapValues===:" + rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
    val rddMapVal:RDD[(String,Int)] = rdd.mapValues { x => x + 10 }
    println("====mapValues====:" + rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
    val rddCombine:RDD[(String,(Int,Int))] = rdd.combineByKey(x => (x,1), (param:(Int,Int),x) => (param._1 + x,param._2 + 1), (p1:(Int,Int),p2:(Int,Int)) => (p1._1 + p2._1,p1._2 + p2._2))
    println("====combineByKey====:" + rddCombine.collect().mkString(","))//(k01,(29,2)),(k03,(2,1)),(k02,(6,1))
    val rddSubtract:RDD[(String,Int)] = rdd.subtractByKey(other)
    println("====subtractByKey====:" + rddSubtract.collect().mkString(","))// (k03,2),(k02,6)
    val rddJoin:RDD[(String,(Int,Int))] = rdd.join(other)
    println("=====rddJoin====:" + rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29))
    val rddRight:RDD[(String,(Option[Int],Int))] = rdd.rightOuterJoin(other)
    println("====rightOuterJoin=====:" + rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29))
    val rddLeft:RDD[(String,(Int,Option[Int]))] = rdd.leftOuterJoin(other)
    println("=====rddLeft=====:" + rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None))
    val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
    println("=====cogroup=====:" + rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer()))
     
    // Actions
    val resCountByKey = rdd.countByKey()
    println("=====countByKey=====:" + resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1)
    val resColMap = rdd.collectAsMap()
    println("=====resColMap=====:" + resColMap)//Map(k02 -> 6, k01 -> 26, k03 -> 2)
    val resLookup = rdd.lookup("k01")
    println("====lookup===:" + resLookup) // WrappedArray(3, 26)
  }
   
  /**
   * A few other, less commonly used RDD operations
   */
  def otherRDDOperate(){
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
     
    println("=====first=====:" + rdd.first())//(k01,3)
    val resTop = rdd.top(2).map(x => x._1 + ";" + x._2)
    println("=====top=====:" + resTop.mkString(","))// k03;2,k02;6
    val resTake = rdd.take(2).map(x => x._1 + ";" + x._2)
    println("=======take====:" + resTake.mkString(","))// k01;3,k02;6
    val resTakeSample = rdd.takeSample(false, 2).map(x => x._1 + ";" + x._2)
    println("=====takeSample====:" + resTakeSample.mkString(","))// k01;26,k03;2
    val resSample1 = rdd.sample(false, 0.25)
    val resSample2 = rdd.sample(false, 0.75)
    val resSample3 = rdd.sample(false, 0.5)
    println("=====sample======:" + resSample1.collect().mkString(","))// (nothing in this run)
    println("=====sample======:" + resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26)
    println("=====sample======:" + resSample3.collect().mkString(","))// (k01,3),(k01,26)
  }
   
  def main(args: Array[String]): Unit = {
    createPairMap()
    pairMapRDD("file:///F:/sparkdata01.txt")
    otherRDDOperate()
  }
   
}
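
The least obvious call in pairMapRDD above is combineByKey, which accumulates a (sum, count) pair per key. Below is a self-contained sketch of my own showing the usual follow-up: turning that pair into a per-key average with mapValues.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CombineByKeyAvg {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("combineByKey-avg").setMaster("local[2]"))
    // Same sample pairs as in SparkPairMap above.
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("k01", 3), ("k02", 6), ("k03", 2), ("k01", 26)))
    // combineByKey accumulates a (sum, count) pair per key, as in pairMapRDD above.
    val sumCount: RDD[(String, (Int, Int))] = rdd.combineByKey(
      (v: Int) => (v, 1),                                           // first value seen for a key
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // fold another value into the pair
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // merge partial results across partitions
    // mapValues then turns each (sum, count) into an average.
    val avgByKey: RDD[(String, Double)] = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
    println(avgByKey.collect().mkString(","))  // (k01,14.5),(k03,2.0),(k02,6.0) for this sample data
    sc.stop()
  }
}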