摘要:

    我们一般选择IntelliJ IDEA书写spark程序,然后打包成jar文件,放到spark集群中运行,接下来我将以WordCount为例仔细讲述Scala程序的 "创建 => 编写 => 打包 => 运行 " 这一过程。

所需工具:

1.  spark 集群(并已经配置好Standalone模式,我的spark集群是在docker中部署的,上一篇博文讲过如何搭建hadoop完全分布式

2.  IntelliJ IDEA 开发环境

          =>配置了 java_1.8

          =>配置了 scala-sdk-2.11.8

3. 浏览器

一、 创建 Scala 文件

1.  如图:我们打开Idea

2. 点击 Create New Projects

3.   选择Java ,再一直点 next

4.  输入Project name ,再点 finish

5.   右键点击项目名称(我的是WordCount),再左键点击Add Framework support ....

6.  添加Maven依赖(用于编译scala,并打包成jar),再点Ok

7.  接下来我们配置Maven,其中gropuId你可以选择这个wordcount项目所在的文件夹,这个你随意,其添加的代码段为:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>IdeaProjects</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    
</project>

8.  然后点击Idea界面右方的 Maven管理器,打开之后点击 ”刷新“ 图标

注意:假如你的界面没有Maven管理器,你可以打开 View ,再点击里面的 Tool Buttons ,这样Maven管理器就出来了。

9.  刷新之后,我们会看到Maven管理器中组建了Dependencies依赖,之后我们再右键点击项目文件(我的文件是WordCount),点击Add Framework support...,添加Scala编程框架。

10.  选择Scala,并选择Scala-sdk-2.11.8 。假如你没有,点击Create创建,再点击download下载需要的Scala-sdk版本。

假如你没有scala-sdk-2.11.8 ,按如下操作

11. 添加Scala框架之后,我们就可以新建Scala 类,编写程序了,点开项目文件WordCount,再点开src文件,再点开main文件,找到java资源文件,右键此文件,新建Scala Class

12.  之后我们输入name ,并选择Object类,再点击Ok

13.  进入scala编程界面。在编写程序之前,我们还需要设置一下编译器,使其自动编译,不然后期打包的jar文件是极其不完整的。点击File ->  点击Setting -> 点击Build下的compiler ->  勾选上 Build project automatically -> 点击 apply ->  ok

二、编写程序

14.  程序编写如下:

<1.> 我们先用local模式,在idea中调试,其实假如放到spark集群上跑,就是把local模式和appname注释掉,在spark-submit提交中使用spark://Master:7077模式,请听我娓娓道来......

package IdeaProjects
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建spark配置对象
    val conf = new SparkConf()
    //设置master属性
    conf.setMaster("local")
    conf.setAppName("WordCount")
    //通过conf创建sc
    val sc = new SparkContext(conf)
    //书写单词计数程序
    val textFile = sc.textFile("hdfs://Master:9000/user/zhangqingfeng/data/test.txt")
    val wordCount = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    wordCount.foreach(println)
  }
}

注意:这里的package后的名就是你设置Maven时的groupId的名

输出结果(输出的无用信息有点多,你需要一直往下找):

注意:我这里使用的文本测试文件是在hdfs中的,因此,在你开始调试此程序之前,你需要把hadoop集群启动起来。WebUi地址是 http://Master:50070 ,(这里的Master是我的主机ip,你要根据你自己的实际情况来输入)

     <2.> spark集群模式:假如你想要放到spark集群上跑,而不是local测试,那么你只要改三个参数即可,把setMaster,setAppname这两行注释掉,把sc.textFile("hdfs://........")改为sc.textFile(args(0))   如图所示

完整代码如下,敲完之后你就别运行了,肯定报错,因为缺少必要的参数

package IdeaProjects
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    //创建spark配置对象
    val conf = new SparkConf()
    //设置master属性
    //conf.setMaster("local")
    //conf.setAppName("WordCount")
    //通过conf创建sc
    val sc = new SparkContext(conf)
    //书写单词计数程序
    val textFile = sc.textFile(args(0))
    val wordCount = textFile.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    wordCount.foreach(println)
  }
}

三、程序打包

15.  程序打包成jar,就像是创建scala类有至少三种方法,打包也有至少两种方法,我将介绍用Maven管理器打包

点击右方的Maven管理器 -> 点击Lifecycle -> 右键点击package -> 点击第二项 run WordCount [package]

16. 打包编译结束后,我们在左边文件目录会看到一个 target 目录,点开target目录,右键点击WordCount-1.0-SNAPSHOT.jar ,选择File Path ,之后点击WordCount-1.0-SNAPSHOT.jar  ,我们会进入jar文件的文件夹,双击jar文件,打开IdeaProjects文件,会有生成的六个class文件,说明打包成功。

四、在spark集群(Standalone)中运行jar包

17.  打包之后,我们便开始启动spark集群了,因为每个人集群不一样,尽量把这个jar文件放到spark集群的共享目录下,我的是在docker中部署的spark集群,所以需要把这个jar文件拷贝到docker容器中。

<1.> 启动里spark的standalone模式后,我们用浏览器打开WebUi,地址是http://Master:8080  (注意:每个的Master地址不同,按照你自己的实际情况来输入ip地址),如图:

从图中,小伙伴也看到了,我这是cluster 模式(从第三行可以看出),待会的运行结果输出我们在控制台是看不到的,需要进入WebUi中的stdout中查看。

<2.>  之后我们进入spark的bin文件目录下,开始提交jar包,命令如下:

[root@Master spark-2.3.0-bin-hadoop2.7]# cd bin
[root@Master bin]# spark-submit --master spark://Master:7077 --name MyWordCountScala --class IdeaProjects.WordCount --executor-memory 512M /usr/local/share/WordCount-1.0-SNAPSHOT.jar hdfs://Master:9000/user/zhangqingfeng/data/test.txt

控制台不输出结果,因为从WebUi中看出是cluster模式,如图:

<3. >  我们打开WebUi,点击Completed Applications 中的刚运行的app id,查看每个worker 的 stdout,查看输出结果:

二、在spark-submit提交中会有一些错误,在WebUi中的stderr中也会发现,其原因基本是内存不足导致的,我把我的spark配置分享给大家(基本是运行内存的设置和worker工作数),至于参数调优我也在进一步学习:

<1.> 打开spark-env.sh 文件

<2.> 打开spark-defaults.conf 文件

五、 善意的提示

    使用了Idea开发spark程序,每次都要打成jar包 放到 spark集群或者本地模式,这样太麻烦。你也可以在Idea中右键直接执行,并且使用spark集群模式,不过你需要在程序中加入一个步骤:

     1.  在Idea中右键执行之前,你需要把此项目打成jar包。

     2.  把此jar包的绝对路径添加到程序中,然后再右键执行就可以了。具体你可以看我下面的代码:

   用了  sc.addJar("绝对路径"),如此以来,你就可以在Idea直接跑程序了。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区