datax 同步mongodb数据库到hive(hdfs)和elasticserch(es)
主要解决datax连接mongodb出现Unauthorized问题,和datax增量同步mongodb问题,以及充分利用hive动态分区实现批量同步
一、同步环境
1.mongodb版本:3.6.3。(有点老了,后来发现flinkcdc都只能监控一张表,多张表无法监控)
2.datax版本:自己编译的DataX-datax_v202210
3.hdfs版本:3.1.3
4.hive版本:3.1.2
二、同步思路
1.增量数据:需要每隔1小时将mongodb中17个集合的数据同步至hive,因为有数据生成时间,才用datax查询方式,将上一个小时的数据依次循环调用datax同步至hdfs,利用shell脚本和调度器定时装载至hive中形成ods层,并和其他表关联处理形成dwd层,提供给需求方。
2.全量数据:历史数据才用datax编写脚本循环读取+调度+hive动态分区方式同步至hive。因为hive动态分区默认只支持100个分区,我是按小时进行分区的,因此我每次只拉取4天数据,拉取太多报错,编写脚本,需要多少天,拉取多少天。(比较笨的方法,有更好的方式欢迎评论区讨论)
三、datax配置
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["xxxxxxxx:27017"],
"authDb": "admin",
"userName": "xxxxx",
"userPassword": "xxxx",
"dbName": "xxxx",
"collectionName": "xxxx",
"column": [
{
"name": "_id",
"type": "string"
},
{
"name": "data",
"type": "string"
},
{
"name": "gid",
"type": "string"
},
{
"name": "text",
"type": "string"
},
{
"name": "time",
"type": "bigint"
},
{
"name": "uid",
"type": "string"
}
],
"query":"{\"time\":{ \"$gte\": ${start_time}, \"$lt\": ${end_time}}}"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "ask_id",
"type": "string"
},
{
"name": "data",
"type": "string"
},
{
"name": "gid",
"type": "string"
},
{
"name": "text",
"type": "string"
},
{
"name": "time",
"type": "string"
},
{
"name": "uid",
"type": "string"
}
],
"compress": "gzip",
"defaultFS": "xxxx:8020",
"fieldDelimiter": "\t",
"fileName": "xxxx",
"fileType": "text",
"path": "${targetdir}",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
这里面有两个坑。
第一个:datax连接mongodb一定注意"authDb": “admin”,这个配置,要明确同步账号认证库的位置,账号在那个库里面认证的就写哪个库,由于mongodb每个库是单独认证的,一直报:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-02], Description:[DataX引擎运行过程出错,具体原因请参看DataX运行结束时的错误诊断信息 .]. - com.mongodb.MongoCommandException: Command failed with error 13: 'command count requires authentication' on server xxx:27117. The full response is { "ok" : 0.0, "errmsg" : "command count requires authentication", "code" : 13, "codeName" : "Unauthorized" }
找过很多资料,两种方式解决账号认证问题。一种是,刚才提到的指明账号认证库;第二种,就是同步哪个库,单独给这个账号再授权一遍库的权限,代码如下:
db.createUser({user:"x x
x x x",pwd:"xxxxxx",roles:[{"role":"read","db":"xxxx"}]})
查询同步不需要太高的权限,read即可
第二坑:mongodb的query查询,用的是json语句,网上有大神分享的源码分析,里面的查询条件是“and”语句,也就是说,用逗号分隔的查询条件是and,想用or要多次查询(但是我测试十几也不全是and,好像是同样的字段以最后一条为准,留着后面再研究班),哎,没办法,谁让我懒得自己写代码,凑合着用吧。分享query查询语句多个条件的用法:
"query":"{\"time\":{ \"$gte\": 1646064000, \"$lte\": 1648742399},\"time\":{ \"$gte\": 1654012800, \"$lte\": 1656604799},\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
四、datax同步调度脚本
#!/bin/bash
# 定义变量方便修改
APP=xxx
TABLE=xxx
DATAX_HOME=xxxx
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一小时
do_date=2022111416
hr1=${do_date: 8: 2}
date1=${do_date: 0: 8}
hdfs_path=xxx
#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
hadoop fs -test -e $hdfs_path
if [[ $? -eq 1 ]]; then
echo "路径 $hdfs_path 不存在,正在创建......"
hadoop fs -mkdir -p $hdfs_path
else
echo "路径 $hdfs_path 已经存在"
fs_count=$(hadoop fs -count $hdfs_path)
content_size=$(echo $fs_count | awk '{print $3}')
if [[ $content_size -eq 0 ]]; then
echo "路径$hdfs_path为空"
else
echo "路径$hdfs_path不为空,正在清空......"
hadoop fs -rm -r -f $hdfs_path/*
fi
fi
#数据同步
for i in xxx xxx xxx
do
echo ================== $i 装载日期为 $do_date ==================
python $DATAX_HOME/bin/datax.py -p"-Dcollection=$i -Dtargetdir=$hdfs_path" $DATAX_HOME/xxx
done
五、datax同步至es 配置
mongodb同步至es有一个专用的组件,monstache;知道,但还没用过,留白,由于时间紧张用的datax,此处三个注意点:
1.object格式可以datax读取的时候可用string,导入es再改回object
2.es重名没问题
3.想用es中文分词统计词频,除了要配置中文ik,也需要filedata=true;
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["xxxx:27017"],
"userName": "xxx",
"authDb": "xxx",
"userPassword": "xxxx",
"dbName": "xxxx",
"collectionName": "${collection}",
"column": [
{
"name": "_id",
"type": "string" #原有格式为objectid,用此处用string
},
{
"name": "data",
"type": "string" #原有格式为list(object),用string可以倒进去
},
{
"name": "gid",
"type": "string"
},
{
"name": "text",
"type": "string"
},
{
"name": "time",
"type": "bigint"
},
{
"name": "uid",
"type": "string"
},
{
"name": "deleted",
"type": "bigint"
}
],
"query":"{\"time\":{ \"$gte\": 1661961600, \"$lte\": 1664553599}}"
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "xxxxxx:9200",
"index": "xxxx",
"type": "xxxx",
"cleanup": false,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 2048,
"splitter": ",",
"column": [
{
"name": "_id",
"type": "id"
},
{
"name": "data",
"type": "object" #源数据为object,此处也为object
},
{
"name": "gid",
"type": "keyword"
},
{
"name": "text",#即使和关键词重名也不影响,挺好
"type": "text","analyzer": "ik_smart"
},#此处想用es分词,来统计词频的小伙伴建议开启filedata:true,不知道能不能用哈,反正我知道不开启,不能用,有兴趣可以研究下,告诉我
{
"name": "time",
"type": "long"
},
{
"name": "uid",
"type": "keyword"
},
{
"name": "deleted",
"type": "long"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 4
}
}
}
}
六、其他问题
其他就比较简单了,懒得记了,后面有问题再补充
更多推荐
所有评论(0)