import akshare as ak
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
def get_stock_data(symbol, name):
    """
    Fetch the daily price history of one stock from akshare and normalise it.

    Daily bars are requested from 2000-01-01 up to today. The Chinese column
    headers are renamed to English identifiers, ``date`` is converted to
    datetime, and every row is tagged with ``symbol`` and ``name``.

    Args:
        symbol: Stock code forwarded to ``ak.stock_zh_a_hist``.
        name: Label written to the ``name`` column and shown in log output.

    Returns:
        The prepared DataFrame, or an empty DataFrame when any step fails
        (the error is printed rather than raised).
    """
    print(f"正在获取{name}({symbol})历史数据...")

    # Source header -> English column name.
    column_map = {
        '日期': 'date',
        '开盘': 'open',
        '收盘': 'close',
        '最高': 'high',
        '最低': 'low',
        '成交量': 'volume',
        '成交额': 'amount',
        '振幅': 'amplitude',
        '涨跌幅': 'pct_change',
        '涨跌额': 'change',
        '换手率': 'turnover',
    }

    try:
        today = datetime.now().strftime('%Y%m%d')
        raw = ak.stock_zh_a_hist(
            symbol=symbol,
            period="daily",
            start_date="20000101",
            end_date=today,
        )
        history = raw.rename(columns=column_map)
        history['date'] = pd.to_datetime(history['date'])
        history['symbol'] = symbol
        history['name'] = name
    except Exception as err:
        # Best-effort: one failing stock must not abort a bulk update.
        print(f"获取股票{symbol}数据失败: {err}")
        return pd.DataFrame()
    return history
def update_all_stocks_data(store, stock_list, batch_size=100):
    """
    Download every stock in ``stock_list`` and merge it into the ``all_stocks`` table.

    Stocks are fetched in slices of ``batch_size`` with a progress line after
    each slice. New rows are merged with whatever ``all_stocks`` already
    holds; for duplicate ``(symbol, date)`` pairs the newly fetched row wins.
    The merged frame is sorted by date then symbol and written back as a
    queryable table.

    Args:
        store: Object offering ``select`` and ``put`` (``main`` passes an
            open ``pd.HDFStore``).
        stock_list: Sequence of stock codes.
        batch_size: Number of stocks per progress step.

    Returns:
        The merged DataFrame that was written, or an empty DataFrame when no
        stock returned any data (the store is left untouched in that case).

    Raises:
        Any error from ``store.select`` other than ``KeyError`` now
        propagates, so an unreadable table is not silently overwritten.
    """
    total = len(stock_list)
    frames = []
    for start in range(0, total, batch_size):
        for stock in stock_list[start:start + batch_size]:
            # Only the code is available here, so it doubles as the name.
            df = get_stock_data(stock, stock)
            if not df.empty:
                frames.append(df)
        print(f"已处理 {min(start + batch_size, total)}/{total} 只股票")

    if not frames:
        return pd.DataFrame()

    combined_df = pd.concat(frames, ignore_index=True)

    try:
        existing_data = store.select('all_stocks')
    except KeyError:
        # The table has not been created yet (first run): nothing to merge.
        existing_data = None

    if existing_data is not None:
        combined_df = pd.concat([existing_data, combined_df])
        # Listed last, so freshly fetched rows replace stored ones.
        combined_df.drop_duplicates(subset=['symbol', 'date'], keep='last', inplace=True)

    combined_df.sort_values(by=['date', 'symbol'], inplace=True)
    # Only data columns can be used in ``where`` filters; ``pct_change`` is
    # declared because main() filters on it.
    store.put('all_stocks', combined_df, format='table',
              data_columns=['symbol', 'date', 'close', 'pct_change'],
              index=['symbol', 'date'])
    return combined_df
