阅读完本文你将学习到以下内容:

  • 1.杂乱数据的分类
  • 2.os库扫描文件夹
  • 3.利用datetime库自动创建不重复文件夹
  • 4.numpy 、pandas 相关方法

1.背景:

数据分析师日常工作中,常常需要接手数据清洗工作,参与定制数据规范,数据框架,数据使用场景规划等,但由于项目并非一成不变,数据的来源也并非永远来自公司内部,所以清洗数据能力是一名数据分析师能力的体现,这份数据清洗代码本人在工作中接触到外部杂乱数据情况下总结的一份清洗代码,现在整理成思路,以作复盘与分享使用,希望能帮助到看到此文的人。

2.数据介绍

数据来源为公司在其他平台收集到的用户资金数据,
分别有:
人员信息表
账户资金表
账户状态表
账户关联表

1).通过观察各表数据结构,我们发现其中主体数据是账户资金表,其他表格为账户资金表服务
2).将数据表存放到本地文件夹,然后开始清洗数据代码

3.数据清洗思路

3.1 按文件名分类

由于数据存在于不同*.csv 表中,我们需要将数据按名字分成以上四类,在分别合并
于是我定义了一个 def file_groupby_csvName(path_excl) 模块,传入地址的话设计成一个表格参数path_excl为一个excel文件路径,里面放着数据集本地文件夹(path列), 对文件分类,
在这里插入图片描述

分别定义4个数据列表装前面提到的四种文件,与一个异常数据名(fPath_error_list)数据表,用于装非前面4类的文件名
fPath_jymx_list
fPath_zhxx_list
fPath_ryxx_list
fPath_qzcs_list
fPath_error_list


def file_groupby_csvName(path_excl):
    global ddpc
    global path_nameTime
    print("*********【正在文件类型分类模块】*********")
    # 文件分类模块
    p_path = pd.read_excel(path_excl)
    p_path = p_path[p_path['isTrue'] == '是']
    count_path = -1
    for f_path in p_path.path:
        fPath_jymx_list = []
        fPath_zhxx_list = []
        fPath_ryxx_list = []
        fPath_qzcs_list = []
        fPath_error_list = []
        count_path += 1
        ddpc = p_path.NO.iloc[count_path]
        print("---------正在处理第%d批次数据---------" % (count_path))
        print("正在处理:'%s' 下文件" % (f_path))
        for root, dirs, files in os.walk(f_path):
            for name in files:
                if '账户资金表' in name and name[-4:] == '.csv':
                    fPath_jymx_list.append(os.path.join(root, name))
                if '账户信息表' in name and name[-4:] == '.csv':
                    fPath_zhxx_list.append(os.path.join(root, name))
                if '账户关联表' in name and name[-4:] == '.csv':
                    fPath_ryxx_list.append(os.path.join(root, name))
                if '账户状态表' in name and name[-4:] == '.csv':
                    fPath_qzcs_list.append(os.path.join(root, name))
                elif  name[-4:] == '.csv':
                    fPath_error_list.append(os.path.join(root, name))
        print("*********' %s'分类完毕' *********" % (f_path))
        concat_list_a = [fPath_jymx_list, fPath_zhxx_list, fPath_ryxx_list, fPath_qzcs_list, fPath_error_list]
        list_name_files_csv = ['xxxx', 'xxxx', 'xxxx', 'xxxx', 'error']
        for csv_i in range(len(concat_list_a)):
            print('---------【汇总到:%s文件%d个!!!】---------' % (list_name_files_csv[csv_i], len(concat_list_a[csv_i])))
        print("*********【汇总完成,已用时%.2f】*********" % (time_count_start_to_now()))
        # print(count_path)
        path_name_time()
        concat_dataframe_csv(concat_list_a)

如代码,获取到path_excl中数据内容后传给 p_path,然后对 p_path.path列遍历,

再利用:

for root, dirs, files in os.walk(f_path):
            for name in files:
            	if 'XXXX' in name and name[-4:] == '.csv':
            		fPath_XXXX_list.append(os.path.join(root, name))

将4中文件分类,并把路径存在list中

然后:

 concat_list_a = [fPath_jymx_list, fPath_zhxx_list, fPath_ryxx_list, fPath_qzcs_list, fPath_error_list]

将5个list 合并,一起return 出来(实际代码没有return,直接把别的函数放进去了)

额外热知识:
在该模块,还定义了一个自动生成文件夹的函数path_name_time(),代码如下:

def path_name_time():
    global path_nameTime
    timestr = datetime.datetime.now().strftime('%Y%m%d%H%M%S')  ###生成当下的时间
    path_nameTime = timestr
    os.mkdir('out/汇总数据/' + path_nameTime)
    os.mkdir('out/清洗数据/' + path_nameTime)
old_time = datetime.datetime.now()

利用datetime.datetime.now().strftime(’%Y%m%d%H%M%S’) 生成当前时间,再结合os.mkdir将时间拼接进路径名中,这样就避免不重复~

另外还设置了一个和计时器,代码如下:

def time_count_start_to_now():
    global old_time
    new_time = datetime.datetime.now()
    time_cross = (new_time - old_time).seconds / 60
    return time_cross

