python通过dataX同步mysql数据

1、介绍

该脚本主要是使用python脚本通过dataX查询mysql的数据增量同步至HIVE。

2、环境

1、python3 
2、dataX
3、pyhive
4、pyspark

3、功能

(1)支持测试和生产的自由切换。
(2)支持增量同步。
(3)支持补历史数据。
(4)运行环境简单。
(5)支持HIVE队列的切换。

4、优化

对于同步数据,该脚本基本已经都支持。还有优化空间就是:
	1、连接HIVE时可以使用HA模式,不连接单节点。减少宕机风险。
	2、日志打印的规范性。
	3、dataX脚本的缺失。由于其他原因不能放出来。我后面见放一个demo出来。
	4、可以通过变量的形式将要同步的表目标表传进来。实现举一反三的同步。
后续在其他脚本优化。

5、源码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 增量同步消息
from pyhive import hive
import os, sys,datetime

isPrd = True
hiveInfo = {'host':'192.168.1.1','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'} \
	if(isPrd)  else {'host':'192.168.1.122','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'}
sourceDbInfo = {'url':'192.168.1.1:3306/db','user':'root','passwd':'123'} \
	if(isPrd)  else {'url':'192.168.1.122:3306/db','user':'root','passwd':'root123'}
sys.path.append(os.getcwd())
UTF8 = "UTF-8";
class HiveClient:
    def __init__(self):
        self.conn = hive.connect(
                        host=hiveInfo.get('host'),
                        port=hiveInfo.get('port'),
                        username=hiveInfo.get('user'),
                        database=hiveInfo.get('database'),)

    def query(self, sql):
        sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
        print(sql)
        with self.conn.cursor() as cursor:
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)
            return cursor.fetchall()

    def execute(self, sql):
        sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
        print(sql)
        with self.conn.cursor() as cursor:
            cursor.execute("set mapreduce.job.queuename=root.users.project")
            cursor.execute(sql)

    def close(self):
        self.conn.close()




def __getMaxPk():
#增加分区
    addPartion="alter table ods.ods_message_incr add if not exists partition (dt='{dt}') ".format(dt=dt)
    HiveClient().execute(addPartion)
#获取最大ID
    sql = """select max(id) from ods.ods_message_incr where dt='{dt}'""".format(dt=dt)
    data = HiveClient().query(sql)
    HiveClient().close()

    print(data)
    if (data[0][0] == None):
        return 0
    return data[0][0]


# 增量同步推送消息
def syncPushMessage(dt):
    maxPk = __getMaxPk();
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    # 这是执行dataX 命令,后面是传参。
    commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s  -Dhdfs=%s '" % (
    datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'));
    print(commandStr)
    os.system(commandStr)

# 补充缺失消息
def syncPushMessage_history(dt,maxPk):
    datax_json_path = os.getcwd() + '/ods_message_incr.json'
    etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % (
    datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'));
    print(commandStr)
    os.system(commandStr)

if __name__ == '__main__':
    if len(sys.argv) == 1:
        dt = (datetime.datetime.now()).strftime('%Y-%m-%d')
        syncPushMessage(dt)
    elif len(sys.argv) == 2:
        dt = sys.argv[1]
        syncPushMessage(dt)
    elif len(sys.argv) == 3:
        dt = sys.argv[1]
        maxPk = sys.argv[2]
        syncPushMessage_history(dt, maxPk)
    else:
        print('参数输入错误')
        sys.exit(1)

6、最后

文章为原创,转载请出示原地址。
感谢你的阅读,如果这篇文章能帮到你。是我的荣幸!谢谢~

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