Py学习  »  Python

【Python私活案例】500元,提供exe实现批量excel文件的存入mysql数据库

蚂蚁学Python • 1 年前 • 201 次点击  

下午的时候我正无聊的刷着手机,就听叮咚一声,我就顺便看了一眼,好家伙是老师在发赚钱的单子,我再一看,这不是我刚刚学过去的知识吗,二话不说立马就开启了‘抢单’模式。感谢老师让我得到了批量将excel文件存入mysql数据库的单子,本来以为很简单的单子,但是遇到几个我忽略的问题,让我着实头疼了一番,看来还是要多学习才行。

【业务需求】

打开exe后,弹出一个exe命令行窗口,输入路径,执行递归扫描很多个excel文件,存入mysql数据库

【代码实现分析】

需求分析:

  1. 需要批量读取excel;
  2. 需要存入mysql;
  3. 需要将py文件打包为exe

看起来就是如此简单 不过经过进一步沟通才知道:

  1. 是有很多excel文件存在不同级别的文件夹里,每个excel里面又有很多的表数据,幸好表的格式基本相同。
  2. 批量读取excel表内容,并简单处理用pandas更加的方便一点,果断选择pandas,不过to_sql命令我比较陌生,又去学习了一番;
  3. 打包工具,也比较简单pyinstaller,网上教程一大堆,没啥可说的。

【代码实现】

