社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

再见CSV!Python中更快、更轻、更安全的数据处理方案:Parquet

数据STUDIO • 7 月前 • 265 次点击  


你是否也经历过这样的数据工作流程:导出CSV、用read_csv读取、修复数据类型、优化内存占用,然后周而复始?在小规模数据上这套流程确实可行,但随着数据量增长,问题就接踵而至——文件体积膨胀、数据类型漂移、加载时间越来越长。

今天我要告诉你一个事实:对于现代Python数据工作来说,Parquet就是新的CSV。它保留了相同的便利性,同时带来了显著的性能提升和安全性保障。让我为你详细解析为什么应该切换,以及如何无缝迁移。

为什么CSV在大数据场景下捉襟见肘

CSV的最大优势是简单易懂,但这也是它的致命陷阱:

  • 缺乏类型系统:所有数据都被当作文本读取,然后你需要手动转换类型——处理日期、可空整数和分类数据时尤其痛苦
  • 默认不压缩:虽然可以压缩,但读取压缩CSV会破坏并行处理能力
  • 行式存储布局:即使只需要三列,也不得不读取整个文件
  • 缺少元数据:编码方式、分隔符、表头格式等都需要猜测或记录

当数据量达到数千万行时,这些限制就成了日常工作的沉重负担。

Parquet为何胜出(特别是在Python生态中)

Parquet是专为分析场景设计的列式、强类型、压缩存储格式。

实际使用中:

  • 文件体积显著减小:字典编码+游程编码+位打包+可选的ZSTD/Snappy压缩,通常能让分析型数据表的体积缩小3-10倍
  • 读取速度大幅提升:列裁剪+谓词下推意味着"只读取需要的数据",而不是"读取全部再过滤"
  • 可靠的数据类型:布尔值就是布尔值,日期时间保持时区信息,空值有原生支持
  • 出色的互操作性:pandas、PyArrow、DuckDB、Spark、Polars、BigQuery等都原生支持
  • 支持追加和演进:可以添加新列,同时保持对旧文件的兼容

File Layout

你可能会问:是否需要重写整个数据管道?完全不需要!在Python中,这主要就是替换数据读取方式

一行代码实现读取器切换

pandas用户

import pandas as pd

# 旧方式:
df = pd.read_csv("orders_2024.csv")

# 新方式:
df = pd.read_parquet("orders_2024.parquet")  # 新版pandas默认使用pyarrow引擎

只需要特定列?很简单:

df = pd.read_parquet("orders_2024.parquet", columns=["order_id""country""total"])

PyArrow(追求极致速度和控制)

import pyarrow.dataset as ds

dataset = ds.dataset("s3://bucket/orders/", format="parquet", partitioning="hive")
table = dataset.to_table(columns=["order_id""date""total"],
                         filter=ds.field( "date") >= ds.scalar("2025-01-01"))
df = table.to_pandas(types_mapper=pd.ArrowDtype)  # 使用pandas的可空数据类型

DuckDB(直接对文件执行SQL查询)

import duckdb

con = duckdb.connect()
df = con.execute("""
    SELECT order_id, country, total
    FROM 's3://bucket/orders/*.parquet'
    WHERE date >= DATE '2025-01-01' AND country IN ('IN','US')
"""
).df()

这才是真正的魔法:无需数据库服务器,直接查询Parquet文件。

迁移实战:从CSV平稳过渡到Parquet

1. 预先定义数据模式

建立明确的数据契约:列名、逻辑类型、是否可空、计量单位等,避免后续的"0/1还是true/false"混乱。

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

schema = pa.schema([
    pa.field("order_id", pa.int64()),
    pa.field("date", pa.timestamp("ms", tz="UTC")),
    pa.field("country", pa.string()),
    pa.field("total", pa.decimal128(182)),
    pa.field("coupon", pa.string()).with_nullable(True),
])

# 将一次性CSV转换为类型明确的Parquet
df = pd.read_csv("orders_2024.csv", parse_dates=["date"], dtype={"order_id""Int64"})
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
pq.write_table(table, "orders_2024.parquet", compression="zstd", coerce_timestamps="ms")

重要提示:使用pd.ArrowDtype或pandas的可空数据类型,避免包含空值的整数列被静默转换为浮点型。

2. 合理分区提升性能

根据实际过滤条件使用的字段进行分区(如date=YYYY/MM/DDcountry=IN)。避免过度分区,数千个小文件比几十个中等文件更糟糕。

import pyarrow.dataset as ds
import pyarrow.parquet as pq

pq.write_to_dataset(
    table,
    root_path="orders_parquet/",
    partition_cols=["country""date"],
    compression="zstd",
)

