Py学习  »  Python

量化交易数据清洗:用 Python 打造稳健的回测基础

数据科学实战 • 1 月前 • 98 次点击  

欢迎加入专注于财经数据与量化投研的【数据科学实战】知识星球!在这里,您将获取持续更新的《财经数据宝典》和《量化投研宝典》,这两部宝典相辅相成,为您在量化投研道路上提供明确指引。 我们提供了精选的国内外量化投研的 250+ 篇高质量文章,并每日更新最新研究成果,涵盖策略开发、因子分析、风险管理等核心领域。 无论您是量化投资新手还是经验丰富的研究者,星球社区都能帮您少走弯路,事半功倍,共同探索数据驱动的投资世界!

引言

在量化交易领域,有一句话广为流传:"垃圾进,垃圾出"(Garbage in, garbage out)。无论你的交易策略多么精妙,模型多么复杂,如果输入的市场数据存在问题,最终的回测结果和实盘表现都会大打折扣。

数据质量问题往往是"沉默的杀手"——它不会报错,不会崩溃,只会悄悄地让你的策略学习到错误的模式,让你的回测结果充满幻觉。本文将基于实战经验,带你构建一套完整的市场数据清洗流程,让你的量化研究建立在坚实的数据基础之上。

为什么数据质量决定盈亏?

市场数据天生就是混乱的:交易所会调整交易时间,不同数据供应商的数据存在差异,服务器时钟会漂移,股票会有拆股分红,加密货币市场 24 小时不停歇。如果不在前期处理好这些问题,你的模型会学习到数据伪影,回测会产生信息泄露,实盘系统会追逐虚假信号。

常见的数据问题包括以下几类:

缺失数据问题:K 线或 Tick 数据缺失会导致技术指标计算错误,交易信号对价格跳空视而不见。

异常值问题:错误报价、乌龙指等噪声数据会让特征变得不稳定,止损逻辑失效。

时间不同步问题:多资产数据时间不对齐会在配对交易或组合模型中产生虚假相关性。

公司行为问题:拆股、分红等公司行为如不处理,会产生虚假的价格跳跃,破坏风险指标。

时区问题:时区和夏令时处理不当会导致 K 线逻辑错位,交易时段规则失效。

数据清洗核心步骤

一套完整的数据清洗流程应包含以下环节:

第一步:标准化数据结构。统一字段命名为 ts、open、high、low、close、volume、symbol、venue。

第二步:统一时区。解析时间戳,先定位到交易所本地时间,再转换为 UTC 时间。

第三步:排序去重。按 symbol 和 ts 稳定排序,对重复数据保留最后一条或加权处理。

第四步:验证 OHLCV 数据。确保 low ≤ min(open, close),high ≥ max(open, close),low ≤ high,volume ≥ 0。

第五步:处理缺失数据。事件驱动模型保留缺口并编码为特征;基于 K 线的指标需明确选择删除、插值或前向填充策略。

第六步:检测异常值。使用稳健的 z-score/MAD 或滚动 IQR 方法,对异常值进行截断或删除,并保留审计记录。

第七步:处理公司行为。对股票进行拆股和分红调整,同时保留原始价格和调整后价格。

第八步:期货连续合约处理。定义换月规则,进行后向调整或比率调整。

第九步:重采样与规整化。对齐到交易日历,创建预期时间索引,按容差重新索引。

第十步:多资产对齐。将不同资产的 K 线对齐到统一时间网格,填充时避免使用未来数据。

实战代码:Pandas 数据清洗管道

下面是一套可直接使用的 Pandas 数据清洗代码,涵盖了上述核心步骤。

数据加载与标准化

import pandas as pd
import numpy as np

def load_raw(csv_path, symbol):
    """
    加载原始 CSV 数据并标准化格式
    
    参数:
        csv_path: CSV 文件路径
        symbol: 股票或资产代码
    返回:
        标准化后的 DataFrame
    """

    df = pd.read_csv(
        csv_path,
        usecols=["timestamp""open""high""low""close""volume"],
        dtype={
            "open""float64",
            "high""float64",
            "low""float64",
            "close""float64",
            "volume""float64"
        }
    )
    
    # 解析时间戳,假设数据源为交易所本地时间
    df["ts"] = pd.to_datetime(df["timestamp"], utc=False, errors="coerce")
    
    # 示例:将美东时间转换为 UTC
    df["ts"] = (
        df["ts"]
        .dt.tz_localize("America/New_York", nonexistent="shift_forward")
        .dt.tz_convert("UTC")
    )
    
    # 整理列名和顺序
    df = (
        df.drop(columns=["timestamp"])
        .assign(symbol=symbol)
        .rename(columns=str.lower)
        [["ts""open""high""low""close""volume""symbol"]]
    )
    
    return df

排序与去重

def sort_dedup(df):
    """
    对数据进行排序并去除重复记录
    
    参数:
        df: 原始 DataFrame
    返回:
        排序去重后的 DataFrame
    """

    # 按 symbol 和时间戳排序
    df = df.sort_values(["symbol""ts"])
    
    # 对于重复时间戳,保留最后一条(通常是供应商修正后的数据)
    df = df.drop_duplicates(subset=["symbol""ts"], keep="last")
    
    return df

