前言

Oozie是什么呢?按官方说法:Oozie是一个基于工作流引擎的服务器,其中每个工作流的任务可以是Hadoop的Map/Reduce作业或者Pig作业等。Oozie是运行于Java servlet容器上的一个java web应用。Oozie的目的是按照DAG(有向无环图)调度一系列的Map/Reduce或者Pig任务。Oozie 工作流由hPDL(Hadoop Process Definition Language)定义(这是一种XML流程定义语言)。

可以看出,实际上,Oozie的作用类似于:当你有一系列Map/Reduce作业,但是它们之间彼此是相互依赖的关系。(即后一个任务依赖于前面任务的输出)。正常的思路是写shell脚本来完成这些事情,但是,当任务多了,依赖关系复杂的时候,写脚本是费时费力的,而且复用性也差。所以,Oozie出现了!

简介

Oozie workflows 由控制流节点(control flow nodes)和操作节点(action nodes)组成。控制流节点除了定义一个工作流何时开始和何时结束(start,end,fail等),还提供了控制工作流执行路径的机制(decision,fork,join等)。而操作节点是指能够触发一个计算任务(Computation Task)或者处理任务(Processing Task)执行的节点。Oozie提供了包括map-reduce、ssh、file system等在内的很多操作的支持。此外,如有需要,还可以自行扩展操作节点类型。
此外,Oozie工作流中,参数指定也是非常重要的。当有很多输出目录的时候,可以让一定的工作流并发执行。

工作流生命周期

在Oozie中,工作流的状态可能存在如下几种:
这里写图片描述

上述各种状态存在相应的转移(工作流程因为某些事件,可能从一个状态跳转到另一个状态),其中合法的状态转移有如下几种,如下表所示:
这里写图片描述
当明确上述给出的状态转移空间以后,可以根据实际需要更加灵活地来控制工作流Job的运行。

控制流节点(Control flow nodes)

下面workflow有时会缩写为wf。特此声明

控制流控制工作流的开始和结束,以及工作流Job的执行路径的节点,它定义了流程的开始(start节点)和结束(end节点或kill节点),同时提供了一种控制流程执行路径的机制(decision决策节点、fork分支节点、join会签节点)。通过上面提到的各种节点,我们大概应该能够知道它们在工作流中起着怎样的作用。下面,我们看一下不同节点的语法格式:

  • start节点

<workflow-app name="[WF-DEF-NAME]" xlmns="uri:oozie:workflow:0.1">
    ...
    <start to="[NODE-NAME]" />
    ...
</workflow-app>

上面start元素中的to指向第一个将要执行的工作流节点。

  • end节点

<workflow-app name="[WF-DEF-NAME]" xlmns="uri:oozie:workflow:0.1">
    ...
    <end to="[NODE-NAME]" />
    ...
</workflow-app>

达到该节点,工作流Job会变成SUCCEEDED状态,表示成功完成。需要注意的是,一个工作流定义必须只能有一个end节点。当一个workflow执行到end node时,即代表这个wf已经成功执行完成,如果有多个actions,其中一个到达了end node,则会kill掉其他actions,此时wf也被认为是成功执行完成。

  • kill节点

<workflow-app name="[WF-DEF-NAME]" xlmns="uri:oozie:workflow:0.1">
    ...
    <kill name="[NODE-NAME]" />
        <message>[MESSAGE-TO-LOG]</message>
    </kill>
    ...
</workflow-app>

kill元素的name属性,是要杀死的工作流节点的名称,message元素指定了工作流节点被杀死的备注信息。达到该节点,工作流Job会变成状态KILLED。

  • decision节点
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="[NODE-NAME]">
        <switch>
            <case to="[NODE_NAME]">[PREDICATE]</case>
            ...
            <case to="[NODE_NAME]">[PREDICATE]</case>
            <default to="[NODE_NAME]"/>
        </switch>
    </decision>
    ...
</workflow-app>