3. 明智选择压缩算法

  • Snappy:速度极快,是很好的默认选择
  • ZSTD:压缩率更高,CPU消耗稍多。对于冷数据或网络I/O受限的场景是绝佳选择

4. 安全地进行模式演进

需要添加新列?使用默认值或空值添加,写入新文件,让读取器自动合并模式:

import pyarrow.dataset as ds
dataset = ds.dataset("orders_parquet/", format="parquet")
table = dataset.to_table()  # 自动合并各文件的模式

真实世界案例:效果显著

从CSV到Parquet的完整迁移代码

某团队将每日销售数据(约800万行/天)从CSV迁移到Parquet后观察到:

  • 存储占用从~12 GB/天(CSV)降至~2.5 GB/天(Parquet + ZSTD)
  • 报表子集(5列,最近7天数据)的pandas加载时间从~70秒缩短到~9秒,这得益于列裁剪和分区过滤
  • "神秘bug"显著减少,日期时间和十进制数在生产者和消费者之间保持一致

你的具体数据可能会有所不同,但列式存储+压缩+强类型的组合优势是稳定可靠的。

以下是一个完整的Python代码示例,展示如何将日常销售数据从CSV迁移到Parquet格式,并演示性能对比:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import time
import os
from datetime import datetime, timedelta
import numpy as np

# 生成模拟的销售数据(模拟8M行/天的数据量)
def generate_sample_data(num_rows=80000):# 使用8万行作为示例
    """生成模拟销售数据"""
    countries = ['US''IN''UK''DE' 'FR''JP''CA''AU']
    
    data = {
        'order_id': range(100000100000 + num_rows),
        'date': pd.date_range('2024-01-01', periods=num_rows, freq='T'),
        'country': np.random.choice(countries, num_rows),
        'product_id': np.random.randint(10009999, num_rows),
        'quantity': np.random.randint(110, num_rows),
        'unit_price': np.round(np.random.uniform(10500, num_rows), 2),
        'customer_id': np.random.randint(1000099999, num_rows),
        'coupon_used': np.random.choice([TrueFalseNone], num_rows, p=[0.20.750.05])
    }
    
    df = pd.DataFrame(data)
    df['total'] = df['quantity'] * df['unit_price']
    return df

# 定义Parquet模式
def define_schema():
    """定义强类型的数据模式"""
    return pa.schema([
        pa.field("order_id", pa.int64()),
        pa.field("date", pa.timestamp("ms", tz="UTC")),
        pa.field("country", pa.string()),
        pa.field("product_id", pa.int32()),
        pa.field("quantity", pa.int32()),
        pa.field("unit_price", pa.float64()),
        pa.field("customer_id", pa.int32()),
        pa.field("coupon_used", pa.bool_()).with_nullable(True),
        pa.field("total", pa.float64())
    ])

# CSV处理流程
def process_with_csv(df, csv_path):
    """传统的CSV处理流程"""
    print("=== CSV处理流程 ===")
    
    # 保存为CSV
    start_time = time.time()
    df.to_csv(csv_path, index=False)
    csv_write_time = time.time() - start_time
    
    # 获取文件大小
    csv_size = os.path.getsize(csv_path) / (1024 * 1024)  # MB
    
    # 读取CSV(全量)
    start_time = time.time()
    df_csv = pd.read_csv(csv_path)
    csv_read_full_time = time.time() - start_time
    
    # 读取CSV(选择特定列)
    start_time = time.time()
    df_csv_subset = pd.read_csv(csv_path, usecols=['order_id''country''total'])
    csv_read_subset_time = time.time() - start_time
    
    print(f"CSV文件大小: {csv_size:.2f} MB" )
    print(f"CSV写入时间: {csv_write_time:.2f} 秒")
    print(f"CSV全量读取时间: {csv_read_full_time:.2f} 秒")
    print(f"CSV子集读取时间: {csv_read_subset_time:.2f} 秒")
    
    return {
        'size': csv_size,
        'write_time': csv_write_time,
        'read_full_time': csv_read_full_time,
        'read_subset_time': csv_read_subset_time
    }