3.2 数据表合并

在数据表通过分类后,装有各类数据的list 传入concat_dataframe_csv 函数,由于我们只有4总文件,所以设定count_No<=3: 去限定它,代码如下:

def concat_dataframe_csv(concat_list):
    # path_name_time()
    print("*********【正在数据表合成模块】*********")
    count_No=0
    for no in range(len(concat_list)):
        if count_No<=3:
            if len(concat_list[no]) == 0:
                print("跳过路径:'%s'下无文件" % (concat_list[no]))
                continue
            else:
                print("正在处理路径:'%s'下文件" % (concat_list[no][0]))
            count = 0
            data_top = pd.DataFrame()
            # data_top = pd.read_csv(concat_list[no][0], engine='python',  dtype=str)  # ,encoding='gb18030' encoding='gbk',
            data_top["文件路径"] = concat_list[no][0]
            print("*********正在防科学计数法与concat合并*********")
            path_all_long = len(concat_list[no])
            for fPath in concat_list[no]:
                try:
                    count += 1
                    data = pd.read_csv(fPath, engine='python', dtype=str)# encoding='gbk',
                    data["文件路径"] = fPath
                    data_top = pd.concat([data_top, data], axis=0)
                    if count % 5 == 0:
                        rest_long = path_all_long - count
                        print("---------在节点%d已完成%d个文件合并,该节点还剩%d个文件待合并---------" % (no, count, rest_long))
                        print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
                except:
                    print('路径‘%s’不可读取/n'%(os.path.basename(fPath)))
                    concat_list[4].append(fPath)
                    pass

            print("---------在节点%d共完成%d个文件合并---------" % (no, count))
            old_long = len(data_top.index)
            print("---------【共汇总%d条数据】---------" % (old_long))
            drop_col_list=[]
            # print(data_top.columns)
            ##列名去空
            new_columns=[]
            for col_i in data_top.columns.tolist():
                col_i = col_i.strip()
                #     i=i.replace("\\t","")
                new_columns.append(col_i)
            data_top.columns=new_columns


            if count_No==0:
                data_top = data_top.drop_duplicates(drop_col_list, keep='first')
            else:
                data_top = data_top.drop_duplicates(data_top.columns.tolist(), keep='first')
            count_No+=1
            data_top.reset_index(drop=True,inplace=True)

            new_long = len(data_top.index)
            print("---------【共删除%d条重复数据】---------" % (old_long - new_long))
            print("---------准备进行防科学计数法模块---------")
            data_top = insert_t(data_top)
            print("---------防科学计数法已完成---------")
            name_list = ['jymx.csv', 'zhxx.csv', 'ryxx.csv', 'qzcs.csv', 'error.csv']

            data_top.to_csv('out\\汇总数据\\' + path_nameTime + '\\' + name_list[no], index=False)
            data_file_name = name_list[no]
            print('---------%s已导出,共完成%d个文件合并---------' % (data_file_name, count))
            print("*********【%s汇总完成】*********" % (name_list[no]))
            print("*********【合成模块完成】*********\n\n")
            print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
            drop_NaN(data_top, data_file_name)
        if count_No>3:
            data_top = pd.DataFrame({'错误路径':concat_list[4]})
            data_file_name='file_error.csv'
            drop_NaN(data_top, data_file_name)

            pass

代码思路:
在分类后的concat_list_a 列表值传给了concat_list,然后对concat_list 进行遍历,再对其中的多个路径遍历,再通过pd.read_csv(fPath, engine=‘python’, dtype=str) 将路径下文件读取,首先读取第一个保留,再用data_top = pd.concat([data_top, data], axis=0)自增,成功将数据合并~
合并完后data_top = data_top.drop_duplicates(drop_col_list, keep=‘first’) 去一下重
核心代码如下:

data_top["文件路径"] = concat_list[no][0]
            print("*********正在防科学计数法与concat合并*********")
            path_all_long = len(concat_list[no])
            for fPath in concat_list[no]:
                    count += 1
                    data = pd.read_csv(fPath, engine='python', dtype=str)# encoding='gbk',
                    data["文件路径"] = fPath
                    data_top = pd.concat([data_top, data], axis=0)

到这里,数据的合并就完成了,再针对每列数据,进行合理的字符处理,
运用np.where \str.contains\re \find\trip等,可以完成清洗~

完整代码如下:

#!/usr/bin/env python
# -- coding: utf-8 --
# @Time : 2020-6-9
# @Author : wang vx:672377334
# @File : v4.3.2深圳版本.py

import datetime
import os
import numpy as np
import pandas as pd

# import time
pd.set_option('float_format', lambda x: '%.0f' % x)


import warnings
warnings.filterwarnings("ignore")

# 全局
data_jymx = 1
data_zhxx = 2
data_count_open = 0
list_ignore_columns = ['交易卡号', '开户人证件号码', '交易账号', '交易对手账卡号', '证照号码', '单位电话', '账号', '请求单号',
                       '帐卡号', '子账户账号', '联系手机', '证账号码', '开户账号']
ddpc = 0
path_nameTime = ''


