并行化创建RDD sc.parallelize()

先将hdfs和spark的master,worker启动,然后在bin目录下执行spark-shell --master spark://linux01:7077

调用scala的driver端

scala> val arr = Array(1,2,3,4,5)     //先创建一个数组
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(arr)  //创建一个RDD,这个rdd代表后面要对这个数组进行操作
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.partitions.length         //看看这个rdd有几个分区
res0: Int = 8                        //因为一开始没有指定分区,所以默认是total-executor-cores的数量,因为一共就这些核,启动这么多的task是最快的

scala> spark-shell --master spark://linux01:7077 --total-executor-cores 5    //重启给他五个核  或者 scala> val add = sc.parallelize(arr,5)
scala> val rdd2 = rdd.map(_*100)    //没有生成job
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25

scala> rdd2.collect                 //将结果收集回来,这是一个action,生成了job
res2: Array[Int] = Array(100, 200, 300, 400, 500) 

RDD有多个分区,分区的数量决定了它的并行度,并行度越高,task越多

textFile创建RDD分区

hdfs的相关目录下要有数据文件

分区的数量由hdfs中数据的输入切片数量(逻辑切片)决定 切块完成后,如果某文件的大小除以分区数后的大小超过最小文件的1.1倍,则增加切片数量

scala> val rdd1 = sc.textFile("hdfs://linux01:8020/wc2")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://linux01:8020/wc2 MapPartitionsRDD[9] at textFile at <console>:24

scala> rdd1.partitions.length        //获取分区数量
res6: Int = 3                        //分区的数量由hdfs中数据的输入切片数量(逻辑切片)决定


 

 

 

 

 

 

 

 

 

 

 

 

 

 

Logo

更多推荐