社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

用它做Python并行数据分析,隔壁程序猿都馋哭了!

Python学习交流 • 5 年前 • 544 次点击  

有时候你在做 Python 数据分析的时候,可能会出现这么个情况:用 Pandas 打开一个超大型数据集,想得到一些度量(metrics),然后就尴尬地卡住了。

大家都知道,如果你处理大数据,手里用的是 Pandas,有时要等上一小时才能得到一个 Series 的平均值,甚至都还没调用 apply 函数。这还只是几百万行啊,如果是几十亿行,那最好还是用 Spark 之类的高级工具吧。

那么就没有好办法了吗?有的,就有这么一个工具,能够加速 Python 数据分析,既不需要你使用配置更高的硬件设施,也不必切换编程语言。当然,如果你的数据集超级超级大,它的最终作用也会有限,但比普通的 Python 扩展工具好多了。特别是如果你不用做大量的重建索引,那么这个工具非常适合你。

这个工具叫 Dask ,数据科学家 Luciano Strika 专门试用了这个工具,并做了测试,发现 Dask 在做并行数据分析时,比常规 Pandas 快出许多倍。

什么是Dask?

Dask 是一个开源项目,能提供 NumPy Arrays,Pandas Dataframes 和常规列表的抽象,允许你使用多核处理并行运行它们。

下面这段直接摘自教程:

Dask 提供模仿了 NumPy,列表和 Pandas 的高级 Array,Bag 以及 DataFrame 集合,但能够在无法放入主内存的数据集上并行运行。对大型数据集来说,Dask 的高级集合是 NumPy 和 Pandas 的替代方案。

听起来真不错!于是我(作者Luciano Strika)决定亲自试试 Dask Dataframes,并对它们进行了几个基准测试。

阅读文档

我首先阅读了官方文档,看看建议我们使用 Dask 做哪些工作。以下是官方文档( http:// docs.dask.org/en/latest /dataframe.html )中的相关部分:

  • 操纵大型数据集,即使这些数据集无法放入内存

  • 使用许多核来加速长计算

  • 使用标准Pandas操作(如 groupby,join 和时间序列计算)对大型数据集进行分布式计算

然后在下面,它列出了一些如果使用 Dask Dataframes 会快速完成的事情:

  • 算术运算(乘以或添加到Series)

  • 常见聚合(平均值,最小值,最大值,求和等)

  • 调用 apply(只要它在索引中,也就是说,不是在 groupby('y')之后'y'不是索引)

  • 调用 value_counts(),drop_duplicates()或corr()

  • 使用 loc,isin 和行式选择进行过滤

#returns only the rows where x is >5, by reference (writing on them alters original df)
df2 = df.loc[df['x'] > 5]
#returns only the rows where x is 0,1,2,3 or 4, by reference
df3 = df.x.isin(range(4))
#returns only the rows where x is >5, by read-only reference (can't be written on)
df4 = df[df['x']>5]

如何使用 Dask Dataframes

Dask Dataframes 与 Pandas Dataframes 具有相同的 API,只是聚合和 apply 函数延迟执行,并且需要通过调用 compute 方法来计算。要想生成 Dask Dataframe,可以像在 Pandas 中一样调用 read_csv 方法,或者,如果给出 Pandas Dataframe df,只需调用

dd = ddf.from_pandas(df, npartitions=N)

其中 ddf 是你导入 Dask Dataframes 的名称,而 npartitions 是一个参数,告诉 Dataframe 如何对其进行分区。

根据 StackOverflow 上的说法,建议将 Dataframe 划分为与计算机核数数量相同的分区,或者是该数量的几倍,因为每个分区将在不同的线程上运行。如果分区过多,它们之间的通信代价会高很多。

动手吧:做点基准测试

我写了一个 Jupyter notebook 试用这个框架,在 GitHub 上能看到,可以自己运行一下:

https:// github.com/StrikingLoo/ dask-dataframe-benchmarking

我运行的基准测试可以在 Github 上的 Jupyter notebook 中找到,主要有以下要点:

def get_big_mean():
return dfn.salary.mean().compute()
def get_big_mean_old():
return df3.salary.mean()
def get_big_max():
return dfn.salary.max().compute()
def get_big_max_old():
return df3.salary.max()
def get_big_sum():
return dfn.salary.sum().compute()
def get_big_sum_old():
return df3.salary.sum()
def filter_df():
df = dfn[dfn['salary']>5000]
def filter_df_old():
df = df3[df3['salary']>5000]

这里df3是一个常规的 Pandas Dataframe,拥有 2500 万行,使用 这段脚本 生成,其中列是名称,姓氏和薪水,从列表中随机抽样。我用了一个有 50 行的数据集并连接了 500000 次,因为我对分析本身并不太感兴趣,但运行它时才会。

dfn 就是基于 df3 的 Dask Dataframe。

第一批结果:不太乐观

我首先尝试使用 3 个分区进行测试,因为我的电脑只有 4 个内核,不想过度使用它。这次使用 Dask 的结果非常差,而且还要等很久才能得到结果,不过我怀疑这可能是分区过少的原因:

204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old
131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old
120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old
0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old

可以看到在我使用 Dask 时大多数操作变慢了很多。这给了我一些启示,可能必须使用更多分区才行。产生延迟计算所花的成本也可以忽略不计(在某些情况下不到半秒),所以如果重复使用它,成本不会随着时间推移而摊销。

我还用 apply 方法尝试了这个测试:

def f(x):
return (13*x+5)%7
def apply_random_old():
df3['random']= df3['salary'].apply(f)

def apply_random():
dfn['random']= dfn['salary'].apply(f).compute()

并有非常相似的结果:

369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old

因此,一般来说,大多数操作的速度都是初始操作的两倍,尽管过滤器的速度要快得多。我觉得或许应该在上面调用 compute,所以对这个结果持保留态度。

更多分区:加速惊人

得到前面这些不尽人意的结果之后,我决定我可能只是没有使用足够的分区。毕竟,整件事情的重点是并行运行,所以或许我只需进一步并行化?所以我尝试了 8 个分区的相同测试,得到了如下结果(省略了非并行数据帧的结果,因为它们基本相同):

3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df
112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test

这就对了!大多数操作的运行速度比常规 Dataframe 快十倍,甚至 apply 的运行速度也更快了!我还运行了 value_count 测试,它只调用“薪水”Series 上的 value_count 方法。对于上下文,在我足足等 10 分钟后在常规 Dataframe 上运行此测试时,必须终止该过程。这次只用了 50 秒!

所以之前都没用对工具,它的速度可比常规 Dataframe 快多了。

最后再说一点

鉴于我是在一台相当旧的 4 核 PC 上运行了 2500 万行数据,所以是相当了不起的。所以我的建议是,下回你必须在本地或从单个 AWS 实例上理数据集时,一定要试试 Dask 这个框架。运行速度简直不要太快。



今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/VU7H5E6KRh
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/26950
 
544 次点击