def path_name_time():
    global path_nameTime
    timestr = datetime.datetime.now().strftime('%Y%m%d%H%M%S')  ###生成当下的时间
    path_nameTime = timestr
    os.mkdir('out/汇总数据/' + path_nameTime)
    os.mkdir('out/清洗数据/' + path_nameTime)
old_time = datetime.datetime.now()


def time_count_start_to_now():
    global old_time
    new_time = datetime.datetime.now()
    time_cross = (new_time - old_time).seconds / 60
    return time_cross


def file_groupby_csvName(path_excl):
    global ddpc
    global path_nameTime
    print("*********【正在文件类型分类模块】*********")
    # 文件分类模块
    p_path = pd.read_excel(path_excl)
    p_path = p_path[p_path['isTrue'] == '是']
    count_path = -1
    for f_path in p_path.path:
        fPath_jymx_list = []
        fPath_zhxx_list = []
        fPath_ryxx_list = []
        fPath_qzcs_list = []
        fPath_error_list = []
        count_path += 1
        ddpc = p_path.NO.iloc[count_path]
        print("---------正在处理第%d批次数据---------" % (count_path))
        print("正在处理:'%s' 下文件" % (f_path))
        for root, dirs, files in os.walk(f_path):
            for name in files:
                if '交易明细' in name and name[-4:] == '.csv':
                    fPath_jymx_list.append(os.path.join(root, name))
                if '账户信息' in name and name[-4:] == '.csv':
                    fPath_zhxx_list.append(os.path.join(root, name))
                if '人员信息' in name and name[-4:] == '.csv':
                    fPath_ryxx_list.append(os.path.join(root, name))
                if '强制' in name and name[-4:] == '.csv':
                    fPath_qzcs_list.append(os.path.join(root, name))
                elif '交易明细' not in name and '账户信息' not in name and '人员信息' not in name and '强制' not in name and name[
                                                                                                               -4:] == '.csv':
                    fPath_error_list.append(os.path.join(root, name))
        print("*********' %s'分类完毕' *********" % (f_path))
        concat_list_a = [fPath_jymx_list, fPath_zhxx_list, fPath_ryxx_list, fPath_qzcs_list, fPath_error_list]
        list_name_files_csv = ['交易明细', '账户信息', '人员信息', '强制', 'error']
        for csv_i in range(len(concat_list_a)):
            print('---------【汇总到:%s文件%d个!!!】---------' % (list_name_files_csv[csv_i], len(concat_list_a[csv_i])))
        print("*********【汇总完成,已用时%.2f】*********" % (time_count_start_to_now()))
        # print(count_path)
        path_name_time()

        concat_dataframe_csv(concat_list_a)


def concat_dataframe_csv(concat_list):
    # path_name_time()
    print("*********【正在数据表合成模块】*********")
    count_No=0
    for no in range(len(concat_list)):
        if count_No<=3:
            if len(concat_list[no]) == 0:
                print("跳过路径:'%s'下无文件" % (concat_list[no]))
                continue
            else:
                print("正在处理路径:'%s'下文件" % (concat_list[no][0]))
            count = 0
            data_top = pd.DataFrame()
            # data_top = pd.read_csv(concat_list[no][0], engine='python',  dtype=str)  # ,encoding='gb18030' encoding='gbk',
            data_top["文件路径"] = concat_list[no][0]
            print("*********正在防科学计数法与concat合并*********")
            path_all_long = len(concat_list[no])
            for fPath in concat_list[no]:
                try:
                    count += 1
                    data = pd.read_csv(fPath, engine='python', dtype=str)# encoding='gbk',
                    data["文件路径"] = fPath
                    data_top = pd.concat([data_top, data], axis=0)
                    if count % 5 == 0:
                        rest_long = path_all_long - count
                        print("---------在节点%d已完成%d个文件合并,该节点还剩%d个文件待合并---------" % (no, count, rest_long))
                        print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
                except:
                    print('路径‘%s’不可读取/n'%(os.path.basename(fPath)))
                    concat_list[4].append(fPath)
                    pass

            print("---------在节点%d共完成%d个文件合并---------" % (no, count))
            old_long = len(data_top.index)
            print("---------【共汇总%d条数据】---------" % (old_long))
            drop_col_list=['交易卡号',
                           '交易账号',
                         #  '交易方户名',
                          # '交易方证件号码',
                           '交易时间',
                           '交易金额',
                           '交易余额',
                           '交易币种',
                           '收付标志',
                           '交易对手账卡号',
                           '对手户名',
                           '对手身份证号']
            # print(data_top.columns)
            ##列名去空
            new_columns=[]
            for col_i in data_top.columns.tolist():
                col_i = col_i.strip()
                #     i=i.replace("\\t","")
                new_columns.append(col_i)
            data_top.columns=new_columns


            if count_No==0:
                data_top = data_top.drop_duplicates(drop_col_list, keep='first')
            else:
                data_top = data_top.drop_duplicates(data_top.columns.tolist(), keep='first')
            count_No+=1
            data_top.reset_index(drop=True,inplace=True)

            new_long = len(data_top.index)
            print("---------【共删除%d条重复数据】---------" % (old_long - new_long))
            print("---------准备进行防科学计数法模块---------")
            data_top = insert_t(data_top)
            print("---------防科学计数法已完成---------")
            name_list = ['jymx.csv', 'zhxx.csv', 'ryxx.csv', 'qzcs.csv', 'error.csv']

            data_top.to_csv('out/汇总数据\\' + path_nameTime + '\\' + name_list[no], index=False)
            data_file_name = name_list[no]
            print('---------%s已导出,共完成%d个文件合并---------' % (data_file_name, count))
            print("*********【%s汇总完成】*********" % (name_list[no]))
            print("*********【合成模块完成】*********\n\n")
            print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
            drop_NaN(data_top, data_file_name)
        if count_No>3:
            data_top = pd.DataFrame({'错误路径':concat_list[4]})
            data_file_name='file_error.csv'
            drop_NaN(data_top, data_file_name)

            pass

