Spark-1.3.1-bin-hadoop2.6 Installation and Basic RDD Operations
I. Spark Installation Modes
local (local mode): commonly used for local development and testing; it further splits into local (single-threaded) and local-cluster (multi-threaded)
standalone (cluster mode): a typical Master/slave setup, which also means the Master is a single point of failure; Spark supports ZooKeeper-based HA
on yarn (cluster mode): runs on top of the YARN resource manager; YARN handles resource management while Spark handles task scheduling and computation
on mesos (cluster mode): runs on top of the Mesos resource manager; Mesos handles resource management while Spark handles task scheduling and computation
on cloud (cluster mode): for example Amazon EC2; this mode gives convenient access to Amazon S3. Spark supports multiple distributed storage systems, such as HDFS and S3. The mode is ultimately selected via the master URL, as sketched right after this list.
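In practice the application code stays the same in every mode and only the master URL changes; a minimal Scala sketch (the standalone host/port match the cluster built below, the Mesos host name is a placeholder):
import org.apache.spark.{SparkConf, SparkContext}
// Same application code in every mode; only the master URL differs.
val conf = new SparkConf()
  .setAppName("mode-demo")
  .setMaster("local[2]")                   // local mode with 2 threads
  // .setMaster("spark://h40:7077")        // standalone cluster (set up below)
  // .setMaster("yarn-client")             // YARN client mode (Spark 1.3.x syntax)
  // .setMaster("mesos://mesos-host:5050") // Mesos (placeholder host)
val sc = new SparkContext(conf)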
1. Installation:
(1) Spark standalone mode:
Requires Hadoop HDFS as the persistence layer and JDK 1.6 or later.
For setting up the Hadoop cluster, see: hadoop-2.6.0 installation
(2) Install Scala (on all three nodes):
[hadoop@h40 ~]$ tar -zxvf scala-2.10.6.tgz
[hadoop@h41 ~]$ tar -zxvf scala-2.10.6.tgz
[hadoop@h42 ~]$ tar -zxvf scala-2.10.6.tgz
(3) Install Spark:
[hadoop@h40 ~]$ tar -zxvf spark-1.3.1-bin-hadoop2.6.tgz
[hadoop@h40 ~]$ vi .bash_profile
export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6
export SCALA_HOME=/home/hadoop/scala-2.10.6
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SCALA_HOME/bin
[hadoop@h40 ~]$ source .bash_profile
(4) Edit the Spark configuration files:
[hadoop@h40 ~]$ cd spark-1.3.1-bin-hadoop2.6/conf
[hadoop@h40 conf]$ cp spark-env.sh.template spark-env.sh
[hadoop@h40 conf]$ vi spark-env.sh
# add the following:
export JAVA_HOME=/usr/jdk1.7.0_25/
export SPARK_MASTER_IP=h40        # host the Master runs on
export SPARK_MASTER_PORT=7077     # port that workers and applications connect to
export SPARK_WORKER_CORES=1       # CPU cores each Worker may use
export SPARK_WORKER_INSTANCES=1   # Worker processes per node
# in spark-1.6.0-bin-hadoop2.6 and spark-1.5.0-cdh5.5.2 this is: export SPARK_EXECUTOR_INSTANCES=1
export SPARK_WORKER_MEMORY=1g     # memory each Worker may hand out to executors
(5) Configure slaves:
[hadoop@h40 conf]$ vi slaves
h41
h42
(6) Copy Spark to the other nodes (the environment variables from step (3) should be set on h41 and h42 as well):
[hadoop@h40 ~]$ scp -r spark-1.3.1-bin-hadoop2.6 h41:/home/hadoop
[hadoop@h40 ~]$ scp -r spark-1.3.1-bin-hadoop2.6 h42:/home/hadoop
(7) Start Spark:
[hadoop@h40 spark-1.3.1-bin-hadoop2.6]$ sbin/start-all.sh
(8) Verify (the standalone Master web UI, on port 8080 of h40 by default, should also list both workers):
[hadoop@h40 ~]$ jps
# the master node should show a Master process
8861 Master
[hadoop@h41 ~]$ jps
[hadoop@h42 ~]$ jps
# each worker node should show a Worker process
8993 Worker
(9) Check the Spark version by running spark.version in the Spark shell (the sample transcript below happens to come from a newer 2.4.0-cdh6.3.2 deployment):
# spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-cdh6.3.2
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.version
res0: String = 2.4.0-cdh6.3.2
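Note that the spark session object queried above only exists from Spark 2.0 onward; in the Spark 1.3.1 shell only sc is pre-created, so the equivalent check is:
scala> sc.version   // returns the version string of the running release, e.g. "1.3.1" for this installation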
II. Developing and Debugging RDD Operator Code
References:
Basic RDD Operations (Part 1)
Basic RDD Operations (Part 2)
Developing and debugging Spark locally is very simple and does not require an installed Spark cluster. Just create a project (Java or Scala) and put a jar such as spark-assembly-1.6.1-hadoop2.6.0.jar (the assembly jar shipped under lib/ of the Spark distribution) on the project classpath; you can then develop and debug Spark (Scala) programs locally.
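If you use a build tool instead of adding the assembly jar by hand, the equivalent dependency is roughly the following sketch (the Scala and Spark versions are assumed to match the installation above):
// build.sbt -- minimal sketch; adjust the versions to your cluster
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
With the classpath set up either way, the following program exercises RDD creation, transformations, and actions: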
package cn.com.sparktest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object SparkTest {
val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
val sc:SparkContext = new SparkContext(conf)
/**
* Ways to create RDDs -- building them from in-memory collections (basics)
*/
def createDataMethod():Unit = {
/* create RDDs with makeRDD */
/* List */
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println("===================createDataMethod:makeRDD:List=====================")
println(r01.collect().mkString(","))
println("===================createDataMethod:makeRDD:List=====================")
/* Array */
val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5}
println("===================createDataMethod:makeRDD:Array=====================")
println(r02.collect().mkString(","))
println("===================createDataMethod:makeRDD:Array=====================")
/* create RDDs with parallelize */
/* List */
val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r03 = rdd03.map { x => x + 1 }
println("===================createDataMethod:parallelize:List=====================")
println(r03.collect().mkString(","))
println("===================createDataMethod:parallelize:List=====================")
/* Array */
val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
val r04 = rdd04.filter { x => x > 3 }
println("===================createDataMethod:parallelize:Array=====================")
println(r04.collect().mkString(","))
println("===================createDataMethod:parallelize:Array=====================")
}
/**
* Create a pair RDD (key/value data)
*/
def createPairRDD():Unit = {
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
val r:RDD[String] = rdd.keys
println("===========================createPairRDD=================================")
println(r.collect().mkString(","))
println("===========================createPairRDD=================================")
}
/**
* Create an RDD from a file
* File contents:
* key01,1,2.3
key02,5,3.7
key03,23,4.8
key04,12,3.9
key05,7,1.3
*/
def createDataFromFile(path:String):Unit = {
val rdd:RDD[String] = sc.textFile(path, 1)
val r:RDD[String] = rdd.flatMap { x => x.split(",") }
println("=========================createDataFromFile==================================")
println(r.collect().mkString(","))
println("=========================createDataFromFile==================================")
}
/**
* Basic RDD transformations
*/
def basicTransformRDD(path:String):Unit = {
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map */
println("======map======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map======")
/* filter */
println("======filter======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter======")
/* flatMap */
println("======flatMap======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap======")
/* distinct (deduplicate) */
println("======distinct======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct======")
/* union */
println("======union======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union======")
/* intersection */
println("======intersection======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection======")
/* subtract */
println("======subtract======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract======")
/* cartesian */
println("======cartesian======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian======")
}
/**
* Basic RDD actions
*/
def basicActionRDD():Unit = {
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
/* count */
println("======count======")
println(rddInt.count())
println("======count======")
/* countByValue */
println("======countByValue======")
println(rddInt.countByValue())
println("======countByValue======")
/* reduce */
println("======reduce======")
println(rddInt.reduce((x ,y) => x + y))
println("======reduce======")
/* fold */
println("======fold======")
println(rddInt.fold(0)((x ,y) => x + y))
println("======fold======")
/* aggregate */
println("======aggregate======")
// note: this seqOp overwrites the accumulator's second slot with each incoming value (see the note after this listing)
val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
println(res._1 + "," + res._2)
println("======aggregate======")
/* foreach */
println("======foreach======")
rddStr.foreach { x => println(x) }
println("======foreach======")
}
def main(args: Array[String]): Unit = {
println(System.getenv("HADOOP_HOME"))
createDataMethod()
createPairRDD()
createDataFromFile("file:///D:/sparkdata.txt")
basicTransformRDD("file:///D:/sparkdata.txt")
basicActionRDD()
/* printed output: */
/*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
2,3,4,5,6,7,3,6,2
======map======
======filter======
5,6,5
======filter======
======flatMap======
key01
======flatMap======
======distinct======
4,6,2,1,3,5
======distinct======
======union======
1,3,5,3,2,4,5,1
======union======
======intersection======
1,5
======intersection======
======subtract======
3,3
======subtract======
======cartesian======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
======cartesian======
======count======
9
======count======
======countByValue======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
======countByValue======
======reduce======
29
======reduce======
======fold======
29
======fold======
======aggregate======
19,10
======aggregate======
======foreach======
a
b
c
d
b
a
======foreach======*/
}
}
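One call in the listing that is easy to misread is aggregate: the seqOp used there, (x,y) => (x._1 + x._2, y), overwrites the accumulator's second slot with the latest element, so the printed result (19,10) also depends on the order in which the two partition results get merged. The more common pattern keeps a running (sum, count) pair, e.g. to compute an average; a minimal sketch (variable names are mine, not from the listing):
// typical aggregate usage: accumulate (sum, count), then merge per-partition accumulators
val nums = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
val (sum, cnt) = nums.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one element into a partition's accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // combOp: merge accumulators from different partitions
)
println("sum=" + sum + " count=" + cnt + " avg=" + (sum.toDouble / cnt))  // sum=29, count=9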
The first program above treats an RDD like a plain array of values; the second one, below, shows how to work with Map-style (key/value) data in an RDD:
package cn.com.sparktest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
object SparkPairMap {
val conf:SparkConf = new SparkConf().setAppName("spark pair map").setMaster("local[2]")
val sc:SparkContext = new SparkContext(conf)
/**
* Build a pair RDD
*/
def createPairMap():Unit = {
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
println("=========createPairMap=========")
println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
println("=========createPairMap=========")
/*
* Test file contents:
* x01,1,4
x02,11,1
x01,3,9
x01,2,6
x02,18,12
x03,7,9
*
* */
val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }
val rFile:RDD[String] = rddFile.keys
println("=========createPairMap File=========")
println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03
println("=========createPairMap File=========")
}
/**
* Transformations and actions on pair RDDs
*/
def pairMapRDD(path:String):Unit = {
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
val other:RDD[(String,Int)] = sc.parallelize(List(("k01",29)), 1)
// transformations
val rddReduce:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
println("====reduceByKey===:" + rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
val rddGroup:RDD[(String,Iterable[Int])] = rdd.groupByKey()
println("====groupByKey===:" + rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6))
val rddKeys:RDD[String] = rdd.keys
println("====keys=====:" + rddKeys.collect().mkString(","))// k01,k02,k03,k01
val rddVals:RDD[Int] = rdd.values
println("======values===:" + rddVals.collect().mkString(","))// 3,6,2,26
val rddSortAsc:RDD[(String,Int)] = rdd.sortByKey(true, 1)
val rddSortDes:RDD[(String,Int)] = rdd.sortByKey(false, 1)
println("====rddSortAsc=====:" + rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2)
println("======rddSortDes=====:" + rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26)
val rddFmVal:RDD[(String,Int)] = rdd.flatMapValues { x => List(x + 10) }
println("====flatMapValues===:" + rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
val rddMapVal:RDD[(String,Int)] = rdd.mapValues { x => x + 10 }
println("====mapValues====:" + rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
val rddCombine:RDD[(String,(Int,Int))] = rdd.combineByKey(x => (x,1), (param:(Int,Int),x) => (param._1 + x,param._2 + 1), (p1:(Int,Int),p2:(Int,Int)) => (p1._1 + p2._1,p1._2 + p2._2))
println("====combineByKey====:" + rddCombine.collect().mkString(","))//(k01,(29,2)),(k03,(2,1)),(k02,(6,1))
val rddSubtract:RDD[(String,Int)] = rdd.subtractByKey(other);
println("====subtractByKey====:" + rddSubtract.collect().mkString(","))// (k03,2),(k02,6)
val rddJoin:RDD[(String,(Int,Int))] = rdd.join(other)
println("=====rddJoin====:" + rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29))
val rddRight:RDD[(String,(Option[Int],Int))] = rdd.rightOuterJoin(other)
println("====rightOuterJoin=====:" + rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29))
val rddLeft:RDD[(String,(Int,Option[Int]))] = rdd.leftOuterJoin(other)
println("=====rddLeft=====:" + rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None))
val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
println("=====cogroup=====:" + rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer()))
// actions
val resCountByKey = rdd.countByKey()
println("=====countByKey=====:" + resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1)
val resColMap = rdd.collectAsMap()
println("=====resColMap=====:" + resColMap)//Map(k02 -> 6, k01 -> 26, k03 -> 2)
val resLookup = rdd.lookup("k01")
println("====lookup===:" + resLookup) // WrappedArray(3, 26)
}
/**
* A few other, less commonly used RDD operations
*/
def otherRDDOperate(){
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
println("=====first=====:" + rdd.first())//(k01,3)
val resTop = rdd.top(2).map(x => x._1 + ";" + x._2)
println("=====top=====:" + resTop.mkString(","))// k03;2,k02;6
val resTake = rdd.take(2).map(x => x._1 + ";" + x._2)
println("=======take====:" + resTake.mkString(","))// k01;3,k02;6
val resTakeSample = rdd.takeSample(false, 2).map(x => x._1 + ";" + x._2)
println("=====takeSample====:" + resTakeSample.mkString(","))// k01;26,k03;2
val resSample1 = rdd.sample(false, 0.25)
val resSample2 = rdd.sample(false, 0.75)
val resSample3 = rdd.sample(false, 0.5)
println("=====sample======:" + resSample1.collect().mkString(","))// 无
println("=====sample======:" + resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26)
println("=====sample======:" + resSample3.collect().mkString(","))// (k01,3),(k01,26)
}
def main(args: Array[String]): Unit = {
createPairMap()
pairMapRDD("file:///F:/sparkdata01.txt")
otherRDDOperate()
}
}
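The combineByKey call in pairMapRDD above already produces a (sum, count) pair per key, so the per-key average is just one mapValues away. A minimal sketch that could be appended inside pairMapRDD (names are mine, not from the article):
// per-key average built on the (sum, count) combiner pattern shown above
val pairs: RDD[(String, Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
val sumCount: RDD[(String, (Int, Int))] = pairs.combineByKey(
  (v: Int) => (v, 1),                                          // createCombiner: first value seen for a key
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold a value in within a partition
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge across partitions
)
val avgByKey: RDD[(String, Double)] = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
println(avgByKey.collect().mkString(","))  // expected: (k01,14.5),(k02,6.0),(k03,2.0) in some order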