用法(每次迁移一个库中的一张表,参数依次为 MySQL 库名和表名):

python rcdasebase_moe.py data_collection_base_00_db t_base_info_0
python rcdasebase_moe.py data_collection_base_00_db t_base_info_1
...(依此类推:库 data_collection_base_00_db ~ data_collection_base_99_db,每个库的表 t_base_info_0 ~ t_base_info_9)

python rcdasebase_moe.py data_collection_base_99_db t_base_info_8
python rcdasebase_moe.py data_collection_base_99_db t_base_info_9

代码:

#!/bin/env python

import datetime
import re
import sys
import time

import MySQLdb,MySQLdb.cursors
from elasticsearch import Elasticsearch
from elasticsearch import helpers

# Migration window: rows whose Fmodify_time falls in [start_day, stop_day),
# processed one day at a time.
start_day = '2021-07-01'
stop_day = '2021-10-01'

# Target Elasticsearch index.
index_name = 'af_collect_data_2021q3_new'

# Source database and table come from the command line, e.g.
#   python rcdasebase_moe.py data_collection_base_99_db t_base_info_0
if len(sys.argv) < 3:
    sys.stderr.write('usage: %s <db_name> <table_name>\n' % sys.argv[0])
    sys.exit(1)
db_name = sys.argv[1]
table_name = sys.argv[2]

# Both names are formatted directly into SQL text (identifiers cannot be bound
# as query parameters), so besides the expected prefix only word characters
# ([A-Za-z0-9_]) are accepted -- no quotes, backticks, spaces or ';'.
if not re.match(r'data_collection_base\w*\Z', db_name):
    sys.stderr.write('invalid db_name: %r\n' % (db_name,))
    sys.exit(1)
if not re.match(r't_base_info\w*\Z', table_name):
    sys.stderr.write('invalid table_name: %r\n' % (table_name,))
    sys.exit(1)


# Elasticsearch field names for SELECT columns 1..24, in SELECT order
# (column 0 is the document id, columns 25/26 are the two timestamps).
_SOURCE_FIELDS = (
    "uid", "orderId", "sceneType", "channelId", "channelName", "appSystem",
    "deviceName", "equipmentModel", "idfa", "imei", "isAgent", "latitude",
    "longitude", "geohash", "localQq", "localTel", "macCode", "networkType",
    "phoneMacAddress", "source", "waterNum", "wifiMacAddress", "wifiName",
    "ipAddress",
)


def _build_select_sql(db, table):
    """Return the per-day SELECT statement for `db`.`table`.

    Only the (pre-validated) identifiers are formatted into the text; the two
    ``%s`` markers are the [from, to) Fmodify_time bounds and are bound by the
    driver in ``cursor.execute``.
    """
    return """
            select 
                concat(fuid, '_', Fscene_type,  '_', Fchannel_id , '_',  unix_timestamp(Fcreate_time)*1000 )  as id,
                Fuid as uid,
                Forder_id as orderId,
                Fscene_type as sceneType,
                Fchannel_id as channelId,
                Fchannel_name as channelName,
                Fapp_system as appSystem,
                Fdevice_name as deviceName,
                Fequipment_model as equipmentModel,
                Fidfa as idfa,
                Fimei as imei,
                Fis_agent as isAgent,
                Flatitude as latitude,
                Flongitude as longitude,
                Fgeohash as geohash,
                Flocal_qq as localQq,
                Flocal_tel as localTel,
                Fmac_code as macCode,
                Fnetwork_type as networkType,
                Fphone_mac_address as phoneMacAddress,
                Fsource as source,
                Fwater_num as waterNum,
                Fwifi_mac_address as wifiMacAddress,
                Fwifi_name as wifiName,
                Fip_address as ipAddress,
                Fcreate_time as createTime,
                Fcreate_time as actionTime
            from `{0}`.`{1}` 
            where Fmodify_time >= %s and Fmodify_time < %s;""".format(db, table)


def _row_to_action(row, index):
    """Map one SELECT row to a bulk "index" action for `index`.

    Returns None (after printing the error and the row) when either timestamp
    column cannot be formatted, e.g. a NULL value.
    """
    try:
        c_time = row[25].strftime('%Y-%m-%d %H:%M:%S')
        a_time = row[26].strftime('%Y-%m-%d %H:%M:%S')
    except (AttributeError, ValueError) as e:
        print(e)
        print(row)
        return None

    source = dict(zip(_SOURCE_FIELDS, row[1:25]))
    source["createTime"] = c_time
    source["actionTime"] = a_time
    return {
        "_index": index,
        "_type": "collect_data_base_info",
        "_id": row[0],
        "_source": source,
    }


def _bulk_with_retry(es_client, actions, max_attempts=30, delay_seconds=10):
    """Send `actions` via helpers.bulk, retrying after a pause on failure.

    Every action carries an explicit _id, so resending a partially indexed
    batch overwrites documents instead of duplicating them. After
    `max_attempts` failures the last error is re-raised rather than looping
    forever, so errors that are not transient are surfaced.
    """
    attempt = 1
    while True:
        try:
            helpers.bulk(es_client, actions)
            return
        except Exception as e:
            print(e)
            if attempt >= max_attempts:
                raise
        attempt += 1
        time.sleep(delay_seconds)


def mysql2es():
    """
    Read rows from MySQL and write them directly into Elasticsearch.

    For each day in [start_day, stop_day) (module-level), selects the rows of
    db_name.table_name whose Fmodify_time falls on that day and bulk-indexes
    them into index_name. Raises ValueError if db_name/table_name contain
    anything other than word characters.
    """
    # Identifiers cannot be bound as query parameters; refuse anything that is
    # not a plain word before it is formatted into the SQL text.
    for ident in (db_name, table_name):
        if not re.match(r'\w+\Z', ident):
            raise ValueError('unsafe SQL identifier: %r' % (ident,))

    sql = _build_select_sql(db_name, table_name)
    day = datetime.datetime.strptime(start_day, '%Y-%m-%d')
    stop = datetime.datetime.strptime(stop_day, '%Y-%m-%d')
    one_day = datetime.timedelta(days=1)

    # NOTE(review): MySQL_PORT / ES_PORT and the quoted host/user/password
    # strings look like redacted placeholders -- they must be defined or
    # replaced before this can run.
    conn = MySQLdb.connect(host='MySQL_IP', port=MySQL_PORT, user='MySQL_USER', passwd='MySQL_PWD', charset='utf8')
    try:
        cur = conn.cursor()
        try:
            es_client = Elasticsearch(["ES_IP_1", "ES_IP_2"], port=ES_PORT, timeout=180)

            # '<' (not equality on stop_day) guarantees termination even when
            # start_day >= stop_day.
            while day < stop:
                next_day = day + one_day
                f_day = day.strftime('%Y-%m-%d')
                e_day = next_day.strftime('%Y-%m-%d')
                print('%s %s %s' % (db_name, table_name, f_day))

                cur.execute(sql, (f_day + ' 00:00:00', e_day + ' 00:00:00'))

                # Build one action per row; rows with unusable timestamps are
                # skipped (and reported) by _row_to_action.
                actions = []
                for row in cur.fetchall():
                    action = _row_to_action(row, index_name)
                    if action is not None:
                        actions.append(action)

                if actions:
                    _bulk_with_retry(es_client, actions)

                day = next_day
        finally:
            cur.close()
    finally:
        conn.close()


# Run the migration only when executed as a script, not when imported.
if __name__ == '__main__':
    mysql2es()

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