社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

用 Python 高效处理大文件

Python中文社区 • 1 年前 • 798 次点击  

为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。

例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。

在这篇文章中,我们将学习如何使用multiprocessingjoblibtqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。

开始

我们将使用来自 Kaggle 的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。

https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents

我们将导入multiprocessingjoblibtqdm用于并行处理,pandas用于数据导入,renltkstring用于文本处理。

# Parallel Computing import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
# Data Ingestion
import pandas as pd
# Text Processing
import re
from nltk.corpus import stopwords
import string

在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers

n_workers = 2 * mp.cpu_count()
print(f"{n_workers} workers are available")
>>> 8 workers are available

下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。

%%timefile_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"df = pd.read_csv(file_name)
print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

输出:

Shape:(2845342, 47)
Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng','End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street','Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone','Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)','Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction','Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity','Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway','Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal','Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight','Astronomical_Twilight'],dtype='object')
CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 sWall time: 46.9 s

处理文本

clean_text是一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。

def clean_text(text):   # Remove stop words  stops = stopwords.words("english")  text = " ".join([word for word in text.split() if word  not in stops])  # Remove Special Characters  text = text.translate(str.maketrans('', '', string.punctuation))  # removing the extra spaces  text = re.sub(' +',' ', text)  return text

串行处理

对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。

我们将处理280万条记录,并将结果保存回 “Description” 列中。

%%timetqdm.pandas()
df['Description'] = df['Description'].progress_apply(clean_text)

输出

高端处理器串行处理280万行花了9分5秒。

100% 🟩🟩🟩🟩 2845342/2845342 [09:05<00:00, 5724.25it/s]
CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7sWall time: 9min 5s

多进程处理

有多种方法可以对文件进行并行处理,我们将了解所有这些方法。multiprocessing是一个内置的python包,通常用于并行处理大型文件。

我们将创建一个有8个workers的多处理池,并使用map函数来启动进程。为了显示进度条,我们将使用tqdm

map函数由两部分组成。第一个部分需要函数,第二个部分需要一个参数或参数列表。

%%timep = mp.Pool(n_workers) 
df['Description'] = p.map(clean_text,tqdm(df['Description']))

输出

我们的处理时间几乎提高了3倍。处理时间从9分5秒下降到3分51秒。

100% 🟩🟩🟩🟩 2845342/2845342 [02:58<00:00, 135646.12it/s]
CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 sWall time: 3min 51s

并行处理

我们现在将学习另一个Python包来执行并行处理。在本节中,我们将使用joblibParalleldelayed来复制map函数。

  • Parallel需要两个参数:n_job = 8backend = multiprocessing

  • 然后,我们将在delayed函数中加入clean_text

  • 创建一个循环,每次输入一个值。

下面的过程是相当通用的,你可以根据你的需要修改你的函数和数组。我曾用它来处理成千上万的音频和视频文件,没有任何问题。

建议:使用 "try: ""except: "添加异常处理。

def text_parallel_clean(array):  result = Parallel(n_jobs=n_workers,backend="multiprocessing")(  delayed(clean_text)  (text) 


    
  for text in tqdm(array)  )  return result

text_parallel_clean()中添加“Description”列。

%%timedf['Description'] = text_parallel_clean(df['Description'])

输出

我们的函数比多进程处理Pool多花了13秒。即使如此,并行处理也比串行处理快4分59秒。

100% 🟩🟩🟩🟩 2845342/2845342 [04:03<00:00, 10514.98it/s]
CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 sWall time: 4min 4s

并行批量处理

有一个更好的方法来处理大文件,就是把它们分成若干批,然后并行处理。让我们从创建一个批处理函数开始,该函数将在单一批次的值上运行clean_function

批量处理函数

def proc_batch(batch):  return [  clean_text(text)  for text in batch  ]

将文件分割成批

下面的函数将根据workers的数量把文件分成多个批次。在我们的例子中,我们得到8个批次。

def batch_file(array,n_workers):


    
  file_len = len(array)  batch_size = round(file_len / n_workers)  batches = [  array[ix:ix+batch_size]  for ix in tqdm(range(0, file_len, batch_size))  ]  return batches
batches = batch_file(df['Description'],n_workers)
>>> 100% 8/8 [00:00<00:00, 280.01it/s]

运行并行批处理

最后,我们将使用Paralleldelayed来处理批次。

%%timebatch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(  delayed(proc_batch)  (batch)   for batch in tqdm(batches)  )
df['Description'] = [j for i in batch_output for j in i]

输出

我们已经改善了处理时间。这种技术在处理复杂数据和训练深度学习模型方面非常有名。

100% 🟩🟩🟩🟩 8/8 [00:00<00:00, 2.19it/s]
CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 sWall time: 3min 56s

tqdm 并发

tqdm将多处理带到了一个新的水平。它简单而强大。

process_map需要:

  • 函数名称
  • Dataframe 列名
  • max_workers
  • chucksize与批次大小类似。我们将用workers的数量来计算批处理的大小,或者你可以根据你的喜好来添加这个数字。
%%timefrom tqdm.contrib.concurrent import process_mapbatch = round(len(df)/n_workers)
df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

输出

通过一行代码,我们得到了最好的结果:

100% 🟩🟩🟩🟩 2845342/2845342 [03:48<00:00, 1426320.93it/s]
CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 sWall time: 3min 51s

结论

我们需要找到一个平衡点,它可以是串行处理,并行处理,或批处理。如果你正在处理一个较小的、不太复杂的数据集,并行处理可能会适得其反。

在这个教程中,我们已经了解了各种处理大文件的Python包,它们允许我们对数据函数进行并行处理。

如果你只处理一个表格数据集,并且想提高处理性能,那么建议你尝试DaskdatatableRAPIDS

- 点击下方阅读原文加入社区会员 

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/137818
 
798 次点击