def drop_NaN(data, to_csv_name):
    print("*********【正在进行去空模块】*********")
    global data_jymx
    global data_zhxx
    global data_count_open
    global list_ignore_columns
    global ddpc
    for i in data.columns:
        if i not in list_ignore_columns:
            data[i] = data[i].astype('str')
            data[i] = np.where(data[i].str.contains("\t"), data[i].str.replace("\t", ''), data[i])
            print("---------正在处理'%s'列换行/表符---------" % (i))
    print('---------正在删除重复值---------')
    old_long = len(data.index)
    data = data.drop_duplicates(data.columns.tolist(), keep='first')
    new_long = len(data.index)
    print('---------共删除了 %d 条重复值,保留%d条合格数据---------' % (old_long - new_long, new_long))
    if to_csv_name != 'jymx.csv' and to_csv_name != 'zhxx.csv':
        data.to_csv('out/清洗数据/' + path_nameTime + '\\' + to_csv_name, index=False)
    elif to_csv_name == 'jymx.csv':
        data = change_jdbz(data)
        data_jymx = data
        data_count_open += 1
        data.to_csv('out/清洗数据/' + path_nameTime + '\\' + to_csv_name, index=False)
    elif to_csv_name == 'zhxx.csv':
        data_zhxx = data
        data_count_open += 1
        data.to_csv('out/清洗数据/' + path_nameTime + '\\' + to_csv_name, index=False)
    if data_count_open == 2:
        print('*********去空去重完成*********\n\n')
        print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
        bq_data(data_count_open)
        data_count_open = 0
        # ddpc=ddpc+1

    print('*********去空去重完成*********\n\n')
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))


# def drop_T(data):

# return data


def chang_columns(data):
    global ddpc
    print('*********【正在替换表头与调换表头顺序与去空处理排序】*********\n\n')
    columns_data = pd.read_excel("in/银行流水表头字典.xlsx")
    columns_dict = dict(zip(columns_data.中文, columns_data.拼音))
    # data = pd.read_csv('out/清洗数据/jymx_clean_BQ.csv')
    data.rename(columns=columns_dict, inplace=True)
    data['shard_id'] = list(data.index)
    data['ddpc'] = ddpc  # 输入调单批次
    print('---------正在调整列名顺序!!!!---------')
    #     data.to_csv('out/清洗数据/data_aaa.csv',index=False)

    columns_list_name = [i for i in columns_dict.values()]
    data = data[columns_list_name]

    for i in data.columns:
        print("---------正在处理%s列---------" % (i))
        data[i] = data[i].astype('str')
        data[i] = np.where(data[i].str.contains("\t"), data[i].str.replace("\t", ''), data[i])

    # data.to_csv('out/清洗数据/jymx_drop_t.csv', index=False)
    print('*********【替换列名与去空完成】*********')
    return data


