Oozie: A Workflow Scheduling Framework

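  • Workflow
    import -> hive -> export
    Arranges different pieces of business logic into one ordered flow
  • Scheduling
    Run jobs/tasks on a timer
    Run jobs/tasks when an event fires
    • Time: e.g. trigger at 5:00 every day
    • Dataset: e.g. trigger as soon as a file appears in a given directory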

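1. Hadoop scheduling frameworks

1. Linux Crontab (the most primitive and simplest task scheduler)

Each user has their own crontab.

Format: minute hour day-of-month month day-of-week cmd (a * in a field matches every value)

Scheduling Hadoop jobs with it: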

  • mr
    /opt/cdh/hadoop/bin/hadoop jar temp.jar input output
  • hive
    /opt/cdh/hive/bin/hive -f temp.sql
  • sqoop
    /opt/cdh/sqoop/bin/sqoop --options-file sqoop-import.txt
  • shell script
    /bin/sh XXX.sh
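
As a minimal sketch of how one of the commands above becomes a crontab line, the helper below joins the five time fields and the command. The function name `crontab_entry` and the 05:00 schedule are illustrative assumptions, not part of the original setup.

```python
def crontab_entry(minute, hour, day, month, weekday, command):
    """Join the five crontab time fields and the command into one line."""
    fields = [str(f) for f in (minute, hour, day, month, weekday)]
    return " ".join(fields + [command])


# Hypothetical schedule: run the Hive script every day at 05:00.
hive_line = crontab_entry(0, 5, "*", "*", "*", "/opt/cdh/hive/bin/hive -f temp.sql")
print(hive_line)
```

The resulting line would be added with `crontab -e` under the user who owns the job.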

2. Azkaban

https://azkaban.github.io

3. Oozie

Well suited to ETL workloads.
http://oozie.apache.org

4. Zeus

https://github.com/michae18335/zeus2

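2. Oozie features and architecture

  • Oozie is an open-source framework built on a workflow engine. It was originally developed at Yahoo! and contributed to Apache, and it schedules and coordinates Hadoop MapReduce and Pig jobs. Oozie must be deployed in a Java Servlet container to run.
  • Like jPDL from JBoss jBPM, Oozie provides its own process definition language, hPDL, in which a workflow is defined as an XML file. Workflow systems generally need many node types with different roles, such as branching, parallel execution, and joining.
  • Oozie defines control flow nodes and action nodes. Control flow nodes mark the start and end of a workflow and control its execution path, e.g. decision, fork, and join. Action nodes include Hadoop map-reduce, Hadoop file system, Pig, SSH, HTTP, email, and Oozie sub-workflows.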

3. Installing and deploying Oozie

4. Oozie workflow scheduling

1. Map-Reduce Action

Official documentation: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.1.0-cdh5.11.1/DG_Examples.html

Example:

Using the cloudera user as an example:

// Create the home directory for the cloudera user
# hdfs dfs -mkdir -p /user/cloudera

// Upload the examples bundled with Oozie to HDFS
# cd /opt/cloudera/parcels/CDH/share/doc/oozie-4.1.0+cdh5.11.1+431
# tar -zxvf oozie-examples.tar.gz -C ~
# hdfs dfs -mkdir examples
# hdfs dfs -put examples/* examples

Run a MapReduce Action modeled on the bundled Oozie example:

// Copy the map-reduce example from the bundled examples into oozie-apps/mr-wordcount-wf/
# mkdir oozie-apps
# cp -r examples/apps/map-reduce/ oozie-apps/
# cd oozie-apps
# mv map-reduce mr-wordcount-wf

// Delete the files that are not needed
# cd mr-wordcount-wf
# rm -rf job-with-config-class.properties
# rm -rf workflow-with-config-class.xml
# rm -rf lib/oozie-examples-4.1.0-cdh5.11.1.jar

// Only the following three entries remain
# ls
job.properties  lib  workflow.xml
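
How to define a workflow

(1) job.properties
- Key point: it points to the HDFS location of workflow.xml

(2) workflow.xml

  • The workflow definition file
  • An XML file
  • It contains the following elements:
    • start
    • action: e.g. MapReduce, Hive, Sqoop, Shell
    • ok
    • error
    • kill
    • end

(3) lib directory: the JARs the workflow depends on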
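
The node types listed under workflow.xml are tied together by `to` attributes, so a quick sanity check is to confirm that every transition points at a node that exists. The sketch below does this with Python's standard XML parser. The `demo-wf` workflow and the function name `dangling_transitions` are made up for illustration, and the check is not a substitute for Oozie's own schema validation.

```python
import xml.etree.ElementTree as ET

# Hypothetical minimal workflow used only for this check.
WORKFLOW = """
<workflow-app xmlns="uri:oozie:workflow:0.5" name="demo-wf">
    <start to="demo-action"/>
    <action name="demo-action">
        <fs/>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail"><message>failed</message></kill>
    <end name="end"/>
</workflow-app>
"""


def dangling_transitions(xml_text):
    """Return every 'to' target that does not match a named top-level node."""
    root = ET.fromstring(xml_text)
    # Named nodes are direct children of workflow-app: action, kill, end, ...
    names = {node.get("name") for node in root if node.get("name")}
    # Transitions appear on start, ok and error elements at any depth.
    targets = [el.get("to") for el in root.iter() if el.get("to")]
    return [t for t in targets if t not in names]


missing = dangling_transitions(WORKFLOW)
print(missing)
```

An empty list means every start, ok and error transition resolves to a declared node.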

MapReduce Action:
How to schedule a MapReduce program with Oozie
Key point: what used to be the [Driver] part of the Java MapReduce program is now declared in the configuration XML file

Writing job.properties

Key point: it points to the HDFS location of workflow.xml

job.properties

# See the License for the specific language governing permissions and
# limitations under the License.
#

# Define variables used by job.properties and workflow.xml
nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
oozieAppsRoot=user/cloudera/oozie-apps
oozieDataRoot=user/cloudera/oozie/datas

# HDFS location of the workflow.xml to run
oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/mr-wordcount-wf/workflow.xml

inputDir=mr-wordcount-wf/input
outputDir=mr-wordcount-wf/output
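
To see what the `${...}` references in this file expand to, here is a small sketch that performs the same kind of substitution in Python. It only imitates the expansion for illustration; the helper name `resolve` is an assumption, and Oozie's real EL evaluation supports far more than plain variable lookup.

```python
import re

# Values copied from the job.properties above.
PROPS = {
    "nameNode": "hdfs://master:8020",
    "oozieAppsRoot": "user/cloudera/oozie-apps",
    "oozie.wf.application.path": "${nameNode}/${oozieAppsRoot}/mr-wordcount-wf/workflow.xml",
}


def resolve(value, props):
    """Replace each ${name} with its value from props until none remain."""
    pattern = re.compile(r"\$\{([^}]+)\}")
    while pattern.search(value):
        value = pattern.sub(lambda m: props[m.group(1)], value)
    return value


app_path = resolve(PROPS["oozie.wf.application.path"], PROPS)
print(app_path)
```

Because `oozieAppsRoot` has no leading slash, the single `/` written after `${nameNode}` is what makes the result an absolute HDFS path.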

Writing workflow.xml:
- Control flow nodes
- Action nodes

The workflow.xml below configures the wordcount program from the post "Developing for Hadoop Cloudera CDH5.11.1 with IDEA and Maven". Without Oozie, it is run with:

# hadoop jar wordcount2.jar wordcount2.WordCount2 /user/hdfs/mapreduce/wordcount/input /user/hdfs/mapreduce/wordcount/output

JAR used: wordcount2.jar
Main class: wordcount2.WordCount2
Mapper class: wordcount2.TokenizerMapper
Reducer class: wordcount2.IntSumReducer

Note:

In the MapReduce source the Mapper is wordcount2.TokenizerMapper and the Reducer is wordcount2.IntSumReducer, but both are nested classes of WordCount2, so the class files inside wordcount2.jar carry the outer class name. As the contents of wordcount2.jar show:
the Mapper class is wordcount2.WordCount2$TokenizerMapper and the Reducer class is wordcount2.WordCount2$IntSumReducer

workflow.xml:

<!--
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
    <start to="mr-node-wordcount"/>
    <action name="mr-node-wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>wordcount2.WordCount2$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>wordcount2.WordCount2$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Note: the Map-Reduce example on the official site uses the old API, whereas the MapReduce program we run on the cluster uses the new API, so we switch the action to the new API:

<property>
    <name>mapred.mapper.new-api</name>
    <value>true</value>
</property>
<property>
    <name>mapred.reducer.new-api</name>
    <value>true</value>
</property>

Add the JAR

Place wordcount2.jar in /home/cloudera/oozie-apps/mr-wordcount-wf/lib. The application directory must also exist at the HDFS location named by oozie.wf.application.path before the job is submitted.

Run
# oozie job -oozie http://master:11000/oozie -config oozie-apps/mr-wordcount-wf/job.properties -run

Or

# export OOZIE_URL="http://master:11000/oozie"
# oozie job -config oozie-apps/mr-wordcount-wf/job.properties -run

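Hive Action

Official documentation: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.1.0-cdh5.11.1/DG_HiveActionExtension.html

Every kind of action is created in much the same way.
Here we model our own Hive Action on the hive example under examples/apps.

First, create something in Hive to query. Following the post "Hive application example: WordCount", we create word_count, which the script below reads as default.word_count.

The Hive Action directory layout is:

# ls hive-select/
hive-site.xml  job.properties  lib  wordcount.sql  workflow.xml
  • hive-site.xml: the Hive configuration file
  • job.properties: mainly gives the HDFS directory of the workflow.xml that runs the action
  • lib: holds the MySQL JDBC driver, needed to connect to the database
  • wordcount.sql: the SQL script that Hive executes
  • workflow.xml: the configuration file for the Hive Action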

job.properties:

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
oozieAppsRoot=user/cloudera/oozie-apps
oozieDataRoot=user/cloudera/oozie/datas
oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/hive-select/

outputDir=hive-select/output

wordcount.sql:

insert overwrite directory '${OUTPUT}'
select * from default.word_count;

workflow.xml:

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.5" name="wf-hive-select">
    <start to="hive-node"/>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.5">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <job-xml>${nameNode}/${oozieAppsRoot}/hive-select/hive-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>wordcount.sql</script>
            <param>OUTPUT=${nameNode}/${oozieDataRoot}/${outputDir}</param>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Run:

# oozie job -config oozie-apps/hive-select/job.properties -run

Sqoop Action

Official documentation: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.1.0-cdh5.11.1/DG_SqoopActionExtension.html

MariaDB [test]> select * from test.my_user;
+----------+------+
| name     | age  |
+----------+------+
| zhangsan |   14 |
| lisi     |   34 |
| wangwu   |   55 |
+----------+------+
3 rows in set (0.00 sec)
# sqoop import --connect jdbc:mysql://master:3306/test --username root --password 123456 --table my_user --target-dir /user/cloudera/oozie/datas/sqoop-import-user/output --fields-terminated-by "\t" --num-mappers 1

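Note: in the Sqoop command, --fields-terminated-by "\t" must use double quotes (" "), not single quotes (' ').
Also, testing showed that Sqoop uses the new API, so the following configuration needs to be added to workflow.xml: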

<property>
    <name>mapred.mapper.new-api</name>
    <value>true</value>
</property>
<property>
    <name>mapred.reducer.new-api</name>
    <value>true</value>
</property>

Sqoop Action directory layout:

# ls sqoop-import-user/
job.properties  lib  workflow.xml

lib: holds the MySQL JDBC driver

job.properties:

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
oozieAppsRoot=user/cloudera/oozie-apps
oozieDataRoot=user/cloudera/oozie/datas
oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/sqoop-import-user/

outputDir=sqoop-import-user/output

workflow.xml:

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.5" name="sqoop-wf">
    <start to="sqoop-node"/>

    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.3">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://master:3306/test --username root --password 123456 --table my_user --target-dir /user/cloudera/oozie/datas/sqoop-import-user/output --fields-terminated-by "\t" --num-mappers 1</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Shell Action

Official documentation: http://archive.cloudera.com/cdh5/cdh/5/oozie-4.1.0-cdh5.11.1/DG_ShellActionExtension.html

# ls shell-hive-select/
job.properties  user-select.sh  user-select.sql  workflow.xml

job.properties:

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
oozieAppsRoot=user/cloudera/oozie-apps
oozieDataRoot=user/cloudera/oozie/datas

oozie.wf.application.path=${nameNode}/${oozieAppsRoot}/shell-hive-select/

exec=user-select.sh
script=user-select.sql

workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>${exec}</exec>
            <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${exec}#${exec}</file>
            <file>${nameNode}/${oozieAppsRoot}/shell-hive-select/${script}#${script}</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

The Shell Action did not run successfully; I will revisit it when I have time.

5. Oozie Coordinator scheduling

Oozie uses UTC by default.
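
To make the offset concrete, the sketch below converts a GMT+0800 timestamp, in the same format as the start value used in the coordinator example later in this post, to the UTC time Oozie would otherwise expect. The helper name `to_utc` is an illustrative assumption.

```python
from datetime import datetime, timezone


def to_utc(stamp):
    """Parse a timestamp such as 2017-08-14T19:15+0800 and convert it to UTC."""
    local = datetime.strptime(stamp, "%Y-%m-%dT%H:%M%z")
    return local.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%MZ")


utc_start = to_utc("2017-08-14T19:15+0800")
print(utc_start)  # 2017-08-14T11:15Z
```

Without a time zone change, coordinator start and end times would have to be written in this UTC form.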

Changing the Oozie time zone in Cloudera Manager:

Cloudera Oozie -> Configuration -> Oozie Server (scope) -> Advanced (category) -> Oozie Server Advanced Configuration Snippet (Safety Valve) for oozie-site.xml

That is:

<property>
    <name>oozie.processing.timezone</name>
    <value>GMT+0800</value>
</property>

When running a Coordinator, the following error appears:

E1003: Invalid coordinator application attributes, Coordinator job with frequency [1] minutes is faster than allowed maximum of 5 minutes (oozie.service.coord.check.maximum.frequency is set to true

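To lift this limit, set oozie.service.coord.check.maximum.frequency to false in the Oozie Server configuration (oozie-site.xml).

To change the time zone shown in the Oozie web console as well, edit oozie-console.js:

# vi /opt/cloudera/parcels/CDH/lib/oozie/webapps/oozie/oozie-console.js

That is, make getTimeZone() default to GMT+0800: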

function getTimeZone() {
    Ext.state.Manager.setProvider(new Ext.state.CookieProvider());
    return Ext.state.Manager.get("TimezoneId","GMT+0800");
}

Example:

# ls cron-schedule/
coordinator.xml  job.properties  workflow.xml

A Coordinator application contains a Workflow (job.properties, workflow.xml) plus one extra file that defines the trigger (coordinator.xml).

job.properties:

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
oozieAppsRoot=user/cloudera/oozie-apps
oozieDataRoot=user/cloudera/oozie/datas


oozie.coord.application.path=${nameNode}/${oozieAppsRoot}/cron-schedule
start=2017-08-14T19:15+0800
end=2017-08-14T19:19+0800
workflowAppUri=${nameNode}/${oozieAppsRoot}/cron-schedule

workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="no-op-wf">
    <start to="end"/>
    <end name="end"/>
</workflow-app>

coordinator.xml:

<coordinator-app 
    name="cron-coord" 
    frequency="${coord:minutes(1)}" 
    start="${start}" 
    end="${end}" 
    timezone="GMT+0800" xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
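
As a rough sketch of what this coordinator produces, the code below lists the nominal times between start and end at a fixed frequency in minutes. It assumes the end time is exclusive, i.e. no action is created at the end instant itself; the function name `nominal_times` is illustrative.

```python
from datetime import datetime, timedelta


def nominal_times(start, end, every_minutes):
    """List HH:MM nominal times from start up to, but not including, end."""
    fmt = "%Y-%m-%dT%H:%M%z"
    current = datetime.strptime(start, fmt)
    stop = datetime.strptime(end, fmt)
    times = []
    while current < stop:  # assumption: the end boundary is exclusive
        times.append(current.strftime("%H:%M"))
        current += timedelta(minutes=every_minutes)
    return times


# start/end from job.properties, frequency from ${coord:minutes(1)}
runs = nominal_times("2017-08-14T19:15+0800", "2017-08-14T19:19+0800", 1)
print(runs)
```

Under that assumption the four-minute window with a one-minute frequency yields four runs of the no-op workflow.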

The coordinator example above only runs an empty workflow on a timer; the next example runs a MapReduce Action on a timer:

# ls cron
coordinator.xml  job.properties  lib  workflow.xml

Here job.properties, lib, and workflow.xml come from the mr-wordcount-wf program in the MapReduce Action section above, so only the schedule in coordinator.xml needs to be written.

job.properties:

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
oozieAppsRoot=user/cloudera/oozie-apps
oozieDataRoot=user/cloudera/oozie/datas

oozie.coord.application.path=${nameNode}/${oozieAppsRoot}/cron
start=2017-08-14T21:08+0800
end=2017-08-14T21:12+0800
workflowAppUri=${nameNode}/${oozieAppsRoot}/cron

inputDir=mr-wordcount-wf/input
outputDir=mr-wordcount-wf/output

workflow.xml:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="mr-wordcount-wf">
    <start to="mr-node-wordcount"/>
    <action name="mr-node-wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/${oozieDataRoot}/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>wordcount2.WordCount2$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>wordcount2.WordCount2$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/${oozieDataRoot}/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

coordinator.xml:

<coordinator-app 
    name="cron-coord" 
    frequency="0/2 * * * *" 
    start="${start}" 
    end="${end}" 
    timezone="GMT+0800" xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
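
The frequency here is a cron expression whose minute field `0/2` means "starting at minute 0, every 2 minutes". The sketch below expands just that field over the start and end window from job.properties. It handles only a `start/step` minute field with the other four fields left as `*`, and it assumes the end time is exclusive; the function names are illustrative.

```python
from datetime import datetime, timedelta


def matches_step(minute, field):
    """Check a minute against a 'start/step' cron field such as 0/2."""
    first, step = (int(part) for part in field.split("/"))
    return minute >= first and (minute - first) % step == 0


def cron_minutes(start, end, field):
    """List HH:MM times in [start, end) whose minute matches the field."""
    fmt = "%Y-%m-%dT%H:%M%z"
    current = datetime.strptime(start, fmt)
    stop = datetime.strptime(end, fmt)
    hits = []
    while current < stop:  # assumption: the end boundary is exclusive
        if matches_step(current.minute, field):
            hits.append(current.strftime("%H:%M"))
        current += timedelta(minutes=1)
    return hits


cron_runs = cron_minutes("2017-08-14T21:08+0800", "2017-08-14T21:12+0800", "0/2")
print(cron_runs)
```

With these values the MapReduce workflow would be triggered twice inside the window.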

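Exercise

A workflow with multiple actions.
Case:
- start node
- hive action: table result -> hdfs
- sqoop action: hdfs -> mysql
- end
- kill

Hive tables expose a set of virtual columns that make this easier to work with.
INPUT__FILE__NAME: the name of the file that holds the data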
