Python处理Excel文件并与数据库匹配做拼接
需求:Python处理Excel中数据并于数据库交互匹配得到账号信息等其他操作
Python实现
import os
import pandas as pd
import pymssql
import warnings
import time
def extract_broadband_speed(speed):if pd.notnull(speed) and 'M' in str(speed):return str(speed).split('M')[0] + 'M'else:return ''
def concatenate_with_dash(row):product_type = row.get('产品类型')workorder_type = row.get('工单类型')access_type = row.get('方式')broadband_speed = row.get('速率提取')if workorder_type in ['改', '其他']:if product_type == '宽带':return f"{product_type}-{broadband_speed}-{access_type}-{workorder_type}"else:return f"{product_type}-{workorder_type}"elif product_type == '宽带':return f"{product_type}-{broadband_speed}-{access_type}-{workorder_type}机"else:return f"{product_type}-{workorder_type}机"
def clear_data_in_excel_files(current_directory):files = [file for file in os.listdir(current_directory) if file.endswith('.xls') or file.endswith('.xlsx')]for file in files:file_path = os.path.join(current_directory, file) df = pd.read_excel(file_path) df = df.head(0) df.to_excel(file_path, index=False, header=True) print(f"成功清空文件: {file}")print("成功清空所有 Excel 文件的除第一行表头外的数据")def main():start_time = time.time()print("程序开始时间:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)))warnings.filterwarnings('ignore') server = '127.0.0.1'database = 'YD'username = 'sa'password = 'xyz@1234560'conn = pymssql.connect(server, username, password, database)sql_query = '''SELECT 地市, 人员名称, [账号]FROM [ZHB]'''data = pd.read_sql(sql_query, conn)data.rename(columns={'人员名称': '处理人'}, inplace=True)current_directory = os.getcwd()files = [file for file in os.listdir(current_directory) if file.endswith('.xls')]workorder_count = {}for file in files:file_path = os.path.join(current_directory, file) df0 = pd.read_excel(file_path) df0['速率提取'] = df0['速率'].apply(extract_broadband_speed)df0['用户品质-NEW'] = df0['速率提取'].apply(lambda x: '千兆' if x == '1000M' else '普通品质')df0['产品工单类型合并'] = df0.apply(concatenate_with_dash, axis=1).str.replace('装机', '新装')df0['区域-修改'] = df0['区域'].fillna('城镇').str.replace('城市', '城镇').str.replace('乡镇', '城镇')df0['是否沿街-修改'] = df0['沿街'].apply(lambda x: '是' if pd.notnull(x) else '否')df0['开始时间'] = df0['预约上门时间'].apply(lambda x: str(x).split(' ~ ')[0].strip() if isinstance(x, str) else '')df0['结束时间'] = df0['预约上门时间'].apply(lambda x: str(x).split(' ~ ')[-1].strip() if isinstance(x, str) else '')print(f"成功读取文件: {file}")for workorder_type in df0['产品类型']:workorder_count[workorder_type] = workorder_count.get(workorder_type, 0) + 1merged_df = pd.merge(df0, data[['地市', '处理人', '账号']], on=['地市', '处理人'], how='left')for idx, (product_type, group_data) in enumerate(merged_df.groupby('产品类型')):print(f"产品类型 {idx + 1}: {product_type}")filtered_data = merged_df[merged_df['产品类型'].isin(['ZW', 'TR'])]filtered_data.to_excel("源文件/ZW_TR数据合并.xlsx", index=False)print("成功将产品类型为 ZW_TR数据合并.xlsx")product_types = ['云', '门铃', '喇叭', 'HM']hm_data = merged_df[merged_df['产品类型'].isin(product_types)]hm_data.to_excel("源文件/HM_数据.xlsx", index=False)other_data = merged_df[~merged_df['产品类型'].isin(['ZW', 'TR', '云', '门铃', '喇叭', 'HM'])]for product_type, group_data in other_data.groupby('产品类型'):file_name = f"源文件/{product_type}_数据.xlsx"group_data.to_excel(file_name, index=False)print(f"成功将产品类型为 {product_type} 的数据导出到文件 {file_name}")print("成功将数据库查询结果匹配并拆分业务导出为Excel文件")target_folder = '数据库字段/'clear_data_in_excel_files(target_folder)for file_name in os.listdir(target_folder):file_path = os.path.join(target_folder, file_name)if file_name.endswith('.xlsx'):source_file_path = os.path.join('源文件/', file_name)if os.path.isfile(source_file_path):df_source = pd.read_excel(source_file_path)df_target = pd.read_excel(file_path)for source_col, target_col in [('施工单编码', '编码'),('施工单编码', 'boss号'),('产品工单类型合并', '工单标题'),('市', '市'),('县', '县'),('接入方式', '接入方式'),('受理时间', '受理时间'),('派单时间', '派单时间'),('归档时间', '归档时间'),('预约上门时间', '前台预约时间'),('处理人', '施工人员'),('宽带速率', '宽带速率'),('宽带套餐资费', '套餐信息'),('开始时间', '预约上门时间'),('区域-修改', '区域'),('是否沿街-修改', '沿街商铺'),('用户品质-NEW', '品质'),]:if source_col in df_source.columns and target_col in df_target.columns:df_target[target_col] = df_source[source_col]if 'ZW_TR数据合并.xlsx' in source_file_path:if 'ZW资费' in df_source.columns and '信息' in df_target.columns:df_target['信息'] = df_source['ZW资费']df_target.to_excel(file_path, index=False)print(f"成功将字段复制到文件 {file_path} 中")print("产品类型总数:")for workorder_type, count in workorder_count.items():print(f"{workorder_type}: {count}")end_time = time.time()print("程序结束时间:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)))run_time = end_time - start_timeprint("程序运行耗时:%0.2f" % run_time, "s")input("按任意键退出程序")if __name__ == "__main__":main()