Scala创建RDD的多种方式
并行化创建RDD sc.parallelize()先将hdfs和spark的master,worker启动,然后在bin目录下执行spark-shell --master spark://linux01:7077调用scala的driver端scala> val arr = Array(1,2,3,4,5)//先创建一个数组arr: Array[Int] = Array(1, 2, 3, 4
·
并行化创建RDD sc.parallelize()
先将hdfs和spark的master,worker启动,然后在bin目录下执行spark-shell --master spark://linux01:7077
调用scala的driver端
scala> val arr = Array(1,2,3,4,5) //先创建一个数组
arr: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(arr) //创建一个RDD,这个rdd代表后面要对这个数组进行操作
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.partitions.length //看看这个rdd有几个分区
res0: Int = 8 //因为一开始没有指定分区,所以默认是total-executor-cores的数量,因为一共就这些核,启动这么多的task是最快的
scala> spark-shell --master spark://linux01:7077 --total-executor-cores 5 //重启给他五个核 或者 scala> val add = sc.parallelize(arr,5)
scala> val rdd2 = rdd.map(_*100) //没有生成job
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25
scala> rdd2.collect //将结果收集回来,这是一个action,生成了job
res2: Array[Int] = Array(100, 200, 300, 400, 500)
RDD有多个分区,分区的数量决定了它的并行度,并行度越高,task越多
textFile创建RDD分区
hdfs的相关目录下要有数据文件
分区的数量由hdfs中数据的输入切片数量(逻辑切片)决定 切块完成后,如果某文件的大小除以分区数后的大小超过最小文件的1.1倍,则增加切片数量
scala> val rdd1 = sc.textFile("hdfs://linux01:8020/wc2")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://linux01:8020/wc2 MapPartitionsRDD[9] at textFile at <console>:24
scala> rdd1.partitions.length //获取分区数量
res6: Int = 3 //分区的数量由hdfs中数据的输入切片数量(逻辑切片)决定
更多推荐
已为社区贡献1条内容
所有评论(0)