OHLCV 数据验证与修复

def ohlcv_checks(df):
    """
    检查 OHLCV 数据是否符合逻辑约束
    
    返回不符合约束的异常记录
    """

    bad = (
        # low 应该小于等于 open 和 close 的最小值
        (df["low"] > df[["open""close"]].min(axis=1)) |
        # high 应该大于等于 open 和 close 的最大值
        (df["high"] < df[["open""close"]].max(axis=1)) |
        # low 应该小于等于 high
        (df["low"] > df["high"]) |
        # 成交量不能为负
        (df["volume"] 0)
    )
    return df.loc[bad]


def fix_bad_bars(df):
    """
    修复不符合逻辑约束的 K 线数据
    
    采用软修复策略:尽可能修正边界值,无法修正时标记为 NaN
    """

    # 修正 low 值:取 open、close、high 中的最小值
    df["low"] = np.minimum(
        df["low"], 
        df[["open""close""high"]].min(axis=1)
    )
    
    # 修正 high 值:取 open、close、low 中的最大值
    df["high"] = np.maximum(
        df["high"], 
        df[["open""close""low"]].max(axis=1)
    )
    
    # 如果 low 仍然大于 high,标记为 NaN 待人工审核
    df.loc[df["low"] > df["high"], ["low""high"]] = np.nan
    
    # 负成交量标记为 NaN
    df.loc[df["volume"] 0"volume"] = np.nan
    
    return df

缺失数据处理

def report_missing(df, freq="1min"):
    """
    统计每个 symbol 的缺失 K 线数量
    """

    out = []
    for sym, g in df.groupby("symbol", group_keys=False):
        # 构建预期的时间索引
        rng = pd.date_range(
            g["ts"].min().floor(freq), 
            g["ts"].max().ceil(freq), 
            freq=freq
        )
        # 计算缺失的时间点
        missing = pd.Index(rng).difference(g["ts"])
        out.append({"symbol": sym, "missing_count": len(missing)})
    
    return pd.DataFrame(out)


def fill_policy(df, method="ffill", max_gap="5min"):
    """
    按指定策略填充缺失数据
    
    参数:
        method: 填充方法,'ffill' 为前向填充,'interpolate' 为插值
        max_gap: 最大允许填充的时间间隔,超过此间隔不填充
    """

    df = df.sort_values(["symbol""ts"])
    
    def _fill(g):
        g = g.set_index("ts")
        
        if method == "ffill":
            # 计算时间间隔,对大间隔进行分块处理
            dt = g.index.to_series().diff()
            mask = (dt.notna()) & (dt > pd.to_timedelta(max_gap))
            block = mask.cumsum().values
            
            # 按块进行前向填充,避免跨大间隔填充
            g[["open""high""low""close""volume"]] = (
                g.groupby(block)[["open""high""low""close""volume"]]
                .ffill()
            )
            
        elif method == "interpolate":
            # 线性插值,限制最大插值范围
            g[["open""high""low""close""volume"]] = (
                g[["open""high""low""close""volume"]]
                .interpolate(limit=5)
            )
        
        return g.reset_index()
    
    return df.groupby("symbol", group_keys=False).apply(_fill)

异常值检测与处理




    
def cap_outliers(df, col="close", window=50, z=6.0):
    """
    使用稳健统计方法检测并截断异常值
    
    参数:
        col: 要检测的列名
        window: 滚动窗口大小
        z: 异常值阈值(以 MAD 倍数计)
    """

    def _cap(g):
        x = g[col]
        
        # 计算滚动中位数
        med = x.rolling(window, min_periods=20).median()
        
        # 计算滚动 MAD(中位数绝对偏差)
        mad = (x - med).abs().rolling(window, min_periods=20).median()
        
        # 计算稳健 z-score(1.4826 是使 MAD 与标准差可比的常数)
        robust_z = (x - med) / (1.4826 * mad.replace(0, np.nan))
        
        # 计算截断边界
        upper = med + z * 1.4826 * mad
        lower = med - z * 1.4826 * mad
        
        # 将超出边界的值截断到边界
        g[col] = np.where(
            robust_z > z, upper,
            np.where(robust_z < -z, lower, x)
        )
        
        return g
    
    return df.groupby("symbol", group_keys=False).apply(_cap)

多资产时间对齐

def align_symbols(dfs, freq="1min"):
    """
    将多个资产的数据对齐到统一的时间网格
    
    参数:
        dfs: 字典,格式为 {symbol: DataFrame}
    返回:
        对齐后的字典
    """

    aligned = {}
    
    # 构建所有资产的时间并集
    idx = None
    for sym, g in dfs.items():
        sidx = pd.DatetimeIndex(g["ts"]).tz_convert("UTC")
        idx = sidx if idx isNoneelse idx.union(sidx)
    idx = idx.sort_values()
    
    # 将每个资产重新索引到统一时间网格
    for sym, g in dfs.items():
        gg = g.set_index("ts").reindex(idx)
        gg.index.name = "ts"
        aligned[sym] = gg.reset_index()
    
    return  aligned

