ETL的开发过程
在生产环境中, 使用shell脚本完成一次etl操作1.定义一个etl函数, 里面传入json行数据, 用json.loads加载行数据,并对行数据进行判断,如果没有行数据,或data字段没有在行数据里, 就直接返回空的结果, 否则就继续往下执行2.接着获取行里的数据, 用for循环判断, 如果包含某个值, 我就将变量赋值取出, 装在集合容器里3.设置sparksession会话, 并ena...
在生产环境中, 使用shell脚本完成一次etl操作
1.定义一个etl函数, 里面传入json行数据, 用json.loads加载行数据,并对行数据进行判断,如果没有行数据,或data字段没有在行数据里, 就直接返回空的结果, 否则就继续往下执行
2.接着获取行里的数据, 用for循环判断, 如果包含某个值, 我就将变量赋值取出, 装在集合容器里
3.设置sparksession会话, 并enableHiveSupport, 我用的是hiveonspark模式,
4.初始化rdd, 从大数据emr集群中(也可能是从实时系统kafka读取数据)加载数据到rdd , 然后用自己自定义的etl解析过滤
5.将rdd转为df, createDateFream()要传两个参数,一个是rdd,一个是schema信息
6.将df创建临时表 createOrReplaceTemView()
7.将临时表表的数据加载到hive表中, 完成整个ETL操作
ETL常用场景:
1.清洗nginx日志信息, 预处理日志文件(每小时将上报的日志拉取到本机,hdfs命令上传集群),并清洗存入hive
2.每小时清洗用户表信息,
3.后处理清洗商户信息,
4.清洗并合并设备状态信息,
5.每小时清洗每日设备分成, 清洗并合并积分流水表信息, 每小时清洗支付宝订单表信息等,
def etl(row_str):
result = []
try:
row = json.loads(row_str)
if(not row) or ('data' not in row):
return result
获取行
base = {}
for r_k in row:
r_v = row[r_k]
if r_k != 'data':
r_k=r_k.lower()
base[r_k]=r_k
print(base)
获取data
for data in row['data']:
base_data = base.copy()
if data:
for d_k in data:
d_v = data[d_k]
if d_k != 'list':
d_k = d_k.lower()
base_data[d_k] = d_v
print(base_data)
获取list
for list_ in data['list']:
if list_:
# print(list_)
list_data = base_data.copy()
# list_data.update(list_)
for l_k in list_:
l_v = list_[l_k]
l_k = l_k.lower()
list_data[l_k] = l_v
# print(list_data)
result += [list_data]
# print(result)
except Exception as e:
print(e)
pass
retuen result
设置会话
spark = SparkSession.builder.appName("程序名" % statdate分区日期)
.enableHiveSupport()
.getOrCreate()
初始化rdd
rawLogRDD = spark.sparkContext.textfile("hdfs://emr-cluster/ld_log")
etl解析
etllogRDD = rawLogRDD.flatMap(etl)
可以进行测试打印
for record in etlLogRDD.collect():
print(record)
将rdd 转为df
sampleDF = spark.sql("select * from dept limit 1")
etlLogSchema = sampleDF.schema
etlLogSchema.__dict__['fields'] = etlLogSchema.__dict__['fields'][:-1]
etlLogSchema.__dict__['names'] = etlLogSchema.__dict__['names'][:-1]
etlLogDF = spark.createDataFrame(etlLogRDD,etlLogSchema)
测试:etlLogDF.printSchema()
etlLogDF.show()
exit()
创建临时表
etl.LogDF.createOrReplaceTmpView("etl_log")
写入分区表
spark.sql("alter table dept drop if exist partition(statdate='%s')" ) % statdate)
spark.sql("insert overwrite table dept partition(statdate='%s') select * from etl_log " % statdate)
更多推荐
所有评论(0)