def create_index_table(store):
    """
    Build the ``stock_index`` table holding each symbol's most recent date.

    Reads ``symbol`` and ``date`` from ``all_stocks``, takes the maximum date
    per symbol, and stores the result under ``stock_index`` with columns
    ``symbol`` and ``last_update``.

    Args:
        store: Object offering ``select`` and ``put`` (``main`` passes an
            open ``pd.HDFStore``).

    Returns:
        The per-symbol DataFrame, or an empty DataFrame when reading or
        writing fails (the error is printed rather than raised).
    """
    try:
        # Load only the two columns the aggregation needs.
        all_data = store.select('all_stocks', columns=['symbol', 'date'])
        latest_dates = (
            all_data.groupby('symbol')['date']
            .max()
            .reset_index()
            .rename(columns={'date': 'last_update'})
        )
        store.put('stock_index', latest_dates, format='table', data_columns=['symbol'])
        return latest_dates
    except Exception as e:
        # Report the failure like query_stocks does instead of hiding it.
        print(f"创建索引表失败: {e}")
        return pd.DataFrame()
def query_stocks(store, condition):
    """
    Run a ``where`` filter against the ``all_stocks`` table.

    Args:
        store: Object whose ``select`` method accepts a ``where`` keyword.
        condition: Filter expression passed through unchanged as ``where``.

    Returns:
        Whatever ``store.select`` returns, or an empty DataFrame when the
        query raises (the error is printed rather than raised).
    """
    try:
        matches = store.select('all_stocks', where=condition)
    except Exception as err:
        print(f"查询失败: {err}")
        return pd.DataFrame()
    return matches
def get_csi300_constituents():
    """
    Return the constituent stock codes of index ``000300`` as a list.

    The ``code`` column returned by ``ak.index_stock_cons_sina`` has any
    ``sh.`` / ``sz.`` text removed before being converted to a list.
    """
    constituents = ak.index_stock_cons_sina(symbol="000300")
    stripped_codes = constituents["code"].str.replace(r"sh\.|sz\.", "", regex=True)
    constituents["code"] = stripped_codes
    return constituents["code"].tolist()
def _report_query(store, title, condition, columns):
    """Print ``title``, run ``condition`` on the store, then print the match count and the first rows of ``columns``."""
    print(title)
    result = query_stocks(store, condition)
    print(f"找到 {len(result)} 条记录")
    if not result.empty:
        print(result[columns].head())


def main():
    """
    Refresh the HDF5 store with CSI 300 data and run a few example queries.

    Steps: fetch the constituent list, update ``all_stocks`` in
    ``all_stocks_data.h5``, rebuild the symbol index, then print the results
    of four sample filters (price range, daily gain, latest trading day,
    single symbol). The latest-day query is skipped when the update returned
    no rows, because there is then no ``date`` column to read.
    """
    hdf5_path = 'all_stocks_data.h5'
    stock_list = get_csi300_constituents()

    with pd.HDFStore(hdf5_path, mode='a', complib='blosc', complevel=9) as store:
        all_data = update_all_stocks_data(store, stock_list)
        print(f"所有股票数据已更新,总记录数: {len(all_data)}")

        index_table = create_index_table(store)
        if index_table.empty:
            # create_index_table signals failure with an empty frame.
            print("股票索引表创建失败或为空")
        else:
            print("股票索引表已创建")

        _report_query(
            store,
            "\n查询 close > 10 and close < 20 的股票:",
            'close > 10 & close < 20',
            ['symbol', 'name', 'date', 'close'],
        )
        _report_query(
            store,
            "\n查询涨幅大于5%的股票:",
            'pct_change > 5',
            ['symbol', 'name', 'date', 'pct_change'],
        )

        if all_data.empty:
            # An empty update result has no 'date' column; indexing it would raise.
            print("\n没有可用的股票数据,跳过最新交易日查询")
        else:
            latest_date = all_data['date'].max()
            _report_query(
                store,
                f"\n查询最新交易日 {latest_date.date()} 的所有股票数据:",
                f'date == "{latest_date}"',
                ['symbol', 'name', 'close', 'pct_change'],
            )

        _report_query(
            store,
            "\n查询平安银行(000001)的历史数据:",
            'symbol == "000001"',
            ['date', 'close', 'pct_change'],
        )

    print(f"\n所有操作已完成,数据已保存到: {hdf5_path}")
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()