欢迎加入专注于财经数据与量化投研的【数据科学实战】知识星球!在这里,您将获取持续更新的《财经数据宝典》和《量化投研宝典》,这两部宝典相辅相成,为您在量化投研道路上提供明确指引。 我们提供了精选的国内外量化投研的 250+ 篇高质量文章,并每日更新最新研究成果,涵盖策略开发、因子分析、风险管理等核心领域。 无论您是量化投资新手还是经验丰富的研究者,星球社区都能帮您少走弯路,事半功倍,共同探索数据驱动的投资世界!
引言
在量化交易领域,有一句话广为流传:"垃圾进,垃圾出"(Garbage in, garbage out)。无论你的交易策略多么精妙,模型多么复杂,如果输入的市场数据存在问题,最终的回测结果和实盘表现都会大打折扣。
数据质量问题往往是"沉默的杀手"——它不会报错,不会崩溃,只会悄悄地让你的策略学习到错误的模式,让你的回测结果充满幻觉。本文将基于实战经验,带你构建一套完整的市场数据清洗流程,让你的量化研究建立在坚实的数据基础之上。
为什么数据质量决定盈亏?
市场数据天生就是混乱的:交易所会调整交易时间,不同数据供应商的数据存在差异,服务器时钟会漂移,股票会有拆股分红,加密货币市场 24 小时不停歇。如果不在前期处理好这些问题,你的模型会学习到数据伪影,回测会产生信息泄露,实盘系统会追逐虚假信号。
常见的数据问题包括以下几类:
缺失数据问题:K 线或 Tick 数据缺失会导致技术指标计算错误,交易信号对价格跳空视而不见。
异常值问题:错误报价、乌龙指等噪声数据会让特征变得不稳定,止损逻辑失效。
时间不同步问题:多资产数据时间不对齐会在配对交易或组合模型中产生虚假相关性。
公司行为问题:拆股、分红等公司行为如不处理,会产生虚假的价格跳跃,破坏风险指标。
时区问题:时区和夏令时处理不当会导致 K 线逻辑错位,交易时段规则失效。
数据清洗核心步骤
一套完整的数据清洗流程应包含以下环节:
第一步:标准化数据结构。统一字段命名为 ts、open、high、low、close、volume、symbol、venue。
第二步:统一时区。解析时间戳,先定位到交易所本地时间,再转换为 UTC 时间。
第三步:排序去重。按 symbol 和 ts 稳定排序,对重复数据保留最后一条或加权处理。
第四步:验证 OHLCV 数据。确保 low ≤ min(open, close),high ≥ max(open, close),low ≤ high,volume ≥ 0。
第五步:处理缺失数据。事件驱动模型保留缺口并编码为特征;基于 K 线的指标需明确选择删除、插值或前向填充策略。
第六步:检测异常值。使用稳健的 z-score/MAD 或滚动 IQR 方法,对异常值进行截断或删除,并保留审计记录。
第七步:处理公司行为。对股票进行拆股和分红调整,同时保留原始价格和调整后价格。
第八步:期货连续合约处理。定义换月规则,进行后向调整或比率调整。
第九步:重采样与规整化。对齐到交易日历,创建预期时间索引,按容差重新索引。
第十步:多资产对齐。将不同资产的 K 线对齐到统一时间网格,填充时避免使用未来数据。
实战代码:Pandas 数据清洗管道
下面是一套可直接使用的 Pandas 数据清洗代码,涵盖了上述核心步骤。
数据加载与标准化
import pandas as pd
import numpy as np
def load_raw(csv_path, symbol):
"""
加载原始 CSV 数据并标准化格式
参数:
csv_path: CSV 文件路径
symbol: 股票或资产代码
返回:
标准化后的 DataFrame
"""
df = pd.read_csv(
csv_path,
usecols=["timestamp", "open", "high", "low", "close", "volume"],
dtype={
"open": "float64",
"high": "float64",
"low": "float64",
"close": "float64",
"volume": "float64"
}
)
# 解析时间戳,假设数据源为交易所本地时间
df["ts"] = pd.to_datetime(df["timestamp"], utc=False, errors="coerce")
# 示例:将美东时间转换为 UTC
df["ts"] = (
df["ts"]
.dt.tz_localize("America/New_York", nonexistent="shift_forward")
.dt.tz_convert("UTC")
)
# 整理列名和顺序
df = (
df.drop(columns=["timestamp"])
.assign(symbol=symbol)
.rename(columns=str.lower)
[["ts", "open", "high", "low", "close", "volume", "symbol"]]
)
return df
排序与去重
def sort_dedup(df):
"""
对数据进行排序并去除重复记录
参数:
df: 原始 DataFrame
返回:
排序去重后的 DataFrame
"""
# 按 symbol 和时间戳排序
df = df.sort_values(["symbol", "ts"])
# 对于重复时间戳,保留最后一条(通常是供应商修正后的数据)
df = df.drop_duplicates(subset=["symbol", "ts"], keep="last")
return df
OHLCV 数据验证与修复
def ohlcv_checks(df):
"""
检查 OHLCV 数据是否符合逻辑约束
返回不符合约束的异常记录
"""
bad = (
# low 应该小于等于 open 和 close 的最小值
(df["low"] > df[["open", "close"]].min(axis=1)) |
# high 应该大于等于 open 和 close 的最大值
(df["high"] < df[["open", "close"]].max(axis=1)) |
# low 应该小于等于 high
(df["low"] > df["high"]) |
# 成交量不能为负
(df["volume"] 0)
)
return df.loc[bad]
def fix_bad_bars(df):
"""
修复不符合逻辑约束的 K 线数据
采用软修复策略:尽可能修正边界值,无法修正时标记为 NaN
"""
# 修正 low 值:取 open、close、high 中的最小值
df["low"] = np.minimum(
df["low"],
df[["open", "close", "high"]].min(axis=1)
)
# 修正 high 值:取 open、close、low 中的最大值
df["high"] = np.maximum(
df["high"],
df[["open", "close", "low"]].max(axis=1)
)
# 如果 low 仍然大于 high,标记为 NaN 待人工审核
df.loc[df["low"] > df["high"], ["low", "high"]] = np.nan
# 负成交量标记为 NaN
df.loc[df["volume"] 0, "volume"] = np.nan
return df
缺失数据处理
def report_missing(df, freq="1min"):
"""
统计每个 symbol 的缺失 K 线数量
"""
out = []
for sym, g in df.groupby("symbol", group_keys=False):
# 构建预期的时间索引
rng = pd.date_range(
g["ts"].min().floor(freq),
g["ts"].max().ceil(freq),
freq=freq
)
# 计算缺失的时间点
missing = pd.Index(rng).difference(g["ts"])
out.append({"symbol": sym, "missing_count": len(missing)})
return pd.DataFrame(out)
def fill_policy(df, method="ffill", max_gap="5min"):
"""
按指定策略填充缺失数据
参数:
method: 填充方法,'ffill' 为前向填充,'interpolate' 为插值
max_gap: 最大允许填充的时间间隔,超过此间隔不填充
"""
df = df.sort_values(["symbol", "ts"])
def _fill(g):
g = g.set_index("ts")
if method == "ffill":
# 计算时间间隔,对大间隔进行分块处理
dt = g.index.to_series().diff()
mask = (dt.notna()) & (dt > pd.to_timedelta(max_gap))
block = mask.cumsum().values
# 按块进行前向填充,避免跨大间隔填充
g[["open", "high", "low", "close", "volume"]] = (
g.groupby(block)[["open", "high", "low", "close", "volume"]]
.ffill()
)
elif method == "interpolate":
# 线性插值,限制最大插值范围
g[["open", "high", "low", "close", "volume"]] = (
g[["open", "high", "low", "close", "volume"]]
.interpolate(limit=5)
)
return g.reset_index()
return df.groupby("symbol", group_keys=False).apply(_fill)
异常值检测与处理
def cap_outliers(df, col="close", window=50, z=6.0):
"""
使用稳健统计方法检测并截断异常值
参数:
col: 要检测的列名
window: 滚动窗口大小
z: 异常值阈值(以 MAD 倍数计)
"""
def _cap(g):
x = g[col]
# 计算滚动中位数
med = x.rolling(window, min_periods=20).median()
# 计算滚动 MAD(中位数绝对偏差)
mad = (x - med).abs().rolling(window, min_periods=20).median()
# 计算稳健 z-score(1.4826 是使 MAD 与标准差可比的常数)
robust_z = (x - med) / (1.4826 * mad.replace(0, np.nan))
# 计算截断边界
upper = med + z * 1.4826 * mad
lower = med - z * 1.4826 * mad
# 将超出边界的值截断到边界
g[col] = np.where(
robust_z > z, upper,
np.where(robust_z < -z, lower, x)
)
return g
return df.groupby("symbol", group_keys=False).apply(_cap)
多资产时间对齐
def align_symbols(dfs, freq="1min"):
"""
将多个资产的数据对齐到统一的时间网格
参数:
dfs: 字典,格式为 {symbol: DataFrame}
返回:
对齐后的字典
"""
aligned = {}
# 构建所有资产的时间并集
idx = None
for sym, g in dfs.items():
sidx = pd.DatetimeIndex(g["ts"]).tz_convert("UTC")
idx = sidx if idx isNoneelse idx.union(sidx)
idx = idx.sort_values()
# 将每个资产重新索引到统一时间网格
for sym, g in dfs.items():
gg = g.set_index("ts").reindex(idx)
gg.index.name = "ts"
aligned[sym] = gg.reset_index()
return
aligned
完整清洗流程
def clean_market_csv(csv_path, symbol, bar_freq="1min"):
"""
端到端的市场数据清洗流程
参数:
csv_path: 数据文件路径
symbol: 资产代码
bar_freq: K 线频率
返回:
清洗后的 DataFrame
"""
# 加载并标准化数据
df = load_raw(csv_path, symbol)
# 排序去重
df = sort_dedup(df)
# 检查并修复异常 K 线
bad = ohlcv_checks(df)
ifnot bad.empty:
print(f"发现 {len(bad)} 条异常 K 线,正在修复...")
df = fix_bad_bars(df)
# 规整化时间索引
df = regularize(df, freq=bar_freq)
# 按策略填充缺失数据
df = fill_policy(df, method="ffill", max_gap="10min")
# 检测并截断异常值
df = cap_outliers(df, col="close", window=100, z=8.0)
# 最终数据质量断言
assert df["ts"].is_monotonic_increasing, "时间戳必须单调递增"
assertnot df.duplicated(subset=["symbol", "ts"]).any(), "不允许存在重复记录"
return df
缺失数据处理策略
处理缺失数据时,需要根据具体场景选择合适的策略:
删除策略适用于严谨的研究场景,虽然更安全但会损失流动性较差资产的信息。
有界前向填充适用于需要稳定指标的场景,但必须进行间隔感知的掩码处理。
插值策略适用于需要平滑特征的场景,但要避免在收盘到开盘之间进行插值。
基于模型的填补是最后的手段,必须记录方法和不确定性。
在应用填充策略前,建议进行以下诊断分析:按小时、交易时段和 symbol 统计缺失情况,发现流动性问题或供应商故障;统计连续缺失的长度,长时间连续缺失通常意味着日历或交易时间表问题;分析缺失与成交量、波动率的相关性,警惕选择性偏差。
常见陷阱与规避方法
幸存者偏差:不要用当前的成分股列表来构建历史数据,否则会高估策略表现。
前视偏差:永远不要用未来的数据填充缺失值,即使是 reindex 的默认行为也可能导致这个问题。
时区混淆:磁盘存储使用 UTC 时间,只在必要时转换为交易所本地时间进行分析。
隐藏的不完整交易日:节假日和临时停牌会导致异常收益,需要显式标记。
盲目信任供应商标记
:验证供应商标记为"已修正"的数据,并存储自己的质量控制标记。
总结
数据清洗工作虽然不够光鲜,却是值得信赖的量化研究和实盘交易的基石。通过建立清晰的缺失数据处理策略、稳健的异常值处理方法、日历感知的时间对齐机制以及透明的调整记录,你的特征和回测结果将反映真实的市场行为,而非数据的怪癖。
记住这条核心原则:永远不要前向填充收益率或标签。如果必须为指标填充收盘价,请对大间隔进行掩码处理并记录在案。
建议将每个清洗步骤封装为小型、可测试的函数,持久化数据时附带元数据(供应商、日历、调整规则),对流程进行版本控制,并生成每日质量报告。只有这样,你的量化交易系统才能建立在坚实可靠的数据基础之上。
参考文章
加入专注于财经数据与量化投研的知识星球【数据科学实战】,获取本文完整研究解析、代码实现细节。
财经数据与量化投研知识社区
核心权益如下:
- 赠送《财经数据宝典》完整文档,汇集多年财经数据维护经验
- 赠送《量化投研宝典》完整文档,汇集多年量化投研领域经验
- 赠送《PyBroker-入门及实战》视频课程,手把手学习量化策略开发
- 每日分享高质量量化投研文章(已更新 180+篇)、代码和相关资料
星球已有丰富内容积累,包括量化投研论文、财经高频数据、 PyBroker 视频教程、定期直播、数据分享和答疑解难。适合对量化投研和财经数据分析有兴趣的学习者及从业者。欢迎加入我们!