def insert_t(data_name):
    global list_ignore_columns
    col_clean = []
    data = data_name
    for i in data.columns:  # 列名去空
        col_clean.append(i.strip())
    data.columns = col_clean
    for i in data.columns:
        data[i] = data[i].str.strip()

    for i in data.columns:
        # if i=='交易卡号' or i=='开户人证件号码' or i=='交易账号' or i=='交易对手账卡号':#[卡号加\t防科学计数法]
        if i in list_ignore_columns:
            data[i] = data[i].astype("str")
            data[i] = np.where(data[i].str.contains("\t"), data[i].str.replace("\t", '\t'),
                               data[i].apply(lambda x: str(x) + '\t'))
            data[i] = np.where(data[i].str.contains('_'), data[i].apply(lambda x: x[:x.find("_")] + '\t'), data[i])
            data[i] = np.where(data[i].str.contains('-'), data[i].apply(lambda x: x[:x.find("-")] + '\t'), data[i])
            data[i] = np.where(data[i].isnull(), '\t', data[i])

        ## 补全空值
        if i == '对手户名' and '摘要说明' in data.columns:  # 交易对手账卡号 提取
            data[i] = data[i].astype(str)
            data['摘要说明'] = data['摘要说明'].astype(str)

            #             # 仅提取为空 账卡号
            #             data["交易对手账号"] = np.where(data['摘要说明'].str.contains("卡号") & data[i] == '\t',
            #                                data.摘要说明.apply(lambda x: x[x.rfind(":") + 1:] + '\t'), data[i])
            #             data["交易对手账号"] = np.where(data['摘要说明'].str.contains("卡号") & data[i] == '',
            #                                data.摘要说明.apply(lambda x: x[x.rfind(":") + 1:] + '\t'), data[i])
            #             data["交易对手账号"] = np.where(data['摘要说明'].str.contains("卡号") & data[i] == 'nan',
            #                                data.摘要说明.apply(lambda x: x[x.rfind(":") + 1:] + '\t'), data[i])

            # 全部从摘要提取 卡号
            data[i] = np.where(data['摘要说明'].str.contains("卡号")&data[i] =='	',
                               data.摘要说明.apply(lambda x: x[x.rfind(":") + 1:] + '\t'), data[i])
            #         if i == '交易对手账卡号' and '摘要说明' in data.columns:
            data[i] = data[i].astype(str)
            data['摘要说明'] = data['摘要说明'].astype(str)
            data[i] = np.where(data[i] == '\t', data.摘要说明 + "\t", data[i])
            data[i] = np.where(data[i] == '', data.摘要说明, data[i])
            data[i] = np.where(data[i] == 'nan', data.摘要说明, data[i])
            data[i] = np.where(data[i].isnull(), data.摘要说明, data[i])
    #         if i == '对手户名'and  '交易类型' in data.columns:
    #             data[i] = data[i].astype(str)
    #             data['交易类型'] = data['交易类型'].astype(str)
    #             data[i] = np.where(data[i] == '\t', data['交易类型'] + "\t", data[i])
    #             data[i] = np.where(data[i] == '', data['交易类型'] + "\t", data[i])
    #             data[i] =np.where(data[i].isnull(), data['交易类型'], data[i])
    #             # np.where(data[i] == 'nan', data['交易类型'], data[i])
    # np.where(data[i].isnull(), data['交易类型'], data[i])
    return data


def change_jdbz(data):
    # jymx输出/清洗数据
    print("---------【正在去除交易金额空值/nan数据】---------")
    old_long = len(data.index)
    print("---------1正在去除空值/nan数据---------")
    data = data[data.交易金额 != ""]
    #     data.drop(data[np.isnan(data['交易金额'])].index, inplace=True)  # 删除NAN数据
    data.交易金额 = data.交易金额.astype(str)
    data[data['交易金额'] != 'nan']
    data.交易金额 = data.交易金额.astype(float)
    data = data[data.交易金额 != 0]
    print("---------2正在拨正借贷标志数据---------")
    #     data.loc[data.交易金额 < 0, '收付标志'] = "进"
    data['交易金额'] = np.where(data['交易金额'] < 0, -data['交易金额'], data['交易金额'])
    # print("---------正在导出数据---------")
    # # data.to_csv("out/清洗数据/jymx_clean.csv", index=False)
    # print("---------5导出数据成功---------")
    new_long = len(data.index)
    print('---------【共删除了 %d 条无效值,保留%d条合格数据】---------' % (old_long - new_long, new_long))
    print('*********去空去重完成*********\n\n')
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
    return data


