DataX + xxl-job
1、部署xx-job官网:https://www.xuxueli.com/xxl-job/里面由详细的讲解我用的版本:安装docker mysqldocker pull mysql:8.0.19docker run -itd --name mysql-xxljob -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysqlmysql -u root -h 12
目录
1、xxl-job
1.1 部署
官网:https://www.xuxueli.com/xxl-job/ 里面由详细的讲解
我用的版本:
安装docker mysql
docker pull mysql:8.0.19
docker run -itd --name mysql-xxljob -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql
mysql -u root -h 127.0.0.1 -P3306 -p
进行授权远程连接(注意mysql 8.0跟之前的授权方式不同)
mysql脚本: https://gitee.com/xuxueli0323/xxl-job/tree/2.2.0/doc/db/tables_xxl_job.sql 拷贝sql放入mysql中执行。
docker pull xuxueli/xxl-job-admin:2.2.0
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.10.11:3306/xxl_job?Unicode=true&characterEncoding=UTF-8&useSSL=false --spring.datasource.username=root --spring.datasource.password=123456 --xxl.admin.login=false" -p 9080:8080 --name xxl-job-admin -d 6a35dd59e569
登录:http://192.168.10.11:9080/xxl-job-admin/toLogin admin/123456
1.2 集群部署
调度中心集群部署时,几点要求和建议:
- DB配置保持一致;
- 集群机器时钟保持一致(单机集群忽视);
- 建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
当执行器集群部署时,提供丰富的路由策略,包括;
1.3 执行器编写-java自动
step1:
执行器可理解为客户端,很简单:从gitee 上下载 xxl-job-executor-samples,然后配置:
xxl.job.admin.addresses=http://192.168.10.11:9080//xxl-job-admin
.......
xxl.job.executor.appname=xxl-job-executor-steven
调度器中:
启动执行器,就能在调度器中看到该执行器上线了:
step2:
在执行器中写调度任务:
然后在调度器中设置:
直接就可以测试执行了。
1.4 执行器编写-脚本变成
先了解一下运行的原理:
(1)脚本任务的源码托管在调度中心,脚本逻辑在执行器运行。当触发脚本任务时,执行器会加载脚本源码在执行器机器上生成一份脚本文件,然后通过Java代码调用该脚本;并且实时将脚本输出日志写到任务日志文件中,从而在调度中心可以实时监控脚本运行情况。
(2)需要注意的是,即便你只想调度shell,执行器还是要的,可以参考xxl-job-executor-sample-springboot写一个简单的执行器。
以powershell脚本为例:
运行,在/data/applogs/xxl-job/jobhandler/gluesource路径下,看到其生成了8_1606897999000.ps1文件,打开发现就是我们编写的glue脚本:
同时运行的日志在:\data\applogs\xxl-job\jobhandler\2020-12-02 可以看到。
可以发现当前盘符下确实创建了一个D://asd
1.5 父子任务
很简单,直接上图:
1.6 二次开发
第三方系统能够进行对接口进行二次开发:
(1)老版本-仅作参考
参考:https://github.com/deane163/xxl-job/
(2)新版本
参考:xxl-job通过代码的方式动态添加任务,修改任务,执行任务,停止任务
但是有个问题就是,这些接口都是后台使用的,要想调用,就必须得登录才行,不登录是没有办法访问的。那怎么办?难道还要模拟登录一次,其实大可不必,因为xxl-job中已经为我们提供了一个口子,就是一个注解和一个拦截器,通过这个注解可以配置接口是否可以跳过登录进行访问。也就是我们可以把接口配置成跳过登录验证进行访问就可以了,这样我们就能不登录而进行请求 。这个注解就是 @PermissionLimit(limit = false) ,默认是true,也就是需要做登录验证。
代码位置:
2、datax
实现很简单
系统要求:
jdk 1.8
python 2.6.x
dataX github:https://github.com/alibaba/DataX
启动dataX:
$ cd {YOUR_DATAX_DIR_BIN}
$ python datax.py ./xxx.json
mysql to es json模板:
(1)读取table-column方式:
{
"job": {
"setting": {
"speed": {
"channel": 1,
"record": -1,
"byte": -1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"ah",
"updatetime"
],
"splitPk": "id",
"connection": [{
"table": [
"yaosu_mjjd_newmodel"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/data"
]
}]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://192.1688.10:6003",
"accessId": "elastic",
"accessKey": "123456",
"index": "mysql_aj_es",
"type": "_doc",
"cleanup": true,
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0
}
},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "id", "type": "id"},
{"name": "ah", "type": "string"},
{"name": "updatetime", "type": "text"}
]
}
}
}]
}
}
(2)自定义sql方式
{
"job": {
"setting": {
"speed": {
"channel": 1,
"record": -1,
"byte": -1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [{
"querySql": [
"select id,title,updatetime from ss_all"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/data"
]
}]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://192.168.10.11:6003",
"accessId": "elastic",
"accessKey": "123456",
"index": "mysql_ss_es",
"type": "_doc",
"cleanup": true,
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0
}
},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "id", "type": "id"},
{"name": "title", "type": "text"},
{"name": "updatetime", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
]
}
}
}]
}
}
3、xxl-job和datax整合
两种方法
(1)xxl-job 使用 GLUE IDE 调用 python datax.py ./xxx.json
(2)使用Datax的开发包,实现 Engine.entry(args);
参考:https://blog.csdn.net/pharos/article/details/104147381
更多推荐
所有评论(0)