上篇文章写了DuckDB, 对于普通人来说, 可能用mysql数据保存量化数据更合适。 毕竟对于程序员来说, 大家一开始学的数据库基本是Mysql。选择自己最适合的更重要。 最近由于东方财富频繁访问问题,经常有同学问我数据源的问题,这里写一写tushare存入mysql。 希望对大家有所启发。 1、mysql数据库下载, 比如选择5.7或8.0https://www.mysql.com/cn/downloads/2、我们可以选择下载mysql客户端比如navicat, 网上搜索很多。3、新建一个库,比如db_stock库,设置用户名root,密码假设12345678。1、首先获取交易日日历(由于tushare需要2000积分, 这个接口暂时用akshare获取)2、数据源, 最初我准备用miniqmt的。 想了想,就用tushare保存吧。 也方便不使用windows的同学。接口:daily,可以通过数据工具调试和查看数据
数据说明:交易日每天15点~16点之间入库。本接口是未复权行情,停牌期间不提供数据
调取说明:基础积分每分钟内可调取500次,每次6000条数据,一次请求相当于提取一个股票23年历史
描述:获取股票行情数据,或通过通用行情接口获取数据,包含了前后复权数据既然接口基础积分每分钟内可调取500次,每次6000条数据。 我就做个判断每分钟最多调用多少次就sleep。 第一次初始化需要一些时间。 后续每天16:00定时获取一次就可以了。最后附上完整代码,需要的自取。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。import tushare as ts
import pandas as pd
from sqlalchemy import create_engine
import schedule
import time
from datetime import datetime, timedelta
import akshare as ak
ts.set_token('XXXXX')
pro = ts.pro_api()
engine = create_engine('mysql://root:12345678@localhost/db_stock?charset=utf8')
def get_trade_dates(start_date, end_date):
"""
使用AkShare获取指定日期范围内的交易日历
参数格式:YYYYMMDD (如:20200101)
"""
trade_date_df = ak.tool_trade_date_hist_sina()
trade_date_df['trade_date'] = pd.to_datetime(trade_date_df['trade_date']).dt.strftime('%Y%m%d')
mask = (trade_date_df['trade_date'] >= start_date) & (trade_date_df['trade_date'] <= end_date)
trade_dates = trade_date_df.loc[mask, 'trade_date'].tolist()
return trade_dates
def init_historical_data():
"""
初始化历史数据(20200102 - 20250719)
按交易日获取全市场数据,避免个股循环
"""
trade_dates = get_trade_dates('20200101', '20250719')
print(f"📅 共获取 {len(trade_dates)} 个交易日数据")
for i, date in enumerate(trade_dates):
if i % 400 == 0 and i > 0:
print(f"⏳ 已完成 {i} 个交易日,暂停60秒避免超频...")
time.sleep(60)
try:
df = pro.daily(trade_date=date)
df.to_sql('stock_daily', engine, if_exists='append', index=False)
print(f"✅ {date} 写入 {len(df)} 条")
except Exception as e:
print(f"❌ {date} 获取失败: {e}")
def daily_update():
"""每日16:00执行的更新任务"""
today = datetime.now().strftime("%Y%m%d")
if pd.read_sql(f"SELECT 1 FROM stock_daily WHERE trade_date='{today}'", engine).empty:
try:
df = pro.daily(trade_date=today)
if not df.empty:
df.to_sql('stock_daily'
, engine, if_exists='append', index=False)
print(f"🔄 更新{today}数据: {len(df)}条")
else:
print(f"⚠️ 今日{today}无交易数据(可能为节假日)")
except Exception as e:
print(f"🔥 更新失败: {e}")
else:
print(f"⏩ {today}数据已存在,跳过更新")
if __name__ == '__main__':
print("=" * 50)
print("📊 股票数据初始化系统启动")
print("=" * 50)
start_time = time.time()
init_historical_data()
duration = time.time() - start_time
print(f"⏱️ 历史数据初始化完成,耗时: {duration // 60:.0f}分{duration % 60:.0f}秒")
schedule.every().day.at("16:00").do(daily_update)
print("\n🚀 定时任务守护进程已启动 (每日16:00执行)")
print("=" * 50)
while True:
schedule.run_pending()
time.sleep(60)
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。如果对每日复盘感兴趣,欢迎关注我另外一个号。重在市场理解(大盘情绪、 热门概念板块、 热门个股), 技术只是辅助。