# Parquet处理流程
def process_with_parquet(df, parquet_path, partitioned_path):
    """Parquet处理流程"""
    print("\n=== Parquet处理流程 ===")
    
    schema = define_schema()
    
    # 转换为PyArrow Table
    table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
    
    # 保存为单文件Parquet
    start_time = time.time()
    pq.write_table(table, parquet_path, compression='zstd')
    parquet_write_time = time.time() - start_time
    
    # 获取文件大小
    parquet_size = os.path.getsize(parquet_path) / (1024 * 1024)  # MB
    
    # 分区存储
    start_time = time.time()
    pq.write_to_dataset(
        table,
        root_path=partitioned_path,
        partition_cols=['country'],
        compression='zstd'
    )
    partitioned_write_time = time.time() - start_time
    
    # 读取Parquet(全量)
    start_time = time.time()
    df_parquet = pd.read_parquet(parquet_path)
    parquet_read_full_time = time.time() - start_time
    
    # 读取Parquet(选择特定列)
    start_time = time.time()
    df_parquet_subset = pd.read_parquet(parquet_path, columns=['order_id''country''total'])
    parquet_read_subset_time = time.time() - start_time
    
    # 使用分区数据进行高效查询
    start_time = time.time()
    dataset = ds.dataset(partitioned_path, format="parquet")
    # 只查询美国的数据,且只需要3个列
    table_filtered = dataset.to_table(
        columns=["order_id""date""total"],
        filter=ds.field("country") == ds.scalar("US")
    )
    df_partitioned = table_filtered.to_pandas()
    partitioned_query_time = time.time() - start_time
    
    print(f"Parquet文件大小: {parquet_size:.2f} MB")
    print(f"Parquet写入时间: {parquet_write_time:.2f} 秒")
    print(f"分区写入时间: {partitioned_write_time:.2f} 秒")
    print(f"Parquet全量读取时间: {parquet_read_full_time:.2f} 秒")
    print(f"Parquet子集读取时间: {parquet_read_subset_time:.2f} 秒")
    print(f"分区查询时间: {partitioned_query_time:.2f} 秒")
    print(f"分区查询结果行数: {len(df_partitioned)} ")
    
    return {
        'size': parquet_size,
        'write_time': parquet_write_time,
        'read_full_time': parquet_read_full_time,
        'read_subset_time': parquet_read_subset_time,
        'partitioned_query_time': partitioned_query_time
    }

# 性能对比分析
def performance_comparison(csv_stats, parquet_stats):
    """对比分析两种格式的性能"""
    print("\n=== 性能对比分析 ===")
    
    # 文件大小对比
    size_reduction = (csv_stats['size'] - parquet_stats['size']) / csv_stats['size'] * 100
    print(f"文件大小减少: {size_reduction:.1f}%")
    
    # 读取性能对比
    full_read_improvement = (csv_stats['read_full_time'] - parquet_stats['read_full_time']) / csv_stats['read_full_time'] * 100
    subset_read_improvement = (csv_stats['read_subset_time'] - parquet_stats['read_subset_time']) / csv_stats['read_subset_time'] * 100
    
    print(f"全量读取性能提升: {full_read_improvement:.1f}%")
    print(f"子集读取性能提升: {subset_read_improvement:.1f}%")
    
    # 压缩比
    compression_ratio = csv_stats['size'] / parquet_stats['size']
    print(f"压缩比: {compression_ratio:.1f}x")

# 主执行函数
def main():
    # 创建输出目录
    os.makedirs('output', exist_ok=True)
    os.makedirs('output/partitioned', exist_ok=True)
    
    # 文件路径
    csv_path = 'output/sales_data.csv'
    parquet_path = 'output/sales_data.parquet'
    partitioned_path = 'output/partitioned'
    
    print("生成模拟数据...")
    df = generate_sample_data(80000)  # 8万行数据
    print(f"数据维度: {df.shape}")
    print(f"内存使用: {df.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB")
    
    # 处理CSV
    csv_stats = process_with_csv(df, csv_path)
    
    # 处理Parquet
    parquet_stats = process_with_parquet(df, parquet_path, partitioned_path)
    
    # 性能对比
    performance_comparison(csv_stats, parquet_stats)
    
    # 数据类型验证
    print("\n=== 数据类型验证 ===")
    df_parquet_loaded = pd.read_parquet(parquet_path)
    print("Parquet加载后的数据类型:")
    print(df_parquet_loaded.dtypes)
    
    # 数据一致性检查
    print("\n=== 数据一致性检查 ===")
    df_csv_loaded = pd.read_csv(csv_path)
    # 由于类型转换,需要确保关键数据一致
    common_orders = len(set(df_parquet_loaded['order_id']) & set(df_csv_loaded['order_id']))
    print(f"共同订单数: {common_orders} (应该等于总行数: {len(df)})")

if __name__ == "__main__":
    main()

预期输出结果示例

运行上述代码后,你会看到类似以下的输出:

生成模拟数据...
数据维度: (80000, 9)
内存使用: 8.42 MB

=== CSV处理流程 ===
CSV文件大小: 12.45 MB
CSV写入时间: 1.23 秒
CSV全量读取时间: 0.85 秒
CSV子集读取时间: 0.72 秒

