Python中国社区  »  python开源

用它做Python并行数据分析,隔壁程序猿都馋哭了

夏天 • 5 天前 • 75 次点击  

有时候你在做 Python 数据分析的时候,可能会出现这么个情况:用 Pandas 打开一个超大型数据集,想得到一些度量(metrics),然后就尴尬地卡住了。

大家都知道,如果你处理大数据,手里用的是 Pandas,有时要等上一小时才能得到一个 Series 的平均值,甚至都还没调用 apply 函数。这还只是几百万行啊,如果是几十亿行,那最好还是用 Spark 之类的高级工具吧。

那么就没有好办法了吗?有的,就有这么一个工具,能够加速 Python 数据分析,既不需要你使用配置更高的硬件设施,也不必切换编程语言。当然,如果你的数据集超级超级大,它的最终作用也会有限,但比普通的 Python 扩展工具好多了。特别是如果你不用做大量的重建索引,那么这个工具非常适合你。

领取福利加python编程语言学习QQ群 515267276 这个工具叫 Dask,数据科学家 Luciano Strika 专门试用了这个工具,并做了测试,发现 Dask 在做并行数据分析时,比常规 Pandas 快出许多倍。

什么是Dask?

Dask 是一个开源项目,能提供 NumPy Arrays,Pandas Dataframes 和常规列表的抽象,允许你使用多核处理并行运行它们。

下面这段直接摘自教程:

Dask 提供模仿了 NumPy,列表和 Pandas 的高级 Array,Bag 以及 DataFrame 集合,但能够在无法放入主内存的数据集上并行运行。对大型数据集来说,Dask 的高级集合是 NumPy 和 Pandas 的替代方案。

听起来真不错!于是我(作者Luciano Strika)决定亲自试试 Dask Dataframes,并对它们进行了几个基准测试。

阅读文档

我首先阅读了官方文档,看看建议我们使用 Dask 做哪些工作。以下是官方文档(docs.dask.org/en/latest)中的相关部分:

操纵大型数据集,即使这些数据集无法放入内存

使用许多核来加速长计算

使用标准Pandas操作(如 groupby,join 和时间序列计算)对大型数据集进行分布式计算

然后在下面,它列出了一些如果使用 Dask Dataframes 会快速完成的事情:

算术运算(乘以或添加到Series)

常见聚合(平均值,最小值,最大值,求和等)

调用 apply(只要它在索引中,也就是说,不是在 groupby('y')之后'y'不是索引)

调用 value_counts(),drop_duplicates()或corr()

使用 loc,isin 和行式选择进行过滤

returns only the rows where x is >5, by reference (writing on them alters original df) df2 = df.loc[df['x'] > 5] #returns only the rows where x is 0,1,2,3 or 4, by reference df3 = df.x.isin(range(4)) #returns only the rows where x is >5, by read-only reference (can't be written on) df4 = df[df['x']>5]

如何使用 Dask Dataframes

Dask Dataframes 与 Pandas Dataframes 具有相同的 API,只是聚合和 apply 函数延迟执行,并且需要通过调用 compute 方法来计算。要想生成 Dask Dataframe,可以像在 Pandas 中一样调用 read_csv 方法,或者,如果给出 Pandas Dataframe df,只需调用

dd = ddf.from_pandas(df, npartitions=N)

其中 ddf 是你导入 Dask Dataframes 的名称,而 npartitions 是一个参数,告诉 Dataframe 如何对其进行分区。

根据 StackOverflow 上的说法,建议将 Dataframe 划分为与计算机核数数量相同的分区,或者是该数量的几倍,因为每个分区将在不同的线程上运行。如果分区过多,它们之间的通信代价会高很多。

动手吧:做点基准测试

我写了一个 Jupyter notebook 试用这个框架,在 GitHub 上能看到,可以自己运行一下:github.com/StrikingLoo/

我运行的基准测试可以在 Github 上的 Jupyter notebook 中找到,主要有以下要点:

领取福利加python编程语言学习QQ群 515267276 def get_big_mean():    return dfn.salary.mean().compute() def get_big_mean_old():    return df3.salary.mean() def get_big_max():    return dfn.salary.max().compute() def get_big_max_old():    return df3.salary.max() def get_big_sum():    return dfn.salary.sum().compute() def get_big_sum_old():    return df3.salary.sum() def filter_df():    df = dfn[dfn['salary']>5000] def filter_df_old(): df = df3[df3['salary']>5000]

这里df3是一个常规的 Pandas Dataframe,拥有 2500 万行,使用这段脚本生成,其中列是名称,姓氏和薪水,从列表中随机抽样。我用了一个有 50 行的数据集并连接了 500000 次,因为我对分析本身并不太感兴趣,但运行它时才会。

dfn 就是基于 df3 的 Dask Dataframe。

第一批结果:不太乐观

我首先尝试使用 3 个分区进行测试,因为我的电脑只有 4 个内核,不想过度使用它。这次使用 Dask 的结果非常差,而且还要等很久才能得到结果,不过我怀疑这可能是分区过少的原因:

204.313940048 seconds for get_big_mean 39.7543280125 seconds for get_big_mean_old 131.600986004 seconds for get_big_max 43.7621600628 seconds for get_big_max_old 120.027213097 seconds for get_big_sum 7.49701309204 seconds for get_big_sum_old 0.581165790558 seconds for filter_df 226.700095892 seconds for filter_df_old

可以看到在我使用 Dask 时大多数操作变慢了很多。这给了我一些启示,可能必须使用更多分区才行。产生延迟计算所花的成本也可以忽略不计(在某些情况下不到半秒),所以如果重复使用它,成本不会随着时间推移而摊销。

我还用 apply 方法尝试了这个测试:

def f(x):    return (13*x+5)%7 def apply_random_old():    df3['random']= df3['salary'].apply(f) def apply_random(): dfn['random']= dfn['salary'].apply(f).compute()

并有非常相似的结果:

369.541605949 seconds for apply_random 157.643756866 seconds for apply_random_old

因此,一般来说,大多数操作的速度都是初始操作的两倍,尽管过滤器的速度要快得多。我觉得或许应该在上面调用 compute,所以对这个结果持保留态度。

更多分区:加速惊人

得到前面这些不尽人意的结果之后,我决定我可能只是没有使用足够的分区。毕竟,整件事情的重点是并行运行,所以或许我只需进一步并行化?所以我尝试了 8 个分区的相同测试,得到了如下结果(省略了非并行数据帧的结果,因为它们基本相同):

3.08352184296 seconds for get_big_mean 1.3314101696 seconds for get_big_max 1.21639800072 seconds for get_big_sum 0.228978157043 seconds for filter_df 112.135010004 seconds for apply_random 50.2007009983 seconds for value_count_test

这就对了!大多数操作的运行速度比常规 Dataframe 快十倍,甚至 apply 的运行速度也更快了!我还运行了 value_count 测试,它只调用“薪水”Series 上的 value_count 方法。对于上下文,在我足足等 10 分钟后在常规 Dataframe 上运行此测试时,必须终止该过程。这次只用了 50 秒!

所以之前都没用对工具,它的速度可比常规 Dataframe 快多了。

最后再说一点

鉴于我是在一台相当旧的 4 核 PC 上运行了 2500 万行数据,所以是相当了不起的。所以我的建议是,下回你必须在本地或从单个 AWS 实例上理数据集时,一定要试试 Dask 这个框架。运行速度简直不要太快。

关注微信公众号:程序员交流互动平台!获取资料学习!

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/27162
 
75 次点击  
分享到微博