Oozie的简单使用
1、Oozie的介绍 Oozie是一个工作流引擎服务器,用于运行Hadoop Map/Reduce和Hive等任务工作流.同时Oozie还是一个Java Web程序,运行在Java Servlet容器中,如Tomcat中。Oozie以action为基本任务单位,可以将多个action构成一个DAG图(有向无环图Direct Acyclic Graph.
1、Oozie的介绍
Oozie是一个工作流引擎服务器,用于运行Hadoop Map/Reduce和Hive等任务工作流.同时Oozie还是一个Java Web程序,运行在Java Servlet容器中,如Tomcat中。Oozie以action为基本任务单位,可以将多个action构成一个DAG图(有向无环图Direct Acyclic Graph)的模式进行运行。Oozie工作流通过HPDL(一种通过XML自定义处理的语言)来构造Oozie的工作流。一个oozie服务器主要包括四个服务:Oozie Workflow、Oozie Coordinator、Oozie Bundle和Oozie SLA(oozie服务器等级协定)。
1.1 Oozie四大组件服务介绍
- Oozie Workflow: 该组件用于定义和执行一个特定顺序的mapreduce、hive和pig作业。
- Oozie Coordinator:该组件用于支持基于事件、系统资源存在性等条件的workflow的自动化执行。
- Oozie Bundle:该引擎可以定义和执行"一束"应用,从而提供一个批量化的方法,将一组Coordinator应用程序一起进行管理。
- Oozie服务器等级协定(Service Level Agreement, SLA):该组件支持workflow应用程序执行过程的记录跟踪。
workflow是oozie中最基本的一个服务组件。三大服务的的关系是:bundle包含多个coordinator,coordinator包含一个workflow,workflow定义具体的action动作。
1.2 Workflow介绍
workflow使用hadoop流程定义语言(Hadoop Process Defination Language, hPDL)来描述工作流,hPDL是类似xml语言的一种相当简洁的定义类语言,使用有限数目的流控制节点和动作节点来描述workflow,说也就是workflow有两大类节点构成:工作流控制节点和动作节点。其中工作流控制节点是workflow本身提供的一种控制workflow工作流执行路径的方法,不可以自定义;动作节点是具体的操作方法,用户可以自定义。
1.3 Workflow工作流生命周期
1.4 Workflow流控制节点
1.5 Workflow默认支持的action动作节点
1.6 Workflow扩展的action动作节点
1.7 Workflow异步操作
Workflow中的所有异步操作(action)都需要在hadoop集群上以mapreduce作业的方式进行执行,这样充分利用了集群的优点。oozie通过两种方式来检查任务是否完成: - 回调:当一个任务和一个计算被启动后,会为任务提供一个回调url,该任务执行完成后,会执行回调来通知oozie。
- 轮询:在任务执行回调失败的情况下,无论任何原因,都支持以轮询的方式进行查询。 oozie提供这两种方式来控制任务,将资源密集型的任务放到服务节点之外,使oozie节约服务器资源,确保单个oozie服务器可以支持上千个作业。
注意: 需要hadoop集群开启hdfs、yarn和jobhistory服务。
1.8 Workflow规则
workflow任务主要由job.properties、workflow.xml和其他动作需要的资源文件三部分组成,其中job.properties中定义workflow作业的配置信息,workflow.xml定义作业的执行工作流。workflow.xml文件是一个有定义规则的xml文件。
workflow.xml
配置规则详见:workflow中的文档workflow.template.xml
job.properties
配置参考:workflow中的文档job.template.properties
1.9 Coordinator介绍
coordinator支持workflow过程的自动启动,常用于一些由时间/数据可用性来触发的、会多次调用的workflow过程的设计和执行。通过定义多个顺序运行的、前一个输出作为后一个输入的workflow,coordinator也支持定义常规运行的(包括以不同时间间隔运行的)workflow作业之间的依赖。
1.10 Bundle介绍
Oozie Bundle是顶层抽象,允许将一组coordinator任务打包成为一个bundle任务。组成一个整体bundle的多个coordinator可以作为一个整体来进行控制,包括启动、停止、挂起、继续、重新执行等操作。Bundle中不支持它的coordinator应用程序之间的显示依赖关系,如果需要定义这些依赖关系,可以在 coordinator中通过输入输出事件来指定依赖。
2. Oozie的安装
Oozie使用tomcat等服务器作为web界面展示容器,使用关系型数据库存储oozie的工作流元数据,默认使用debry,由于debry的缺点,一般情况使用mysql作为oozie的元数据库,使用extjs来作为报表展示js框架。
安装步骤:
- 安装mysql
- 安装tomcat
- 安装oozie
2.1 Tomcat安装
- 下载tomcat压缩包:
wget http://archive.apache.org/dist/tomcat/tomcat-7/v7.0.8/bin/apache-tomcat-7.0.8.tar.gz
- 解压压缩包
- 设置环境变量
- 启动tomcat测试是否安装成功。
2.2 Oozie安装
- 下载oozie和ext-2.2.zip
- 设置环境变量
export OOZIE_HOME=/home/hadoop/bigdater/oozie-4.0.0-cdh5.3.6
export PATH=$PATH:$OOZIE_HOME/bin
- 进行conf配置信息修改,修改conf/oozie-site.xml文件,主要就是进行元数据指定和service指定。
oozie-site.xml
<?xml version="1.0"?>
<configuration>
<property>
<!--指定service的信息-->
<name>oozie.services</name>
<value>
<!--将JobsConcurrencyService移到第一个-->
org.apache.oozie.service.JobsConcurrencyService,
org.apache.oozie.service.SchedulerService,
org.apache.oozie.service.InstrumentationService,
org.apache.oozie.service.MemoryLocksService,
org.apache.oozie.service.CallableQueueService,
org.apache.oozie.service.UUIDService,
org.apache.oozie.service.ELService,
org.apache.oozie.service.AuthorizationService,
org.apache.oozie.service.UserGroupInformationService,
org.apache.oozie.service.HadoopAccessorService,
org.apache.oozie.service.URIHandlerService,
org.apache.oozie.service.DagXLogInfoService,
org.apache.oozie.service.SchemaService,
org.apache.oozie.service.LiteWorkflowAppService,
org.apache.oozie.service.JPAService,
org.apache.oozie.service.StoreService,
org.apache.oozie.service.CoordinatorStoreService,
org.apache.oozie.service.SLAStoreService,
org.apache.oozie.service.DBLiteWorkflowStoreService,
org.apache.oozie.service.CallbackService,
org.apache.oozie.service.ActionService,
org.apache.oozie.service.ShareLibService,
org.apache.oozie.service.ActionCheckerService,
org.apache.oozie.service.RecoveryService,
org.apache.oozie.service.PurgeService,
org.apache.oozie.service.CoordinatorEngineService,
org.apache.oozie.service.BundleEngineService,
org.apache.oozie.service.DagEngineService,
org.apache.oozie.service.CoordMaterializeTriggerService,
org.apache.oozie.service.StatusTransitService,
org.apache.oozie.service.PauseTransitService,
org.apache.oozie.service.GroupsService,
org.apache.oozie.service.ProxyUserService,
org.apache.oozie.service.XLogStreamingService,
org.apache.oozie.service.JvmPauseMonitorService
</value>
</property>
<!--指定hadoop的配置文件路径-->
<property>
<name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
<value>*=/home/hadoop/bigdater/hadoop-2.5.0-cdh5.3.6/etc/hadoop</value>
</property>
<!--指定其关系型数据库,元数据具体存在哪儿,这里指定为mysql数据库,默认为deby数据库-->
<property>
<name>oozie.service.JPAService.create.db.schema</name>
<value>true</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.driver</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.url</name>
<value>jdbc:mysql://hh:3306/oozie?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.username</name>
<value>hive</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.password</name>
<value>hive</value>
</property>
<property>
<name>oozie.service.JPAService.jdbc.password</name>
<value>hive</value>
</property>
<!--时间区的设置-->
<property>
<name>oozie.processing.timezone</name>
<value>GMT+0800</value>
</property>
<!-- start run the shell action config -->
<property>
<name>oozie.service.SchemaService.wf.ext.schemas</name>
<value>oozie-sla-0.1.xsd,shell-action-0.1.xsd,hive-action-0.2.xsd</value>
</property>
<property>
<name>oozie.service.ActionService.executor.ext.classes</name>
<value>
org.apache.oozie.action.hadoop.ShellActionExecutor,
org.apache.oozie.action.hadoop.HiveActionExecutor
</value>
</property>
<!-- 控制oozie的coordinator任务运行间隔时间是否检测-->
<property>
<name>oozie.service.coord.check.maximum.frequency</name>
<value>false</value>
</property>
</configuration>
- 可以在conf/oozie-env.sh中进行参数修改,比如修改端口号,默认端口号为11000
- oozie根目录创建libext文件夹,复制mysql的driver压缩包到libext文件夹中
cp ~/bigdater/hive-0.13.1-cdh5.3.6/lib/mysql-connector-java-5.1.31.jar ./libext/
or
cp ~/bigdater/softs/mysql-connector-java-5.1.31.jar ./libext/
- 执行sql创建,执行完成后,mysql中出现数据库和数据表
在oozie的根目录下执行命令:ooziedb.sh create -sqlfile oozie.sql -run
- 在hadoop的core-site.xml里设置hadoop代理用户
hadoop.proxyuser.hadoop.hosts&hadoop.proxyuser.hadoop.groups
- 在hdfs上创建公用文件夹share
oozie根目录下执行命令:oozie-setup.sh sharelib create -fs hdfs://hh:8020 -locallib oozie-sharelib-4.0.0-cdh5.3.6-yarn.tar.gz
- 创建war文件
执行addtowar.sh -inputwar ./oozie.war -outputwar ./oozie-server/webapps/oozie.war -hadoop 2.5.0 $HADOOP_HOME -jars ./libext/mysql-connector-java-5.1.31.jar -extjs ../softs/ext-2.2.zip
或者
将hadoop相关包,mysql相关包和ext压缩包放到libext文件夹中,运行oozie-setup.sh prepare-war也可以创建war文件
- 运行
oozie的bin目录下执行:oozied.sh run 或者 oozied.sh start(前者在前端运行,后者在后台运行)
- 查看web界面&查看状态
oozie admin -oozie http://hh:11000/oozie -status
3. 案例介绍
- 定义fs动作,在hdfs文件系统上进行文件操作
job.properties
nameNode=hdfs://hh:8020
jobTracker=hh:8032
examplesRoot=fs
oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}
workflow.xml
<workflow-app name='fs-wf1' xmlns="uri:oozie:workflow:0.3">
<start to="fs-delete"/>
<action name="fs-delete">
<fs>
<delete path="${nameNode}/beifeng/beifeng14-0.0.1.jar"/>
<delete path="${nameNode}/beifeng/index"/>
</fs>
<ok to="fs-mkdir"/>
<error to="fail"/>
</action>
<action name="fs-mkdir">
<fs>
<mkdir path="${nameNode}/beifeng/18"/>
</fs>
<ok to="fs-move"/>
<error to="fail"/>
</action>
<action name="fs-move">
<fs>
<move source="${nameNode}/BF-09.1447769674855.log.tmp" target="${nameNode}/beifeng" />
</fs>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
注意:需要先在本地机器上的~/jobs/oozie文件夹中创建比如fs文件夹,并将job.properties、workflow.xml放入该fs文件夹中,下面的类似。
- 定义fs动作,判断文件夹存在,就删除,如果不存在,不进行任何操作
job.properties
nameNode=hdfs://hh:8020
jobTracker=hh:8032
examplesRoot=fs2
oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}
workflow.xml
<workflow-app name='fs-wf2' xmlns="uri:oozie:workflow:0.3">
<start to="decision"/>
<decision name="decision">
<switch>
<case to="fs-delete">${fs:exists("/beifeng/18")}</case>
<default to="end"/>
</switch>
</decision>
<action name="fs-delete">
<fs>
<delete path="${nameNode}/beifeng/18"/>
</fs>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
- 定义shell动作,在服务器的tmp目录下创建一个文件夹
job.properties
nameNode=hdfs://hh:8020
jobTracker=hh:8032
examplesRoot=shell
EXEC=script.sh
oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}
workflow.xml
<workflow-app name='shell-wf1' xmlns="uri:oozie:workflow:0.3">
<start to="shell"/>
<action name="shell">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>${EXEC}</exec>
<file>${EXEC}#${EXEC}</file>
</shell>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
mkdir /tmp/beifeng-net
- 定义hive动作,执行sql脚本,将数据导入到hive中的表中
job.properties
nameNode=hdfs://hh:8020
jobTracker=hh:8032
examplesRoot=hive
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}
workflow.xml
<workflow-app name='hive-wf1' xmlns="uri:oozie:workflow:0.3">
<start to="hive"/>
<action name="hive">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.hive.defaults</name>
<value>hive-site.xml</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hh:9083</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/hive</value>
</property>
</configuration>
<script>d.sql</script>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>script failed, error message:${wf:errorMessage(wf:lastErrorNode())}</message>
</kill>
<end name="end"/>
</workflow-app>
hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hh:9083</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hh:3306/hive?createDatabaseIfNotExist=true&useUnicode=true&characterEncoding=UTF-8</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<!--
<property>
<name>hive.aux.jars.path</name>
<value>file:///home/hadoop/jobs/beifeng14-0.0.1.jar</value>
</property>
-->
</configuration>
d.sql
load data inpath '/data.txt' into table t
data.txt
1
2
3
4
5
6
7
10
- 定义mapreduce动作,执行mapreduce任务
job.properties
nameNode=hdfs://hh:8020
jobTracker=hh:8032
queueName=default
examplesRoot=map-reduce
oozie.wf.application.path=${nameNode}/oozie/workflows/${examplesRoot}
workflow.xml
<workflow-app name='wordcount-wf2' xmlns="uri:oozie:workflow:0.3">
<start to="wordcount"/>
<action name="wordcount">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<!-- <prepare></prepare> -->
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.job.map.class</name>
<value>com.beifeng.mr.wordcount.WordCountMapper</value>
</property>
<property>
<name>mapreduce.job.reduce.class</name>
<value>com.beifeng.mr.wordcount.WordCountReducer</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>/beifeng/api</value>
</property>
<property>
<name>mapreduce.job.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.job.output.value.class</name>
<value>org.apache.hadoop.io.LongWritable</value>
</property>
<!--
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
-->
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>/beifeng/wordcount/18</value>
</property>
<property>
<name>mapreduce.job.acl-view-job</name>
<value>*</value>
</property>
<property>
<name>oozie.launcher.mapreduce.job.acl-view-job</name>
<value>*</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="end"/>
</action>
<kill name="kill">
<message>${wf:errorCode("wordcount")}</message>
</kill>
<end name="end"/>
</workflow-app>
直接将这些文件夹上传到linux机器上(比如上传到~/jobs/oozie文件夹中),然后将所有软件上传到hdfs的/oozie/workflows文件夹中。进入linux上传的目录(这里是jobs/oozie文件夹)执行命令:
oozie job -oozie http://hh:11000/oozie -config ./xxxx/job.properties -run,这里的xxxx就是各个文件夹名称
其他相关命令:
oozie job -oozie http://hh:11000/oozie -config ./xxxx/job.properties -submit (提交任务)
oozie job -oozie http://hh:11000/oozie -start job_id (运行已经提交的任务)
oozie job -oozie http://hh:11000/oozie -config ./xxxx/job.properties -run (submit和start命令合并)
oozie job -oozie http://hh:11000/oozie -info job_id (获取任务信息)
oozie job -oozie http://hh:11000/oozie -suspend job_id (暂停/挂起任务)
oozie job -oozie http://hh:11000/oozie -rerun job_id (重新运行暂停的任务)
- 定时任务,coordinate的使用案例cron
job.properties
nameNode=hdfs://hh:8020
jobTracker=hh:8030
queueName=default
examplesRoot=cron
oozie.service.coord.check.maximum.frequency=false
oozie.coord.application.path=${nameNode}/oozie/workflows/${examplesRoot}
start=2015-11-22T08:06+0800
end=2015-11-25T08:10+0800
workflowAppUri=${oozie.coord.application.path}
workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf">
<start to="end"/>
<end name="end"/>
</workflow-app>
coordinator.xml
<coordinator-app name="cron-coord" frequency="${coord:minutes(2)}" start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.2">
<action>
<workflow>
<app-path>${workflowAppUri}</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
更多推荐
所有评论(0)