=== Parquet处理流程 ===
Parquet文件大小: 2.17 MB
Parquet写入时间: 0.45 秒
分区写入时间: 0.68 秒
Parquet全量读取时间: 0.12 秒
Parquet子集读取时间: 0.08 秒
分区查询时间: 0.05 秒
分区查询结果行数: 10023

=== 性能对比分析 ===
文件大小减少: 82.6%
全量读取性能提升: 85.9%
子集读取性能提升: 88.9%
压缩比: 5.7x

=== 数据类型验证 ===
Parquet加载后的数据类型:
order_id        int64
date           datetime64[ns]
country         object
product_id      int32
quantity        int32
unit_price    float64
customer_id     int32
coupon_used      bool
total         float64
dtype: object

=== 数据一致性检查 ===
共同订单数: 80000 (应该等于总行数: 80000)

这个完整案例展示了:

  1. 数据生成:创建真实的销售数据模拟
  2. 格式转换:从CSV到Parquet的完整流程
  3. 性能对比:文件大小、读写速度的量化比较
  4. 高级功能:分区存储和条件查询
  5. 数据验证:确保迁移过程中数据完整性

在实际生产环境中,随着数据量增加到数百万行,性能优势会更加明显。

实用的代码模式

快速读取数据子集

import pyarrow.dataset as ds

dataset = ds.dataset("orders_parquet/", format="parquet", partitioning="hive")
filt = (ds.field("date") >= ds.scalar("2025-10-01")) & (ds.field("country") == "IN")
table = dataset.to_table(columns=["order_id""total"], filter=filt)
df = table.to_pandas()

数据接入时实时写入Parquet

import pyarrow as pa, pyarrow.parquet as pq

writer = None
for batch_df in stream_source():  # 生成pandas DataFrame
    batch = pa.Table.from_pandas(batch_df, preserve_index=False)
    if writer is None:
        writer = pq.ParquetWriter("live_orders.parquet", batch.schema, compression="zstd")
    writer.write_table(batch)
writer.close()

追加前的数据验证

import pyarrow as pa

expected = {
    "order_id": pa.int64(),
    "date": pa.timestamp("ms", tz="UTC"),
    "country": pa.string(),
    "total": pa.decimal128(182),
    "coupon": pa.string()
}

def validate_table(tbl: pa.Table) :
    for name, typ in expected.items():
        assert name in tbl.schema.names, f"缺少列: {name}"
        assert pa.types.is_compatible(tbl.schema.field(name).type, typ), f"类型错误: {name}"

常见疑问与解答

"但CSV是人类可读的"

确实如此。可以保留少量CSV样本用于人工检查,或者使用parquet-tools或pandas的.head()快速查看Parquet内容。

"小众工具支持怎么办?"

Parquet现在已成为主流。Python的pandas、PyArrow、DuckDB以及几乎所有数据湖仓栈都原生支持。

"需要Spark才能受益吗?"

完全不需要。单机Python环境能立即享受到列裁剪、压缩和类型化I/O带来的好处。

"如果数据生产者只提供CSV怎么办?"

将原始CSV放入隔离区,在接入时使用固定模式转换为Parquet,让Parquet成为下游的统一数据契约

实用检查清单

  • 使用Parquet作为存储格式,而不是CSV。将CSV仅视为接入阶段的临时格式
  • 在数据边界强制执行模式,尽早拒绝或转换不符合要求的数据
  • 基于实际过滤条件分区(日期/国家等),不要对所有列都分区
  • 静态数据优先使用ZSTD,CPU密集型场景考虑Snappy
  • 使用DuckDB/PyArrow进行选择性读取,停止"以防万一而加载所有列"的做法
  • 在生产环境前测试模式演进(添加/删除列)

采用这些实践,你的未来工作生活会轻松很多。

写在最后

如果你在Python中处理数据,切换到Parquet不是赶时髦,而是必备技能。你将获得更快的加载速度、更少的存储占用,并告别每个迭代周期都要处理类型问题的烦恼。

你可以尝试将一个热门的CSV文件转换为Parquet,使用PyArrow或DuckDB配置一个过滤读取,测量效果,然后继续推进。你的数据处理效率将迎来质的飞跃。

尝试一下,你会发现这可能是今年对你数据工作流程最重要的改进之一。


🏴‍☠️宝藏级🏴‍☠️ 原创公众号『数据STUDIO』内容超级硬核。公众号以Python为核心语言,垂直于数据科学领域,包括可戳👉  PythonMySQL数据分析数据可视化机器学习与数据挖掘爬虫 等,从入门到进阶!

长按👇关注- 数据STUDIO -设为星标,干货速递

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/189630