import os
from datetime import datetime, timedelta

import akshare as ak
import numpy as np
import pandas as pd
def get_stock_data(symbol, name, start_date="20000101", end_date=None):
    """Fetch one stock's daily history from akshare and normalise it.

    The Chinese column headers returned by ``ak.stock_zh_a_hist`` are renamed
    to English identifiers, the ``date`` column is converted to datetimes, and
    constant ``symbol`` / ``name`` columns are appended.

    Args:
        symbol: Stock code passed to akshare and stored in the ``symbol`` column.
        name: Display name used in progress output and stored in ``name``.
        start_date: First day to request, formatted ``YYYYMMDD``.
        end_date: Last day to request, formatted ``YYYYMMDD``. ``None`` means
            today's date at call time.

    Returns:
        The prepared DataFrame. An empty DataFrame is returned when akshare
        yields no rows or when any error occurs while fetching or preparing
        the data (the error is printed, not raised).
    """
    print(f"正在获取{name}({symbol})历史数据...")
    if end_date is None:
        end_date = datetime.now().strftime('%Y%m%d')

    column_names = {
        '日期': 'date',
        '开盘': 'open',
        '收盘': 'close',
        '最高': 'high',
        '最低': 'low',
        '成交量': 'volume',
        '成交额': 'amount',
        '振幅': 'amplitude',
        '涨跌幅': 'pct_change',
        '涨跌额': 'change',
        '换手率': 'turnover',
    }

    # Deliberately best-effort: one failing symbol must not abort a bulk run.
    try:
        df = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date=start_date,
            end_date=end_date,
        )
        if df is None or df.empty:
            # Handle "no rows" explicitly instead of tripping over the
            # missing 'date' column further down.
            print(f"股票{symbol}未返回任何数据")
            return pd.DataFrame()

        df = df.rename(columns=column_names)
        df['date'] = pd.to_datetime(df['date'])
        df['symbol'] = symbol
        df['name'] = name
        return df
    except Exception as e:
        print(f"获取股票{symbol}数据失败: {e}")
        return pd.DataFrame()
def update_all_stocks_data(store, stock_list, batch_size=100):
    """Fetch every stock in ``stock_list`` and merge it into ``all_stocks``.

    Stocks are fetched in slices of ``batch_size`` with a progress line
    printed after each slice. The new rows are merged with whatever is already
    stored under ``all_stocks``, de-duplicated on ``(symbol, date)`` keeping
    the newest row, sorted by date then symbol, and written back as a table.

    Args:
        store: An open ``pd.HDFStore`` (anything with compatible ``select``
            and ``put`` methods).
        stock_list: Sequence of stock codes. Each code is also used as the
            stock's display name because no name is available here.
        batch_size: Number of stocks fetched between progress messages.

    Returns:
        The full merged DataFrame that was written to the store, or an empty
        DataFrame (nothing written) when no stock returned any data.

    Raises:
        Any error other than a missing ``all_stocks`` table while reading the
        existing data. Swallowing such errors would overwrite the stored
        history with only the freshly fetched rows.
    """
    frames = []
    total = len(stock_list)

    for start in range(0, total, batch_size):
        for stock in stock_list[start:start + batch_size]:
            df = get_stock_data(stock, stock)
            if not df.empty:
                frames.append(df)
        print(f"已处理 {min(start + batch_size, total)}/{total} 只股票")

    if not frames:
        return pd.DataFrame()

    combined_df = pd.concat(frames, ignore_index=True)

    try:
        existing_data = store.select('all_stocks')
    except KeyError:
        # First run: the table does not exist yet, so there is nothing to merge.
        existing_data = None

    if existing_data is not None:
        combined_df = pd.concat([existing_data, combined_df], ignore_index=True)

    # Always de-duplicate: the input list itself may repeat a symbol.
    combined_df = combined_df.drop_duplicates(subset=['symbol', 'date'], keep='last')
    combined_df = combined_df.sort_values(by=['date', 'symbol']).reset_index(drop=True)

    # 'pct_change' is a data column so it can be used in ``where`` queries.
    store.put(
        'all_stocks',
        combined_df,
        format='table',
        data_columns=['symbol', 'date', 'close', 'pct_change'],
        index=['symbol', 'date'],
    )
    return combined_df