decision节点通过预定义一组条件,当工作流Job执行到该节点时,会根据其中的条件进行判断选择,满足条件的路径将被执行。decision节点通过switch…case语法来进行路径选择,只要有满足条件的判断,就会执行对应的路径,如果没有可以配置default元素指向的节点。

  • fork节点和join节点(重要
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="[FORK-NODE-NAME]">
        <path start="[NODE-NAME_1]" />
        ...
        <path start="[NODE-NAME_N]" />
    </fork>
    ...
    <join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />
    ...
</workflow-app>

fork和join必须一起使用。fork元素下面会有多个path元素,指定了可以并发执行的多个执行路径。fork中多个并发执行路径会在join节点的位置会合:只有所有的路径都到达后,才会继续执行join节点to属性指向的节点。
也就是说,从NODE-NAME_1 直到 NODE-NAME_N ,如果执行成功后,ok元素应该指向一个相同的join作业。以下面为例:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="forking">
        <path start="firstparalleljob"/>
        <path start="secondparalleljob"/>
    </fork>
    <action name="firstparallejob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job1.xml</job-xml>
        </map-reduce>
        <!-- 若firstparallejob任务执行成功,则指向任务"joining"-->
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <action name="secondparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job2.xml</job-xml>
        </map-reduce>
        <!-- 若secondparalleljob任务执行成功,则指向任务"joining"-->
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <!-- 当firstparallejob和secondparalleljob都运行完毕,汇集到join元素的时候,join元素执行to指向的nextaction操作-->
    <join name="joining" to="nextaction"/>
    ...
</workflow-app>

oozie 在提交前会检查fork是否有效或者是否引导一个正确的行为执行,如果校验不通过,则不会提交任务。
但是如果你确定你定义的fork join行为是对的,可以通过设置job.properties文件中的 oozie.wf.validate.ForkJoin 为 false(只影响自己)或者oozie-site.xml文件中的oozie.validate.ForkJoin 为 false(全局的,影响所有wf) 来关闭fork join检查
fork join检查会判断这两个文件的配置,仅当都为true时才会开启检查,其中一个为false,都会关闭检查。

操作节点(Action nodes)

操作节点如简介中介绍,这里展示一下操作节点中的基本特性:

  • 远程执行
    对Oozie来说,操作节点的执行都是远程的,因为Oozie可能部署在一个单独的服务器上,而工作流Job是在Hadoop集群的节点上执行的。即使Oozie在Hadoop集群的某个节点上,它也是处于与Hadoop进行独立无关的JVM示例之中(Oozie部署在Servlet容器当中)。

  • 异步性
    操作节点的执行,对于Oozie来说是异步的。Oozie启动一个工作流Job,这个工作流Job便开始执行。Oozie可以通过两种方式来探测工作流Job的执行情况:一种是基于回调机制(call backs),对每个任务的执行(可以看成是操作节点的执行)都设置一个唯一的URL,如果任务执行结束或者执行失败,会通过回调这个URL通知Oozie已经完成;另一种就是轮询(polling)Oozie不停地去查询任务执行的完成状态。如果由于网络故障回调机制失败,也会使用轮询的方式去获取任务的状态。
    这里写图片描述
    ps:上图红色箭头指向的就是一个操作节点的url

  • 执行结果要么成功,要么失败
    如果操作节点执行成功,则会转向ok节点;如果失败则会转向error节点。
    这里写图片描述

  • 可恢复性
    如果一个动作节点执行失败,Oozie提供了一些恢复执行的策略,这个要根据失败的特点来进行:如果是状态转移过程中失败,Oozie会根据指定的重试时间间隔去重新执行;如果不是转移性质的失败,则只能通过手工干预来进行恢复;如果重试恢复执行都没有解决问题,则最终会跳转到error节点。
    当成功执行时会执行ok定义的to name,失败时执行error定义的to name,当为error时,必须提供 error-code and error-message 信息给 Oozie。可以通过使用decision node对error-code进行switch-case从而实现不同错误代码不同的处理逻辑。

下面以几个典型Oozie内置支持的动作节点类型为例,详细介绍:

  • Map-Reduce动作

map-reduce动作会在工作流Job中启动一个MapReduce Job任务运行,我们可以详细配置这个MapReduce Job。另外,可以通过map-reduce元素的子元素来配置一些其他的任务,如streaming、pipes、file、archive等等。
下面给出包含这些内容的语法格式说明:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <map-reduce>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <streaming>
                <mapper>[MAPPER-PROCESS]</mapper>
                <reducer>[REDUCER-PROCESS]</reducer>
                <record-reader>[RECORD-READER-CLASS]</record-reader>
                <record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
                ...
                <env>[NAME=VALUE]</env>
                ...
            </streaming>
<!-- Either streaming or pipes can be specified for an action, not both -->
            <pipes>
                <map>[MAPPER]</map>
                <reduce>[REDUCER]</reducer>
                <inputformat>[INPUTFORMAT]</inputformat>
                <partitioner>[PARTITIONER]</partitioner>
                <writer>[OUTPUTFORMAT]</writer>
                <program>[EXECUTABLE]</program>
            </pipes>
            <job-xml>[JOB-XML-FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </map-reduce>        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
  • Hive动作

Hive主要是基于类似SQL的HQL语言的,它能够方便地操作HDFS中数据,实现对海量数据的分析工作。Hive动作的语法格式如下所示:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.2">
    ...
    <action name="[NODE-NAME]">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]" />
                ...
                <mkdir path="[PATH]" />
                ...
            </prepare>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <!-- HIVE-SCRIPT就是xxx.hql的脚本啦
            param 就是hive中的hivevar hiveconf传参 -->
            <script>[HIVE-SCRIPT]</script>
            <param>[PARAM-VALUE]</param>
            ...
        </hive>
        <ok to="[NODE-NAME]" />
        <error to="[NODE-NAME]" />
    </action>
    ...
</workflow-app>
  • Fs动作

Fs动作主要是基于HDFS的一些基本操作,如删除路径、创建路径、移动文件、设置文件权限等等。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <fs>
            <delete path='[PATH]' />
            ...
            <mkdir path='[PATH]' />
            ...
            <move source='[SOURCE-PATH]' target='[TARGET-PATH]' />
            ...
            <chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
            ...
            <touchz path='[PATH]' />
        </fs>
        <ok to="[NODE-NAME]" />
        <error to="[NODE-NAME]" />
    </action>
    ...
</workflow-app>
支持的hadoop fs命令有
move , delete , mkdir , chmod , touchz and chgrp。
FS命令是 串行执行的,即上一个命令执行完之后才会往下执行下一个命令。
FS命令 不会自动回滚的,即 当前命令失败之后,前面对HDFS的成功操作命令并不会撤销。因此一般使用时都会定义一个单独action来检查下面的FS ACTION将做的操作的source或者target文件是否有效等等。然后再进行FS ACTION定义的操作,避免中途失败造成的风险。

delete指定path如果是一个目录的话,会递归的删除子目录然后再删除该目录,类似hadoop fs -rm -r

mkdir会创建所有missing的目录,类似于hadoop fs -mkdir -p

move 的source-path必须存在,The file system URI(e.g. hdfs://{nameNode}) 在target path中可以被跳过,默认和source 的system URI一样,但是也可以指定target自己的 system URI. target path的父目录必须存在,如果target path已经存在的话,source path将作为target path 这个目录的子目录移动过去。

touchz 当path不存在时创建一个空文件(zero length file),如果存在的话,执行的是touch操作,即修改文件的时间戳。

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="hdfscommands">
         <fs>
            <delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
            <mkdir path='archives/${wf:id()}'/>
            <move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
            <!-- <recursive/> 表示递归操作即chmod -r ,chgrp -r -->
            <chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
            <chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
        </fs>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>
  • Spark动作

工作流作业直到当前的Spark作业完成后才会继续调度执行下面的内容;用户需要自己定义job-tracker和name-node信息。语法格式:

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
                ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

以一个作业为例,一步步来分析:
这里写图片描述

prepare:(可选)如果出现,表明在启动作业之前,需要创建/删除一系列路径。特定的路径必须带有: hdfs://HOST:PORT .

job-xml:(可选)如果出现,即为spark作业指定配置文件

:(可选)如果出现,将其中设置的配置属性传递给spark作业。

master:(必须)即为spark作业指定调度器:yarn-master yarn-cluster mesos local等
Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local.

mode:(可选)即为spark作业指定driver运行位置。client或者cluster。这个其实没啥卵用,因为在master中,你可以指定: master=yarn-client is equivalent to master=yarn, mode=client从而无需设置mode
此外,一个本地的master总是以client模式运行。

基于不同的master和mode,Spark作业运行情况如下:

● local mode: everything runs here in the Launcher Job.
● yarn-client mode: the driver runs here in the Launcher Job and the executor in Yarn.
● yarn-cluster mode: the driver and executor run in Yarn.

name:(必须)spark作业的名称
class:(可选)入口类(main class)。对python作业不需要指定
jar:(必须)一系列jar包或者python文件。
spark-opts:(可选)提交spark作业的时候,设置executors等参数
arg:(可选)如果提供,则将其传递给spark application

PySpark with Spark Action
为了提交pyspark脚本,pyspark依赖必须设置好,详情请看这个官方文档

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ....
    <action name="myfirstpysparkjob">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <master>yarn-cluster</master>
            <name>Spark Example</name>
            <jar>pi.py</jar>
            <spark-opts>--executor-memory 20G --num-executors 50
            --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts>
            <arg>100</arg>
        </spark>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

注意:上面的例子中,pi.py必须是spark的本地文件!这个python文件应该放在靠近workflow.xml的lib/目录下或者将文件前面加上详细的地址。

表达式语言函数(Expression Language Functions)

Oozie除了可以使用Properties文件定义一些属性之外,还提供了一些内置的EL函数,能够方便地实现流程的定义和控制,下面我们分组列表说明:

基本的EL常量

这里写图片描述
基本的EL函数

这里写图片描述

工作流EL函数
这里写图片描述

下面是一个在实际工程中使用的例子:
这里写图片描述

HDFS EL函数
这里写图片描述

下面以使用decision node和HDFS EL函数的工作流作为例子:

<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="mydecision">
        <switch>
            <case to="reconsolidatejob">
              <!-- ${fs:fileSize('/usr/foo/myinputdir') gt 10 * GB},返回true或者false -->
              ${fs:fileSize(secondjobOutputDir) gt 10 * GB}
            </case>
            <case to="rexpandjob">
              ${fs:filSize(secondjobOutputDir) lt 100 * MB}
            </case>
            <case to="recomputejob">
              ${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
            </case>
            <default to="end"/>
        </switch>
    </decision>
    ...
</workflow-app>

参考资料

1、官方文档
2、Oozie工作流程定义详解

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