def cxkh_False(data_zhxx_1, data_zhxx_2, data_jymx_clean_False):
    if len(data_jymx_clean_False.index) == 0:
        print('---------【没有需要以账号为主键的填充数据】---------')
        return data_jymx_clean_False
    elif len(data_jymx_clean_False.index) > 0:
        print('---------【正在以账号为主键的填充数据】---------')
        data_zhxx_1 = data_zhxx_1[data_zhxx_1.交易账号.isin(list(data_jymx_clean_False.交易账号.unique()))]
        dict_name = dict(zip(data_zhxx_1.交易账号, data_zhxx_1.账户开户名称))
        print("---------卡号-开户名称 字典生成 成功---------")
        dict_zjhm = dict(zip(data_zhxx_1.交易账号, data_zhxx_1.开户人证件号码))
        print("---------卡号-开户人证件号 字典生成 成功---------")
        print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
        # data_jymx_clean_False.交易卡号.apply(lambda x:str(x)+'\t')
        data_jymx_clean_False['交易方户名'] = data_jymx_clean_False['交易账号']
        data_jymx_clean_False['交易方证件号码'] = data_jymx_clean_False['交易账号']
        print("---------正在补全 开户名称---------")
        # data_jymx_clean_False.replace({'交易方户名': dict_name}, inplace=True)
        data_jymx_clean_False.交易方户名 = data_jymx_clean_False.交易方户名.replace(dict_name)
        print("---------补全 开户名称 成功---------")
        print("---------正在补全 交易方证件号码---------")
        # data_jymx_clean_False.replace({'交易方证件号码': dict_zjhm}, inplace=True)
        data_jymx_clean_False.交易方证件号码 = data_jymx_clean_False.交易方证件号码.replace(dict_zjhm)
        print("---------补全 交易方证件号码 成功---------")

        print("---------正在插入交易卡号余额/交易对手账卡号余额 列名---------")

        cxkhye_list = list(data_jymx_clean_False.交易账号.isin(list(data_zhxx_1.交易账号.unique())))
        # jydfzkhye_list = list(data_jymx_clean_False.交易对手账卡号.isin(list(data_zhxx_1.交易账号.unique())))
        data_jymx_clean_False['交易卡号余额'] = cxkhye_list
        # data_jymx_clean_False['交易对手账卡号余额'] = jydfzkhye_list

        print("---------正在搬运 账号到 交易卡号余额/交易对手账卡号余额 列名---------")
        print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
        data_jymx_clean_False.交易卡号余额 = np.where(data_jymx_clean_False.交易卡号余额 == True, data_jymx_clean_False.交易账号, '')
        # data_jymx_clean_False.交易对手账卡号余额 = np.where(data_jymx_clean_False.交易对手账卡号余额 == True,
        #                                            data_jymx_clean_False.交易对手账卡号, '')

        #         子账户账号 余额

        data_zhxx_2 = data_zhxx_2[data_zhxx_2.子账户账号.isin(list(data_jymx_clean_False.交易账号.unique()))]
        dict_ye = dict(zip(data_zhxx_2.子账户账号, data_zhxx_2.账户余额))

        print("---------正在插入交易卡号余额列---------")
        # data_jymx_clean_False.replace({'交易卡号余额': dict_ye}, inplace=True)
        data_jymx_clean_False.交易卡号余额 = data_jymx_clean_False.交易卡号余额.replace(dict_ye)
        print("---------正在插入交易对手账号余额列---------")
        # data_jymx_clean_False.replace({'交易对手账卡号余额': dict_ye}, inplace=True)
        # data_jymx_clean_False.交易对手账卡号余额 = data_jymx_clean_False.交易对手账卡号余额.replace(dict_ye)
        # data_jymx_clean_False.交易对手账卡号余额 = np.where(data_jymx_clean_False.交易对手账卡号余额.isnull(), '',data_jymx_clean_False.交易对手账卡号余额)
        data_jymx_clean_False.交易卡号余额 = np.where(data_jymx_clean_False.交易卡号余额.isnull(), '', data_jymx_clean_False.交易卡号余额)
        # data_jymx_clean_False.replace({'交易对手账号余额': dict_ye}, inplace=True)

        data_jymx_clean_False['交易卡号状态'] = cxkhye_list
        # data_jymx_clean_False['交易对手账卡号状态'] = jydfzkhye_list
        print("---------正在搬运 账号到 交易卡号状态/交易对手账卡号状态 列名---------")
        print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
        data_jymx_clean_False.交易卡号状态 = np.where(data_jymx_clean_False.交易卡号状态 == True, data_jymx_clean_False.交易账号, '')
        # data_jymx_clean_False.交易对手账卡号状态 = np.where(data_jymx_clean_False.交易对手账卡号状态 == True,data_jymx_clean_False.交易对手账卡号, '')

        dict_zt = dict(zip(data_zhxx_2.子账户账号, data_zhxx_2.账户状态))
        #           dict_ye = dict(zip(zhxx_false_use.子账户账号, data_zhxx_1.账户余额))
        print("---------正在插入交易卡号状态列---------")
        # data_jymx_clean_False.replace({'交易卡号状态': dict_zt}, inplace=True)
        data_jymx_clean_False.交易卡号状态 = data_jymx_clean_False.交易卡号状态.replace(dict_zt)
        print("---------正在插入交易对手账卡号状态列---------")
        # data_jymx_clean_False.replace({'交易对手账卡号状态': dict_zt}, inplace=True)
        # data_jymx_clean_False.交易对手账卡号状态 = data_jymx_clean_False.交易对手账卡号状态.replace(dict_zt)
        data_jymx_clean_False.交易卡号状态 = np.where(data_jymx_clean_False.交易卡号状态.isnull(), '', data_jymx_clean_False.交易卡号状态)
        # data_jymx_clean_False.交易对手账卡号状态 = np.where(data_jymx_clean_False.交易对手账卡号状态.isnull(), '',
        # data_jymx_clean_False.交易对手账卡号状态)

        return data_jymx_clean_False