首先我想到的是编一个函数,来找到目录内所有的excel相关文件的位置,这里我用的是pathlib2的Path下的rglob函数,直接可以选出目录内包含子文件夹下的所有符合条件的文件(这里要感谢船长的提醒,让我少走了好多的弯路,不然我铁定要用循环遍历的。。。。

#得到目录里面所有的excel文件和csv文件
def get_path():
    while True:
        path = input("请输入需要查找的目录:")
        if Path(path).exists():
            break
        else:
            print('您输入的目录不存在,请检查!!!!')
    print('正在查找中。。。。')
    return Path(path).rglob('**/*.xls*'), Path(path).rglob('**/*.csv')

其次就是根据得到的文件路径用pandas来读取,由于一个excel文件有很多表,所以我是这么写的,你发现什么问题了吗?

def readAllFiles():
    excel_file_list,csv_file_list = get_path()
    print('查找完成,数据整理中.....')
    for file_e in excel_file_list:
        df = pd.read_excel(file_e, sheet_name=None)
        for sheet_name in df.keys():
            df_1 = pd.read_excel(file_e,sheet_name=sheet_name,nrows=1)
            df_2 = pd.read_excel(file_e,sheet_name=sheet_name,header=2)
            wash_data(df_1,df_2)


为了解析excel中的数据,我用了2个pd.read_excel()来实现各自的目的,实现以后程序运行竟然很慢很慢,想了很多方法———多线程,更改处理数据方式都没有让程序快起来,为什么这么慢呢?WHY?

在我百思不得要领的时候突然看到了pandas读取,脑中灵光一现,原来就是这么简单。你想到了吗?对的,就是pandas读取数据非常慢,而我竟然让它读了3遍——罪过罪过。然后我就改成了这样:

    excel_file_list,csv_file_list = get_path()
print('查找完成,数据整理中.....')
for file_e in excel_file_list:
try:
df = pd.read_excel(file_e, sheet_name=None)
for sheet_name in df.keys():
df_1 = df[sheet_name].loc[0:0,:]
df_2 = df[sheet_name].iloc[2:,:-1]
df_e = wash_data(df_1,df_2)

当改成用pandas只读取一次后,程序飞了起来,我也飞了起来——哈哈哈哈哈哈哈哈——此处允许我疯一下!

剩下的数据处理,添加列,对列排队,存入数据库等等都是小意思。直接看代码吧!

#获取物料编码和物料描述
def get_wlbm_wlms(s_list):
    wlbm = s_list[0].split(':')[-1].strip()
    wlms = s_list[1] if '物料描述' not in s_list[1] else s_list[1].replace('(物料描述)','')
    return wlbm,wlms
#数据清洗和排列
def wash_data(df_1,df_2):
    df_1.dropna(axis=1, how='all', inplace=True)
    list_s = df_1.loc[0].values
    wlbm, wlms = get_wlbm_wlms(list_s)
    if not df_2.empty:
        df_2.columns = ['序号''条码''产品批次''软件版本''硬件版本''订单''出货日期''出货地点''备注']
        # 删除没有用的列
        df_2.dropna(axis=0,how='all',inplace=True)
        df_2.drop(columns=['产品批次''订单'], inplace=True)
        df_2 = df_2.replace('/',np.NaN)
        df_2['物料编码'] = wlbm
        df_2['产品名称'] = wlms
        df = df_2[['序号''条码''出货日期''产品名称''出货地点''物料编码''软件版本''硬件版本''备注']]
    else:
        data = [[np.NaN, np.NaN, np.NaN, wlms, np.NaN, wlbm, np.NaN, np.NaN, np.NaN]]
        df = pd.DataFrame(data,columns=['序号''条码''出货日期''产品名称''出货地点''物料编码''软件版本''硬件版本''备注'])
    return df
def get_sheet_data(sheet_name,df):
        df_1 = df[sheet_name].loc[0:0,:]
        df_2 = df[sheet_name].iloc[2:,:-1]
        df_e = wash_data(df_1,df_2)
        # print(df_e)
        df_e.to_sql(sql_info['TABLE_NAME'], chunksize=10000,con=engine, if_exists='append', index=False,dtype=DATA_TYPE)

当然这里有一个细节被我忽略了,在调试的时候才发现,就是warning,看图:就是这里,记得一定要用copy()一下,不然你就会看到warning,想看的可以试试!!

虽然我感觉数据清洗和处理是比较简单的,但是实际上也花了我一些的时间,由于pandas才刚刚开始学,有些东西真的是边学边写,幸好老师有很多东西都已经给出了例子,照着来一遍就可以实现效果。这个要大大的感谢一下老师,老师的视频做的实在是太详细了!!

我虽然在我的电脑上数据库用的没有任何问题,但是到了客户那边就出了各种问题,说实话我真的对数据库了解的不多,只能是有问题搜一下,根据自己的理解在自己的电脑上试一下。感慨一下,数据库真的是一个细心的功夫活!!总的来说还是解决了~~

最后就是增加了一些记录,防错,防重复的一些小功能,至少要让客户用起来舒服,客户可是上帝!!

另外多说一下,存到数据库时,一定要一一对应,类型格式也不能错,不然就是存不进去,让我白白浪费了一天时间才找到问题。感谢大家的阅读!

最后附上全部的代码:

import os
import numpy as np
import pandas as pd
from pathlib2 import Path
import pymysql
from sqlalchemy import create_engine
from sqlalchemy.types import DATE,INT,VARCHAR

DATA_TYPE = {'序号':INT,'条码':VARCHAR(255),'出货日期':DATE,'产品名称':VARCHAR(255), '出货地点':VARCHAR(255), '物料编码':VARCHAR(255), '软件版本':VARCHAR(255), '硬件版本':VARCHAR(255), '备注':VARCHAR(255)}
pymysql.install_as_MySQLdb()
#读取配置文件
def get_sql_info():
    sql_dict = {}
    with open('mysql_info.txt','r')as f:
        f_r_l = f.readlines()
        for f_r in f_r_l:
            sql_dict[f_r.split(':')[0].strip()] = f_r.split(':')[1].strip()
    return sql_dict

sql_info = get_sql_info()
DB_STRING = f"mysql+mysqldb://{sql_info['USER']}:{sql_info['PASSWORD']}@{sql_info['HOST']}/{sql_info['db_name']}?charset=utf8"
engine = create_engine(DB_STRING)

def clean_txt(path):
    with open(path, 'w', encoding='utf-8') as f:
        f.truncate()

#读取已经完成的sheet
def read_txt(path):
    if not os.path.exists(path):
        clean_txt(path)
        return
     with open(path,'r',encoding='utf-8')as f:
        sheetnames = f.readlines()
    return [i.strip() for i in sheetnames]

def write_txt(path,s):
    with open(path,'a',encoding='utf-8')as f:
        f.write(s+'\n')
#得到目录里面所有的excel文件和csv文件
def get_path():
    while True:
        path = input("请输入需要查找的目录:")
        print(path)
        if Path(path).exists():
            break
        else:
            print('您输入的目录不存在,请检查!!!!')
    print('正在查找中。。。。')
    return Path(path).rglob('**/*.xls*'), Path(path).rglob('**/*.csv')
#获取物料编码和物料描述
def get_wlbm_wlms(s_list):
    wlbm = s_list[0].split(':')[-1].strip()
    wlms = s_list[1] if '物料描述' not in s_list[1] else s_list[1].replace('(物料描述)','')
    return wlbm,wlms
#数据清洗和排列
def wash_data(df_a,df_b):
    df_1 = df_a.copy()
    df_2 = df_b.copy()
    df_1.dropna(axis=1, how='all', inplace=True)
    list_s = df_1.loc[0].values
    wlbm, wlms = get_wlbm_wlms(list_s)
    # print(wlbm,wlms)
    if not df_2.empty:
        df_2.columns = ['序号''条码''产品批次''软件版本' '硬件版本''订单''出货日期''出货地点''备注']
        # 删除没有用的列
        df_2.dropna(axis=0,how='all',inplace=True)
        df_2.drop(columns=['产品批次''订单'], inplace=True)
        df_2 = df_2.replace('/',np.NaN)
        df_2['物料编码'] = wlbm
        df_2['产品名称'] = wlms
        df = df_2[['序号''条码''出货日期''产品名称''出货地点''物料编码''软件版本''硬件版本''备注']]
    else:
        data = [[np.NaN, np.NaN, np.NaN, wlms, np.NaN, wlbm, np.NaN, np.NaN, np.NaN]]
        df = pd.DataFrame(data,columns=['序号''条码''出货日期''产品名称''出货地点''物料编码''软件版本''硬件版本''备注'])
    return df
def get_sheet_data(sheet_name,df):
        try:
            df_1 = df[sheet_name].loc[0:0,:]
            df_2 = df[sheet_name].iloc[2:,:-1]
            df_e = wash_data(df_1,df_2)
            # print(df_e)
            df_e.to_sql(sql_info['TABLE_NAME'], chunksize=10000,con=engine, if_exists='append', index=False,dtype=DATA_TYPE)

        except Exception as e:
            print(e)

def readAllFiles():
    excel_file_list,csv_file_list = get_path()
    print('查找完成,数据整理中.....')
    excel_names = read_txt(path_excel)
    i = 0
    for file_e in excel_file_list:
        if file_e in excel_names:
            continue
        try:
            df = pd.read_excel(file_e, sheet_name=None)
            # print(sheets)
            sheetnames = read_txt(path_sheet)
            j = 1
            for sheet_name in df.keys():
                if sheet_name in sheetnames:
                    continue
                get_sheet_data(sheet_name,df)
                write_txt(path_sheet,sheet_name)
                print(f'当前完成度{j}/{len(df.keys())}....')
                j += 1
            write_txt(path_excel,str(file_e))
            clean_txt(path_sheet)
            i += 1
            print(f'已完成{i}个文件。。。。')
        except Exception as e:
            print(e)

    for file_c in csv_file_list:
        try:
            df_c = pd.read_csv(file_c,encoding='gbk')
            df_1 = df_c.loc[0:0, :]
            df_2 = df_c.iloc[2:, :-1]
            df_c = wash_data(df_1, df_2)
            df_c.to_sql(sql_info['TABLE_NAME'], chunksize=10000, con=engine, if_exists='append', index=False,dtype=DATA_TYPE)
            write_txt(path_excel, str(file_c))
            i += 1
            print(f'已完成{i}个文件。。。。')
        except Exception as e:
            print(e)
if __name__ == "__main__":
    path_excel = 'excel_log.txt'
    path_sheet = 'sheetlog.txt'
    while True:
        readAllFiles()
        print('该目录已完成!')
        clean_txt(path_excel)
        input('继续就回车,不需要请直接关闭掉!')


蚂蚁老师的全栈套餐,在抖音扫码购买;有答疑服务、副业介绍等福利

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/132468
 
201 次点击