大数据平台技术框架支持的开发语言多种多样,开发人员的背景差异也很大,这就产生出很多不同类型的程序(任务)运行在大数据平台之上,如:mapreduce、hive、pig、spark、java、shell、python 等。
这些任务需要不同的运行环境,并且除了定时运行,各种类型之间的任务存在依赖关系,一张简单的任务依赖图如下:
- crontab (linux 自带命令,使用方式简单,适合不是非常复杂的场景,比如只按照时间来调度)
- oozie( hadoop 自带的开源调度系统,使用方式比较复杂,适合大型项目场景)
- azkaban(一个开源调度系统,使用方式比较简单,适合中小型项目场景)
- 企业定制开发(企业自研的调度系统,不开源)
azkaban是由linkedin开源的一个批量工作流任务调度器,azkaban来自linkedin公司,用于管理他们的hadoop批处理工作流。日常生产环境中,为了得到想要的数据,通常需要执行很多作业,一批作业执行完毕,再将中间结果进一步处理,最后得到有价值的数据,因此作业之间执行有先后顺序和依赖关系。这样的一组作业称为一个工作流,azkaban就是用来构建、运行和管理工作流的工具,它提供友好的web用户界面来维护和跟踪用户的工作流程。azkaban已经在linkedin运行了好几年,管理着很多hadoop和数据仓库作业流,具有很强的可用性。
官方文档:
https://azkaban.readthedocs.io/en/latest/
https://azkaban.github.io/azkaban/docs/latest/
简单来讲,它有几个特点:
- 分布式多执行器
- mysql重试
- 友好的用户界面
- 有条件的工作流
- 数据触发
- 高安全
- 支持插件扩展,从web ui到作业执行
- 全作者身份管理系统
azkaban 和 oozie 是市面上最流行的两种调度器。总体来说,ooize 相比 azkaban 是一个重量级的任务调度系统,功能全面,但部署和使用也更复杂,比较适合作为大型项目的任务调度系统。而 azkaban 相对而言,配置和使用更为简单,能够满足常见的任务调度,比较适合作为中小型项目的任务调度系统。
对比如下:
对比项目 | azkaban | oozie |
功能 | 两者均可以调度 mapreduce,pig,java,脚本工作流任务两者均可以定时执行工作流任务 | 跟azkaban一样 |
工作流定义 | azkaban 使用 properties 文件定义工作流 | oozie 使用 xml 文件定义工作流 |
工作流传参 | azkaban 支持直接传参 | oozie 支持参数和 el 表达式 |
定时执行 | azkaban 的定时执行任务是基于时间的 | oozie 的定时执行任务基于时间和输入数据 |
资源管理 | azkaban 有较严格的权限控制,如用户对工作流进行读/写/执行等操作 | oozie 暂无严格的权限控制 |
工作流执行 | azkaban 有两种运行模式,分别是单机模式和集群模式 | oozie 作为工作流服务器运行,支持多用户和多工作流 |
工作流管理 | azkaban 支持浏览器以及 ajax 方式操作工作流 | oozie 支持命令行、http rest、java api、浏览器操作工作流 |
azkaban 三大核心组件
- 关系型元数据库(mysql)
- azkaban web server
- azkaban executor server
azkabanwebserver 是 azkaban 的主要管理者,负责项目管理、身份验证、调度和监控执行,并且为用户界面
提交和执行工作流,记录工作流日志,和 azkaban webserver 可以在同一台服务器,也可部署在独立的机器。把 executor 单独分开有几个好处:
- 在多 executor 模式下可以方便扩展
- 工作流在某一个 executor 挂掉,可以在另一个 executor 上重试
- 可以滚动升级,从而不影响调度
负责存储azkaban系统的数据,包括用户上传的工作流文件、作业执行的日志等。executor server和web server都通过jdbc频繁地对其操作。
azkaban 元数据库
表名 | 描述 |
active_executing_flows | 记录当前执行中的flow对应每次执行的exejid |
execution_flows | azkaban flow的执行记录 |
executionjobs | azkaban flow中的job的执行记录 |
executionjogs | azkaban flow中的执行日志记录 |
executors | 配置的executor的信息,多执行器模式有多条记录 |
project_files | 保存项目的文件 |
project_flows | 项目中的flow信息 |
project_permissions | 项目中用户的权限 |
project_versions | 项目的版本,上传用户,上传时间等 |
projects | 项目信息 |
triggers | 调度信息 |
在3.0版本之后,我们提供了两种模式:独立的“单独服务器”模式和分布式多执行器模式。下面介绍两种模式的区别。
- solo server mode(单机模式):该模式中web server和executor server运行在同一个进程中,进程名azkabansingleserver。可以使用自带的h2数据库或者配置mysql数据。该模式适用于小规模的使用。
- multiple executor mode(多执行器模式),适用于大规模的生产环境。它的数据库应该由设置了主从关系的mysql实例进行备份。web server和executor server运行在不同的进程中,这样升级和维护就不会影响到用户。这种多主机设置为阿兹卡班带来了健壮和可伸缩的方面。
目前我们采用的是multiple executor mode方式,分别在不同的主机上部署多个azkaban executorserver以应对高并发定时任务执行的情况,从而减轻单个服务器的压力。
安装步骤如下:
- 设置数据库
- 配置数据库以使用多个执行程序
- 为数据库中配置的每个executor下载并安装executor server
- azkaban安装插件
- 安装web服务器
官方文档:https://azkaban.readthedocs.io/en/latest/getstarted.html
$ cd /opt/bigdata/hadoop/software$ git clone https://github.com/azkaban/azkaban.git
$ cd azkaban; ./gradlew build installdist
【温馨提示】如果编译失败了,就多执行几次
$ cd azkaban-solo-server/build/install/azkaban-solo-server$ ./bin/start-solo.sh$ netstat -tnlp|grep 8081### 停止服务,这里不执行$ ./bin/shutdown-solo.sh
检查进程
$ jps
- 访问http://ip:8081 (默认端口是8081)
- zkaban默认登录名/密码:azkaban/azkaban
访问:http://192.168.0.113:8081
阿兹卡班独立服务器默认不使用ssl。但你也可以在独立的web服务器上用同样的方法设置它。azkaban web服务器支持ssl套接字连接器,这意味着必须提供密钥存储库,您可以按照以下步骤生成这里提供的有效的jetty密钥存储库:
创建ssl配置
$ keytool -keystore keystore -alias jetty -genkey -keyalg rsa1.输入密钥库口令: 1234562.再次输入新口令: 1234563.[unknown]: azkaban4.[unknown]: azkaban5.[unknown]: azkaban6.[unknown]: shenzhen7.[unknown]: guangdong8.[unknown]: cn9.[no]: y 10.(如果和密钥库口令相同, 按回车):
修改azkaban 配置文件azkaban.properties或azkaban.private.properties(推荐)。
$ cd /opt/bigdata/hadoop/software/azkaban/azkaban-solo-server/build/install/azkaban-solo-server/conf$ touch azkaban.private.properties
在azkaban.private.properties(文件需要创建)配置如下:
# 根据上面设置的填,keystore文件会自动生成# ssl 文件名jetty.keystore=/opt/bigdata/hadoop/software/azkaban/azkaban-solo-server/build/install/azkaban-solo-server/keystorejetty.password=123456jetty.keypassword=123456# 文件名jetty.truststore=/opt/bigdata/hadoop/software/azkaban/azkaban-solo-server/build/install/azkaban-solo-server/keystorejetty.trustpassword=123456
在azkaban.properties修改如下配置:
jetty.use.ssl=truejetty.ssl.port=8443
重启服务
$ ./bin/shutdown-solo.sh ; ./bin/start-solo.sh$ jps$ netstat -tnlp|grep 8443
web访问验证
https://192.168.0.113:8443
先停掉上面的服务
$ /opt/bigdata/hadoop/software/azkaban/azkaban-solo-server/build/install/azkaban-solo-server/bin/shutdown-solo.sh
部署规划
hostname | ip | 节点属性 |
hadoop-node1 | 192.168.0.113 | azkaban web server/azkaban executor server |
hadoop-node2 | 192.168.0.114 | azkaban executor server |
这里也提供一下我编译的安装包,下载地址如下:
链接:https://pan.baidu.com/s/1zvuyfxg3scpqbfeswy-olq
提取码:6666
$ mkdir /opt/bigdata/hadoop/server/azkaban$ cd /opt/bigdata/hadoop/software/azkaban/$ cp ./azkaban-web-server/build/distributions/azkaban-web-server-3.91.0-313-gadb56414.tar.gz /opt/bigdata/hadoop/server/azkaban/$ cp ./azkaban-exec-server/build/distributions/azkaban-exec-server-3.91.0-313-gadb56414.tar.gz /opt/bigdata/hadoop/server/azkaban/$ cp ./azkaban-db/build/distributions/azkaban-db-3.91.0-313-gadb56414.tar.gz /opt/bigdata/hadoop/server/azkaban/### 解压并改名$ cd /opt/bigdata/hadoop/server/azkaban
因为我之前安装过了mysql,不清楚的可以参考我之前的文章:
#【温馨提示】一般公司禁止mysql -u root -p123456这种方式连接,在history里有记录,存在安全隐患,小伙伴不要被公司安全审计哦,切记!!!$ mysql -u root -p输入密码:123456create database azkaban;create user 'azkaban'@'%' identified by 'azkaban';grant select,insert,update,delete on azkaban.* to 'azkaban'@'%' with grant option;
mysql数据包大小可能需要重新配置。默认情况下,mysql允许的包大小可能低得离谱。要增加它,您需要将属性max_allowed_packet设置为一个更大的数字,比如1024m。要在linux中配置,请打开/etc/my.cnf或者/etc/my.cnf.d/mysql-server.cnf(推荐),在mysqld后面的某个地方,添加以下内容:
[mysqld]max_allowed_packet=1024m
重启mysql服务
$ systemctl restart mysqld$ netstat -tnlp|grep 3306
开始初始化azkaban表
$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-db# 连接mysql$ mysql -u root -p密码:123456use azkaban# 可能版本不一样,sql文件也不太一样,create-all-sql-*.sqlsource create-all-sql-3.91.0-313-gadb56414.sql
$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-exec# mysql配置,如果不一样,就需要调整$ grep mysql conf/azkaban.properties
修改conf/azkaban.properties配置文件
### 修改时区default.timezone.id=asia/shanghai### 修改mysql hostmysql.host=hadoop-node1### webserver.urlazkaban.webserver.url=https://hadoop-node1:8443### executor.port不设置就是随机值了,不方便管理,所以这里还是固定一个端口号,看资料大部分都是使用12321这个端口,这里也随大流executor.port=12321
启动服务
# 【温馨提示】必须进入到azkaban-exec目录下执行启动重启命令,因为配置文件中有些路径用的是相对路径$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-exec# 重启$ ./bin/shutdown-exec.sh ; ./bin/start-exec.sh# azkabanexecutorserver$ jps$ telnet -tnlp|grep 12321
在数据库中查看记录
通过接口的方式去激活,不能直接改表字段值,切记!!!
# 记得换成自己的ip或域名$ curl -g "hadoop-node1:12321/executor?action=activate" && echo
【温馨提示】重启azkaban executor server得重新激活
- 【第一步】先登录hadoop-node2创建azkaban目录
$ mkdir -p /opt/bigdata/hadoop/server/azkaban
- 【第二步】登录到hadoop-node1 copy 安装目录到hadoop-node2
$ cd /opt/bigdata/hadoop/server/azkaban$ scp -r azkaban-exec hadoop-node2:/opt/bigdata/hadoop/server/azkaban/
- 【第三步】启动executor server
# 登录到hadoop-node2 切换到azkaban目录$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-exec$ ./bin/start-exec.sh$ jps$ netstat -tnlp|grep 12321
- 【第四步】激活executor server
# 记得换成自己的ip或域名$ curl -g "hadoop-node2:12321/executor?action=activate" && echo
【温馨提示】重启azkaban executor server得重新激活
$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-web# mysql配置,如果不一样,就需要调整$ grep mysql conf/azkaban.properties
修改conf/azkaban.properties配置文件
### 修改时区default.timezone.id=asia/shanghai### 修改mysql hostmysql.host=hadoop-node1### azkaban.executorselector.filters调度策略# 把minimumfreememory去掉,因为minimumfreememory是6g,自己电脑资源有限,如果小伙伴的机器资源雄厚,可以保留# staticremainingflowsize:根据排队的任务数来调度任务到哪台executor机器# cpustatus:跟据cpu空闲状态来调度任务到哪台executor机器azkaban.executorselector.filters=staticremainingflowsize,cpustatus
启动服务
$ ./bin/start-web.sh$ jps$ netstat -tnlp|grep 8081
跟上面的一样
$ keytool -keystore keystore -alias jetty -genkey -keyalg rsa1.输入密钥库口令: 1234562.再次输入新口令: 1234563.[unknown]: azkaban4.[unknown]: azkaban5.[unknown]: azkaban6.[unknown]: shenzhen7.[unknown]: guangdong8.[unknown]: cn9.[no]: y 10.(如果和密钥库口令相同, 按回车):
配置
在azkaban-web/azkaban.private.properties(文件需要创建)配置如下:
# 根据上面设置的填,keystore文件会自动生成# ssl 文件名jetty.keystore=/opt/bigdata/hadoop/server/azkaban/azkaban-web/keystorejetty.password=123456jetty.keypassword=123456# 文件名jetty.truststore=/opt/bigdata/hadoop/server/azkaban/azkaban-web/keystorejetty.trustpassword=123456
在azkaban-web/azkaban.properties修改如下配置:
jetty.use.ssl=truejetty.ssl.port=8443
修改azkaban-exec/conf/azkaban.properties
jetty.port=8443# where the azkaban web server is locatedazkaban.webserver.url=https://hadoop-node1:8443
重启服务
$ ./bin/shutdown-web.sh ; ./bin/start-web.sh$ jps$ netstat -tnlp|grep 8443
web访问验证:https://192.168.0.113:8443
官方文档:https://azkaban.readthedocs.io/en/latest/usermanager.html
$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-web$ cat conf/azkaban-users.xml
配置一个管理员用户,增加如下一行
重启web服务
$ ./bin/shutdown-web.sh ; ./bin/start-web.sh
https://hadoop-node1:8443
官方文档:https://azkaban.readthedocs.io/en/latest/createflows.html
1、在windows环境,新建helloworld.project文件,编辑内容如下:
azkaban-flow-version: 2.0
【温馨提示】该文件作用,是采用新的flow-api方式解析flow文件,内容基本上是固定的,2.0版本xxx.flow是yaml格式,2.0之前的版本是key=value格式,示例如下:
type=commandcommand=echo 'hello'
2、新建helloworld.flow文件,内容如下:
nodes: - name: joba type: command config: command: echo "hello world"
【温馨提示】注意缩进的空格
- name:job名称
- type:job类型。command表示你要执行作业的方式为命令
- config:job配置
3、将上面两个文件压缩成一个zip文件,目前只支持zip文件,文件名称必须是英文。
4、新建project
5、把zip文件上传到azkaban执行
6、执行
- dependentworkflow.flow
nodes: - name: joba type: command config: command: echo "joba" - name: jobb type: command config: command: echo "jobb" - name: jobc type: command dependson: - joba - jobb config: command: echo "jobc"
- dependentworkflow.project
azkaban-flow-version: 2.0
- 创建project,并把zip文件上传到azkaban执行
- 执行(执行完joba和jobb才执行jobc)
执行一个不存在的脚本/tmp/retry.sh,则任务失败,间隔10000ms,重试3次,其实总共会执行4次,1 3(重试3次)
- autofailed2retry.flow
---nodes: - name: joba type: command config: command: sh /tmp/retry.sh retries: 3 retry.backoff: 10000
retries:重试次数
retry.backoff:重试的间隔时间(ms)
- autofailed2retry.project
azkaban-flow-version: 2.0
- 创建project,并把zip文件上传到azkaban执行
- 执行
执行了4次,最后一次执行的状态为失败状态
需求:joba=》jobb(依赖于a)=》jobc(依赖于b)=》jobd(依赖于c)=》jobe(依赖于d)=》jobf(依赖于e)。生产环境,任何job都可能挂掉,可以根据需求执行想要执行的job。这里假设jobc失败了。
- manualfailed2retry.flow
---nodes: - name: joba type: command config: command: echo "this is joba." - name: jobb type: command dependson: - joba config: command: echo "this is jobb." - name: jobc type: command dependson: - jobb config: command: sh /tmp/retry.sh - name: jobd type: command dependson: - jobc config: command: echo "this is jobd." - name: jobe type: command dependson: - jobd config: command: echo "this is jobe." - name: jobf type: command dependson: - jobe config: command: echo "this is jobf."
- manualfailed2retry.project
azkaban-flow-version: 2.0
- 创建project,并把zip文件上传到azkaban执行
- 执行
执行到jobc失败了,后面的job就会自动取消了
- 手动创建这个/tmp/retry.sh脚本,每个executor都创建这个脚本,因为不确定会调度到哪个executor
$ echo "echo 'this is jobc.'" > /tmp/retry.sh
可以看到,之前joba和jobb执行成功的就不再执行了。正是预期效果。
【温馨提示】type不单单只有command,还有javaprocess,当然还有其它type,例如:noop等。可以参考官方文档
【概述】
javaprocess类型可以运行一个自定义主类方法,type类型为javaprocess,可用的配置为:
- xms:最小堆内存
- xmx:最大堆内存
- classpath:类路径,可以省略,省略的话,是flow当前文件路径
- java.class:要运行的java对象,其中必须包含main方法
- main.args:main方法的参数
- 新建azkaban的maven工程或者module
- 创建com.bigdata.aztest类,内容如下:
【示例】
package com.bigdata;public class aztest { public static void main(string[] args) { system.out.println("this is azkaban test!!!"); }}
- 打包成jar包azkaban-1.0-snapshot.jar
- 新建com.bigdata.testjava.flow,内容如下:
nodes: - name: az_javaprocess_test type: javaprocess config: xms: 100m xmx: 200m java.class: com.bigdata.aztest
- javaprocesstest001.project,project文件是固定的也是并不可少的。
azkaban-flow-version: 2.0
- 把三个文件打包成zip包
- 把zip包上传到azkaban上执行
条件工作流功能允许用户自定义执行条件来决定是否运行某些job,条件可以由当前job的父job输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定job执行逻辑时获取得更大的灵活性,例如:只要父job之一成功,就可以运行当前job。
- 父job将参数写入job_output_prop_file环境变量所指向的文件
- 子job使用${jobname.param}来获取父job输出的参数并定义执行条件
- == 等于
- != 不等于
- 大于
- = 大于等于
- < 小于
- <= 小于等于
- && 与
- || 或
- ! 非
【示例一】
需求:joba执行一个shell脚本。job执行一个shell脚本,但jobb不需要每天都执行,而只需要每周一执行。
- 新建joba.sh
#!/bin/bashecho "do joba"wk=`date %w`echo "{"wk":$wk}" > $job_output_prop_file
获取当前周第几天,0:周日,1表示周一,则jobb需要到周一才执行,今天不执行
- 新建jobb.sh
#!/bin/bashecho "do job"
- 新建condition.flow
nodes: - name: joba type: command config: command: sh joba.sh - name: jobb type: command dependson: - joba config: command: sh jobb.sh condition: ${joba:wk} == 1
- 将joba.sh、jobb.sh、condition.flow和azkaban.project打包成xxx.zip
- 创建condition项目=》上传xxx.zip文件=》执行作业=》观察结果
azkaban中预置了几个特殊的判断条件,称为预定于宏。
预定于宏会根据所有父job的完成情况进行判断,再决定是否执行。可用的预定义宏如下:
- all_success:表示父job全部成功才执行(默认)
- all_done:表示父job全部完成才执行
- all_failed:表示父job全部失败才执行
- one_success:表示父job至少一个成功才执行
- one_failed:表示父job至少一个失败才执行
需求:
- joba执行一个shell脚本
- jobb执行一个shell脚本
- jobc执行一个shell脚本,要求joba、jobb中有一个成功即可执行
步骤如下:
- 新建joba.sh
#!/bin/bashecho "do joba"
- 新建jobb.sh(不创建,验证)
#!/bin/bashecho "do jobb"
- 新建jobc.sh
#!/bin/bashecho "do jobc"
- 新建marco.flow
nodes: - name: joba type: command config: command: sh joba.sh - name: jobb type: command config: command: sh jobb.sh - name: jobc type: command dependson: - joba - jobb config: command: sh jobc.sh condition: one_success
- 新建marco.project
azkaban-flow-version: 2.0
- 把文件打包成zip文件,并上传到azkaban上执行
- 执行
jobb执行失败了,jobc还是执行成功,验证成功。
拿之前的案例,设置定时调度
每分钟执行一次
把定时任务移除
这里以qq邮箱为示范,需要开启邮箱协议
$ cd /opt/bigdata/hadoop/server/azkaban/azkaban-web$ vi conf/azkaban.properties# 修改的内容如下:mail.sender=2920992033@qq.commail.host=smtp.qq.com# 下面两行没有需要增加mail.user=2920992033@qq.com# password就换成上面截图的授权码(自己邮箱授权码)mail.password=xxxx
重启web服务生效
$ ./bin/shutdown-web.sh ; ./bin/start-web.sh
还是拿上面的案例进行测试
查看邮箱,发现已经收到了工作流报警邮件
因为azkaban默认是不支持电话报警的,所以这里使用睿象云做中转实现电话报警。
这里选择免费试用,自己测试就免费试用也够了,如果是商用或者在企业里用肯定是需要付费使用的。
使用智能告警平台
选择集成
这里使用email集成方式,因为azkaban支持邮件
配置这个邮箱:881841810azkaban@camail.aiops.com
能收到告警电话,验证ok。小伙伴可以自己试试。
总结
原生的 azkaban 支持的plugin类型有以下这些:
- command:linux shell命令行任务
- javaprocess:原生java任务
- gobblin:通用数据采集工具
- hadoopjava:运行hadoopmr任务
- hive:支持执行hivesql
- pig:pig脚本任务
- spark:spark任务
- hdfstoteradata:把数据从hdfs导入teradata
- teradatatohdfs:把数据从teradata导入hdfs
上面我们示例中用到了command和javaprocess,其中最简单而且最常用的是command类型。
azkaban基础部分就先到这了,后续会有更多相关的文章,请小伙伴耐心等待~
ag凯发k8国际的版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。