def bq_data(data_count_open):
    global data_zhxx
    global data_jymx
    if data_count_open == 2:
        print("*********正在准备补全交易明细数据*********")
    # data_jymx = pd.read_csv("out/清洗数据/"+path_nameTime+"/jymx.csv",encoding="utf8",dtype=str)
    # data_jymx = insert_t(data_jymx)
    # data_zhxx = pd.read_csv("out/清洗数据/"+path_nameTime+"/zhxx.csv",encoding="utf8",dtype=str)
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
    # for i in data_jymx.columns:
    #     data_jymx[i]=data_jymx[i].astype(str)
    # for i in data_jymx.columns:
    # data_zhxx = data_zhxx[~data_zhxx.账户开户名称.isnull()]

    data_zhxx_2 = data_zhxx

    data_zhxx = data_zhxx[~data_zhxx.账户开户名称.isnull()]
    data_zhxx = data_zhxx[data_zhxx.账户开户名称 != '']
    data_zhxx = data_zhxx[data_zhxx.账户开户名称 != 'nan']

    data_zhxx_1 = data_zhxx
    data_jymx_clean = data_jymx

    print("---------正在准备补全空交易卡号---------")
    data_jykh_nan = data_jymx_clean[(data_jymx_clean.交易卡号 == '\t') | (data_jymx_clean.交易卡号 == '')]
    data_jymx_clean = data_jymx_clean[(data_jymx_clean.交易卡号 != '\t') & (data_jymx_clean.交易卡号 != '')]
    data_jykh_nan.交易卡号 = data_jykh_nan.交易账号
    dict_jykh = dict(zip(data_zhxx_1.交易账号, data_zhxx_1.交易卡号))
    # data_jykh_nan.replace({'交易卡号': dict_jykh}, inplace=True)

    data_jykh_nan.交易卡号 = data_jykh_nan.交易卡号.replace(dict_jykh)
    data_jymx_clean = pd.concat([data_jymx_clean, data_jykh_nan], axis=0)
    print("---------补全空交易卡号成功---------")

    print("---------正在准备补全空交易账号---------")
    data_jymx_clean.交易账号 = data_jymx_clean.交易账号.astype(str)
    data_jyzh_nan = data_jymx_clean[(data_jymx_clean.交易账号 == '\t') | (data_jymx_clean.交易账号 == '')]
    data_jymx_clean = data_jymx_clean[(data_jymx_clean.交易账号 != '\t') & (data_jymx_clean.交易账号 != '')]
    # # 赋值交易卡号 给 交易账号
    data_jyzh_nan.交易账号 = data_jyzh_nan.交易卡号
    dict_jyzh = dict(zip(data_zhxx_1.交易卡号, data_zhxx_1.交易账号))
    # data_jyzh_nan.replace({'交易账号': dict_jyzh}, inplace=True)
    data_jyzh_nan.交易账号 = data_jyzh_nan.交易账号.replace(dict_jyzh)
    data_jymx_clean = pd.concat([data_jymx_clean, data_jyzh_nan], axis=0)
    print("---------补全空交易账号成功---------")
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))

    data_zhxx_1 = data_zhxx_1[data_zhxx_1.交易卡号.isin(list(data_jymx_clean.交易卡号.unique()))]
    dict_name = dict(zip(data_zhxx_1.交易卡号, data_zhxx_1.账户开户名称))
    print("---------卡号-开户名称 字典生成 成功---------")
    dict_zjhm = dict(zip(data_zhxx_1.交易卡号, data_zhxx_1.开户人证件号码))

    print("---------卡号-开户人证件号 字典生成 成功---------")
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
    # data_jymx_clean.交易卡号.apply(lambda x:str(x)+'\t')
    data_jymx_clean['交易方户名'] = data_jymx_clean['交易卡号']
    data_jymx_clean['交易方证件号码'] = data_jymx_clean['交易卡号']
    print("---------正在补全 开户名称---------")
    # data_jymx_clean.replace({'交易方户名': dict_name}, inplace=True)
    data_jymx_clean.交易方户名 = data_jymx_clean.交易方户名.replace(dict_name)
    print("---------补全 开户名称 成功---------")
    print("---------正在补全 交易方证件号码---------")
    # data_jymx_clean.replace({'交易方证件号码: dict_zjhm}, inplace=True)

    data_jymx_clean.交易方证件号码 = data_jymx_clean.交易方证件号码.replace(dict_zjhm)
    print("---------补全 交易方证件号码成功---------")

    print("---------正在插入交易卡号余额/交易对手账卡号余额 列名---------")
    cxkhye_list = list(data_jymx_clean.交易卡号.isin(list(data_zhxx_1.交易卡号.unique())))
    jydfzkhye_list = list(data_jymx_clean.交易对手账卡号.isin(list(data_zhxx_1.交易卡号.unique())))
    data_jymx_clean['交易卡号余额'] = cxkhye_list
    data_jymx_clean['交易对手账卡号余额'] = jydfzkhye_list

    print("---------正在搬运 卡号到 交易卡号余额/交易对手账卡号余额 列名---------")
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
    data_jymx_clean.交易卡号余额 = np.where(data_jymx_clean.交易卡号余额 == True, data_jymx_clean.交易卡号, '')
    data_jymx_clean.交易对手账卡号余额 = np.where(data_jymx_clean.交易对手账卡号余额 == True, data_jymx_clean.交易对手账卡号, '')

    dict_ye = dict(zip(data_zhxx_1.交易卡号, data_zhxx_1.账户余额))
    print("---------正在插入交易卡号余额列---------")
    # data_jymx_clean.replace({'交易卡号余额': dict_ye}, inplace=True)
    data_jymx_clean.交易卡号余额 = data_jymx_clean.交易卡号余额.replace(dict_ye)
    print("---------正在插入交易对手账号余额列---------")
    # data_jymx_clean.replace({'交易对手账卡号余额': dict_ye}, inplace=True)
    data_jymx_clean.交易对手账卡号余额 = data_jymx_clean.交易对手账卡号余额.replace(dict_ye)
    data_jymx_clean.交易卡号余额 = np.where(data_jymx_clean.交易卡号余额.isnull(), '', data_jymx_clean.交易卡号余额)
    data_jymx_clean.交易对手账卡号余额 = np.where(data_jymx_clean.交易对手账卡号余额.isnull(), '', data_jymx_clean.交易对手账卡号余额)

    data_jymx_clean['交易卡号状态'] = cxkhye_list
    data_jymx_clean['交易对手账卡号状态'] = jydfzkhye_list
    print("---------正在搬运 账号到 交易卡号状态/交易对手账卡号状态 列名---------")
    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))
    data_jymx_clean.交易卡号状态 = np.where(data_jymx_clean.交易卡号状态 == True, data_jymx_clean.交易卡号, '')
    data_jymx_clean.交易对手账卡号状态 = np.where(data_jymx_clean.交易对手账卡号状态 == True, data_jymx_clean.交易对手账卡号, '')

    dict_zt = dict(zip(data_zhxx_1.交易卡号, data_zhxx_1.账户状态))
    print("---------正在插入交易卡号状态列---------")
    # data_jymx_clean.replace({'交易卡号状态': dict_zt}, inplace=True)
    data_jymx_clean.交易卡号状态 = data_jymx_clean.交易卡号状态.replace(dict_zt)
    print("---------正在插入交易对手账卡号状态列---------")
    data_jymx_clean.交易对手账卡号状态 = data_jymx_clean.交易对手账卡号状态.replace(dict_zt)
    data_jymx_clean.交易卡号状态 = np.where(data_jymx_clean.交易卡号状态.isnull(), '', data_jymx_clean.交易卡号状态)
    data_jymx_clean.交易对手账卡号状态 = np.where(data_jymx_clean.交易对手账卡号状态.isnull(), '', data_jymx_clean.交易对手账卡号状态)
    print("---------插入交易对手账卡号状态列 完成---------\n")
    clean_to_csv(data_jymx_clean, data_zhxx_1, data_zhxx_2)