完整清洗流程

def clean_market_csv(csv_path, symbol, bar_freq="1min"):
    """
    端到端的市场数据清洗流程
    
    参数:
        csv_path: 数据文件路径
        symbol: 资产代码
        bar_freq: K 线频率
    返回:
        清洗后的 DataFrame
    """

    # 加载并标准化数据
    df = load_raw(csv_path, symbol)
    
    # 排序去重
    df = sort_dedup(df)
    
    # 检查并修复异常 K 线
    bad = ohlcv_checks(df)
    ifnot bad.empty:
        print(f"发现 {len(bad)} 条异常 K 线,正在修复...")
        df = fix_bad_bars(df)
    
    # 规整化时间索引
    df = regularize(df, freq=bar_freq)
    
    # 按策略填充缺失数据
    df = fill_policy(df, method="ffill", max_gap="10min")
    
    # 检测并截断异常值
    df = cap_outliers(df, col="close", window=100, z=8.0)
    
    # 最终数据质量断言
    assert df["ts"].is_monotonic_increasing, "时间戳必须单调递增"
    assertnot df.duplicated(subset=["symbol""ts"]).any(), "不允许存在重复记录"
    
    return df

缺失数据处理策略

处理缺失数据时,需要根据具体场景选择合适的策略:

删除策略适用于严谨的研究场景,虽然更安全但会损失流动性较差资产的信息。

有界前向填充适用于需要稳定指标的场景,但必须进行间隔感知的掩码处理。

插值策略适用于需要平滑特征的场景,但要避免在收盘到开盘之间进行插值。

基于模型的填补是最后的手段,必须记录方法和不确定性。

在应用填充策略前,建议进行以下诊断分析:按小时、交易时段和 symbol 统计缺失情况,发现流动性问题或供应商故障;统计连续缺失的长度,长时间连续缺失通常意味着日历或交易时间表问题;分析缺失与成交量、波动率的相关性,警惕选择性偏差。

常见陷阱与规避方法

幸存者偏差:不要用当前的成分股列表来构建历史数据,否则会高估策略表现。

前视偏差:永远不要用未来的数据填充缺失值,即使是 reindex 的默认行为也可能导致这个问题。

时区混淆:磁盘存储使用 UTC 时间,只在必要时转换为交易所本地时间进行分析。

隐藏的不完整交易日:节假日和临时停牌会导致异常收益,需要显式标记。

盲目信任供应商标记 :验证供应商标记为"已修正"的数据,并存储自己的质量控制标记。

总结

数据清洗工作虽然不够光鲜,却是值得信赖的量化研究和实盘交易的基石。通过建立清晰的缺失数据处理策略、稳健的异常值处理方法、日历感知的时间对齐机制以及透明的调整记录,你的特征和回测结果将反映真实的市场行为,而非数据的怪癖。

记住这条核心原则:永远不要前向填充收益率或标签。如果必须为指标填充收盘价,请对大间隔进行掩码处理并记录在案。

建议将每个清洗步骤封装为小型、可测试的函数,持久化数据时附带元数据(供应商、日历、调整规则),对流程进行版本控制,并生成每日质量报告。只有这样,你的量化交易系统才能建立在坚实可靠的数据基础之上。

参考文章

加入专注于财经数据与量化投研的知识星球【数据科学实战】,获取本文完整研究解析、代码实现细节。

财经数据与量化投研知识社区

核心权益如下:

  1. 赠送《财经数据宝典》完整文档,汇集多年财经数据维护经验
  2. 赠送《量化投研宝典》完整文档,汇集多年量化投研领域经验
  3. 赠送《PyBroker-入门及实战》视频课程,手把手学习量化策略开发
  4. 每日分享高质量量化投研文章(已更新 180+篇)、代码和相关资料
  5. 定期更新高频财经数据
  6. 参与年度不少于 10 次专属直播与录播课程
  7. 与核心开发者直接交流,解决实际问题
  8. 获取专业微信群交流机会和课程折扣

星球已有丰富内容积累,包括量化投研论文、财经高频数据、 PyBroker 视频教程、定期直播、数据分享和答疑解难。适合对量化投研和财经数据分析有兴趣的学习者及从业者。欢迎加入我们!

好文推荐

1. 用 Python 打造股票预测系统:Transformer 模型教程(一)

2. 用 Python 打造股票预测系统:Transformer 模型教程(二)

3. 用 Python 打造股票预测系统:Transformer 模型教程(三)

4. 用 Python 打造股票预测系统:Transformer 模型教程(完结)

5. 揭秘隐马尔可夫模型:因子投资的制胜武器

6. YOLO 也能预测股市涨跌?计算机视觉在股票市场预测中的应用

7. 金融 AI 助手:FinGPT 让你轻松掌握市场分析

8. 量化交易秘籍:为什么专业交易员都在用对数收益率?

9. Python 量化投资利器:Ridge、Lasso 和 Elastic Net 回归详解

10. 掌握金融波动率模型:完整 Python 实现指南

好书推荐



Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/191707