def create_index_table(store):
    """Build the per-symbol lookup table ``stock_index``.

    Reads the ``symbol`` and ``date`` columns of ``all_stocks``, computes the
    most recent date for every symbol, and stores the result under
    ``stock_index`` with columns ``symbol`` and ``last_update``.

    Args:
        store: An open ``pd.HDFStore`` (anything with compatible ``select``
            and ``put`` methods).

    Returns:
        The index DataFrame, or an empty DataFrame when it could not be built
        (for example because ``all_stocks`` does not exist yet). The reason is
        printed rather than raised.
    """
    try:
        # Only two columns are needed; avoid loading the full history.
        dates = store.select('all_stocks', columns=['symbol', 'date'])
        latest_dates = (
            dates.groupby('symbol')['date']
            .max()
            .reset_index()
            .rename(columns={'date': 'last_update'})
        )
        store.put('stock_index', latest_dates, format='table', data_columns=['symbol'])
    except Exception as e:
        # Best-effort like the other helpers, but no longer swallows
        # KeyboardInterrupt/SystemExit and no longer hides the cause.
        print(f"创建股票索引表失败: {e}")
        return pd.DataFrame()
    return latest_dates
def query_stocks(store, condition):
    """Run a ``where`` query against the ``all_stocks`` table.

    Args:
        store: Object exposing ``select(key, where=...)``, e.g. an open
            ``pd.HDFStore``.
        condition: The ``where`` expression handed to ``store.select``.

    Returns:
        Whatever ``store.select`` returns for the condition. If the select
        raises, the error is printed and an empty DataFrame is returned.
    """
    try:
        matches = store.select('all_stocks', where=condition)
    except Exception as err:
        print(f"查询失败: {err}")
        return pd.DataFrame()
    else:
        return matches
def get_csi300_constituents():
    """Return the CSI 300 constituent codes as a list of strings.

    The constituents are requested from ``ak.index_stock_cons_sina`` for index
    ``000300``; every occurrence of ``sh.`` or ``sz.`` is removed from the
    values of the ``code`` column before they are returned.
    """
    constituents = ak.index_stock_cons_sina(symbol="000300")
    exchange_marker = r"sh\.|sz\."
    bare_codes = constituents["code"].str.replace(exchange_marker, "", regex=True)
    return bare_codes.tolist()
def main():
    """Refresh the CSI 300 history in the HDF5 store and run sample queries.

    Steps: load the constituent list, update the ``all_stocks`` table in
    ``all_stocks_data.h5``, rebuild the symbol index table, then print the
    first rows of four example queries (price range, daily gain, latest
    trading day, single symbol).
    """
    hdf5_path = 'all_stocks_data.h5'

    stock_list = get_csi300_constituents()

    with pd.HDFStore(hdf5_path, mode='a', complib='blosc', complevel=9) as store:
        all_data = update_all_stocks_data(store, stock_list)
        print(f"所有股票数据已更新,总记录数: {len(all_data)}")

        create_index_table(store)
        print("股票索引表已创建")

        print("\n查询 close > 10 and close < 20 的股票:")
        result1 = query_stocks(store, 'close > 10 & close < 20')
        print(f"找到 {len(result1)} 条记录")
        if not result1.empty:
            print(result1[['symbol', 'name', 'date', 'close']].head())

        print("\n查询涨幅大于5%的股票:")
        result2 = query_stocks(store, 'pct_change > 5')
        print(f"找到 {len(result2)} 条记录")
        if not result2.empty:
            print(result2[['symbol', 'name', 'date', 'pct_change']].head())

        # The update returns an empty frame (no 'date' column) when nothing
        # was fetched; indexing it would raise KeyError, so skip this query.
        if all_data.empty:
            print("\n本次未获取到任何数据,跳过最新交易日查询")
        else:
            latest_date = all_data['date'].max()
            print(f"\n查询最新交易日 {latest_date.date()} 的所有股票数据:")
            result3 = query_stocks(store, f'date == "{latest_date}"')
            print(f"找到 {len(result3)} 条记录")
            if not result3.empty:
                print(result3[['symbol', 'name', 'close', 'pct_change']].head())

        print("\n查询平安银行(000001)的历史数据:")
        result4 = query_stocks(store, 'symbol == "000001"')
        print(f"找到 {len(result4)} 条记录")
        if not result4.empty:
            print(result4[['date', 'close', 'pct_change']].head())

    print(f"\n所有操作已完成,数据已保存到: {hdf5_path}")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()