def clean_to_csv(data_jymx_clean, data_zhxx_1, data_zhxx_2):
    # 抽取
    data_jymx_clean_False = data_jymx_clean[data_jymx_clean.交易卡号 == data_jymx_clean.交易方户名]
    data_jymx_clean_True = data_jymx_clean[data_jymx_clean.交易卡号 != data_jymx_clean.交易方户名]

    # 调用以账号为主键填充  函数
    print("---------调用以账号为主键填充函数---------")
    data_jymx_clean_False = cxkh_False(data_zhxx_1, data_zhxx_2, data_jymx_clean_False)
    data_jymx_clean = pd.concat([data_jymx_clean_True, data_jymx_clean_False], axis=0)
    data_jymx_null = data_jymx_clean[(data_jymx_clean.交易卡号 == '\t') | (data_jymx_clean.交易卡号 == '')]
    data_jymx_clean = data_jymx_clean[(data_jymx_clean.交易卡号 != '\t') & (data_jymx_clean.交易卡号 != '')]
    old_long = len(data_jymx_clean.index)
    data_jymx_clean_chang_columns = chang_columns(data_jymx_clean)
    print("---------do【 导出输出/清洗数据/jymx_clean_BQ.csv】 ing---------")
    # 去重
    col_keys = ['cxkh', 'cxzh', 'jymc', 'jyzjhm', 'jysj', 'jyje', 'jyye', 'jybz', 'jdbz', 'jydfzkh', 'jydfmc']
    data_jymx_clean_chang_columns = data_jymx_clean_chang_columns.drop_duplicates(col_keys, keep='first')
    data_jymx_clean_chang_columns.to_csv("out/清洗数据/" + path_nameTime + '\\' + "jymx_clean_BQ.csv", index=False)

    print('---------do【 导出输出/清洗数据/jymx_clean_BQ_khd.csv】 ing---------')
    columns_data_2 = pd.read_excel("in/银行流水表头字典.xlsx")
    #   导出客户端格式
    name_dict_2 = dict(zip(columns_data_2.拼音, columns_data_2.中文))
    data_jymx_clean_chang_columns.rename(columns=name_dict_2, inplace=True)
    data_jymx_clean_chang_columns.to_csv("out/清洗数据/" + path_nameTime + '\\' + "jymx_clean_BQ_khd.csv", index=False)
    print('---------客户端数据导出成功---------')
    # data_jymx_clean_chang_columns.to_excel("out/清洗数据/" + path_nameTime + '\\' + "jymx_clean_BQ.xlsx", index=False)
    data_jymx_null.to_csv("out/清洗数据/" + path_nameTime + '\\' + "jymx_clean_BQ_NULL.csv", index=False)
    new_long = len(data_jymx_clean_chang_columns.index)

    print('*********【补全交易明细数据完成,表内数据共:%d条,去重:%d】*********\n\n' % (new_long, old_long - new_long))

    print("*********【已用时:%.2f分钟】*********" % (time_count_start_to_now()))


if __name__ == '__main__':
    path_f = ("in\点我输入文件路径.xlsx")
    print("*********Go ! 又是开心的一天呢!*********\n")

    try:
        file_groupby_csvName(path_f)
        #         success = input("********恭喜!!清洗完成!! 输入任意键结束********")
        print("********恭喜!!清洗完成!! 输入任意键结束********")

        # img=cv2.imread("in\.img\img.jpg")
        # cv2.imshow("码代码不易,且用且珍惜",img)
        # cv2.waitKey=(0)
    except Exception as e:
        print(e)
        error = input("********报错!!微信联系:672377334 获取失败原因 输入任意键结束********")
        pass




Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