最近看到有几个Github友关注了Streaming的监控工程——Teddy,所以思来想去还是优化下代码,不能让别人看笑话,是不。于是就想改在一下之前最丑陋的一个地方——任务提交

本博客内容基于Spark2.2版本~在阅读文章并想实际操作前,请确保你有:

一台配置好Spark和yarn的服务器

支持正常spark-submit --master yarn xxxx的任务提交

老版本

老版本任务提交是采用启动本地进程,执行脚本spark-submit xxx的方式做的。其中一个关键的问题就是获得提交Spark任务的Application-id,因为这个id是跟任务状态的跟踪有关系的。如果你的资源管理框架用的是yarn,应该知道每个运行的任务都有一个applicaiton_id,这个id的生成规则是:

appplication_时间戳_数字

原本老版本的spark通过参数spark.app.id就可以手动指定id,但是新版本的代码不是了。它是直接读取的taskBackend中的applicationId()方法,这个方法具体的实现是根据实现类来定的。在yarn中,是通过Yarn的YarnClusterSchedulerBackend实现的,具体的连接可以参考对应的链接。

感兴趣的同学可以看一下,最终真正常见applicaiton_id的地方式在hadoop-yarn里面,有个叫做ContainerId来生成的。

总结一句话就是,想要自定义id,甭想了!!!!

于是当时脑袋瓜不灵光的我,就想到那就等应用创建好了之后,直接写到数据库里面呗。怎么写呢?

我事先生成一个自定义的id,当做参数传递到spark应用里面;

等spark初始化后,就可以通过sparkContext取得对应的application_id以及url

然后再driver连接数据库,插入一条关联关系

d59e0a0ff206095f7e67a1245f77eacb.png

新版本

还是归结于互联网时代的信息传递,我看到群友的聊天,知道了SparkLauncer这个东西,调查后发现。他可以基于Java代码自动提交Spark任务,有两种模式:

new SparkLauncher().launch() 直接启动一个Process,效果跟以前一样

new SparkLauncher().startApplicaiton(监听器) 返回一个SparkAppHandler,并(可选)传入一个监听器

当然是更倾向于第二种啦,因为好处很多:

自带输出重定向(Output,Error都有,支持写到文件里面),超级爽的功能

可以自定义监听器,当信息或者状态变更时,都能进行操作(对我没啥用)

返回的SparkAppHandler支持 暂停、停止、断连、获得AppId、获得State等多种功能,我就想要这个!!!!

300de03b76b090cb36529afebef77397.png

一步一步,代码展示

首先创建一个最基本的Spark程序:

import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;

import java.util.List;

public class HelloWorld {

public static void main(String[] args) throws InterruptedException {

SparkSession spark = SparkSession

.builder()

//.master("yarn")

//.appName("hello-wrold")

//.config("spark.some.config.option", "some-value")

.getOrCreate();

List persons = new ArrayList<>();

persons.add(new Person("zhangsan", 22, "male"));

persons.add(new Person("lisi", 25, "male"));

persons.add(new Person("wangwu", 23, "female"));

spark.createDataFrame(persons, Person.class).show(false);

spark.close();

}

}

然后创建SparkLauncher类:

import org.apache.spark.launcher.SparkAppHandle;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;

public class Launcher {

public static void main(String[] args) throws IOException {

SparkAppHandle handler = new SparkLauncher()

.setAppName("hello-world")

.setSparkHome(args[0])

.setMaster(args[1])

.setConf("spark.driver.memory", "2g")

.setConf("spark.executor.memory", "1g")

.setConf("spark.executor.cores", "3")

.setAppResource("/home/xinghailong/launcher/launcher_test.jar")

.setMainClass("HelloWorld")

.addAppArgs("I come from Launcher")

.setDeployMode("cluster")

.startApplication(new SparkAppHandle.Listener(){

@Override

public void stateChanged(SparkAppHandle handle) {

System.out.println("********** state changed **********");

}

@Override

public void infoChanged(SparkAppHandle handle) {

System.out.println("********** info changed **********");

}

});

while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){

System.out.println("id "+handler.getAppId());

System.out.println("state "+handler.getState());

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

打包完成后上传到部署Spark的服务器上。由于SparkLauncher所在的类引用了SparkLauncher,所以还需要把这个jar也上传到服务器上。

[xinghailong@hnode10 launcher]$ ls

launcher_test.jar spark-launcher_2.11-2.2.0.jar

[xinghailong@hnode10 launcher]$ pwd

/home/xinghailong/launcher

由于SparkLauncher需要指定SPARK_HOME,因此如果你的机器可以执行spark-submit,那么就看一下spark-submit里面,SPARK_HOME是在哪

[xinghailong@hnode10 launcher]$ which spark2-submit

/var/lib/hadoop-hdfs/bin/spark2-submit

最后几行就能看到:

export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark

# disable randomized hash for string in Python 3.3+

export PYTHONHASHSEED=0

exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

综上,我们需要的是:

一个自定义的Jar,里面包含spark应用和SparkLauncher类

一个SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根据你自己的来就行

一个当前目录的路径

一个SARK_HOME环境变量指定的目录

然后执行命令启动测试:

java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn

说明:

-Djava.ext.dirs 设置当前目录为java类加载的目录

传入两个参数,一个是SPARK_HOME;一个是启动模式

观察删除发现成功启动运行了:

id null

state UNKNOWN

Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect

INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

********** state changed **********

...省略一大堆拷贝jar的日志

********** info changed **********

********** state changed **********

Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect

INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)

... 省略一堆重定向的日志

application_1518263195995_37615 (state: ACCEPTED)

id application_1518263195995_37615

state SUBMITTED

Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect

INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)

********** state changed **********

... 省略一堆重定向的日志

INFO: user: hdfs

********** state changed **********

Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect

INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called

Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect

INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701

总结

这样就实现了基于Java应用提交Spark任务,并获得其Appliation_id和状态进行定位跟踪的需求了。

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