Py学习  »  DATABASE

量化之tushare日线数据入库Mysql例子

子晓聊技术 • 1 周前 • 9 次点击  

上篇文章写了DuckDB, 对于普通人来说, 可能用mysql数据保存量化数据更合适。 毕竟对于程序员来说, 大家一开始学的数据库基本是Mysql。选择自己最适合的更重要。 最近由于东方财富频繁访问问题,经常有同学问我数据源的问题,这里写一写tushare存入mysql。 希望对大家有所启发。 

先下载mysql资源
1、mysql数据库下载, 比如选择5.7或8.0
https://www.mysql.com/cn/downloads/
2、我们可以选择下载mysql客户端比如navicat, 网上搜索很多。
3、新建一个库,比如db_stock库,设置用户名root,密码假设12345678。

这里我写一个处理方案对日线数据入库的操作。
1、首先获取交易日日历(由于tushare需要2000积分, 这个接口暂时用akshare获取)
2、数据源, 最初我准备用miniqmt的。 想了想,就用tushare保存吧。 也方便不使用windows的同学。
3、我们看下tushare 的日线行情
接口:daily,可以通过数据工具调试和查看数据
数据说明:交易日每天15点~16点之间入库。本接口是未复权行情,停牌期间不提供数据
调取说明:基础积分每分钟内可调取500次,每次6000条数据,一次请求相当于提取一个股票23年历史
描述:获取股票行情数据,或通过通用行情接口获取数据,包含了前后复权数据
4、替换掉代码中的token为自己的

既然接口基础积分每分钟内可调取500次,每次6000条数据。 我就做个判断每分钟最多调用多少次就sleep。 第一次初始化需要一些时间。 后续每天16:00定时获取一次就可以了。

相关文章:
【Python技术】初学者python搭建环境相关工具整理
做量化我们应该如何选择数据库

最后附上完整代码,需要的自取。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。
import tushare as tsimport pandas as pdfrom sqlalchemy import create_engineimport scheduleimport timefrom datetime import datetime, timedeltaimport akshare as ak# 初始化Tushare Prots.set_token('XXXXX')  # 替换为实际Tokenpro = ts.pro_api()# 数据库配置 engine = create_engine('mysql://root:12345678@localhost/db_stock?charset=utf8')def get_trade_dates(start_date, end_date):    """    使用AkShare获取指定日期范围内的交易日历    参数格式:YYYYMMDD (如:20200101)    """    # 获取完整的交易日历(为啥不用tushare,积分不够)    trade_date_df = ak.tool_trade_date_hist_sina()    # 格式转换:将日期列转换为YYYYMMDD格式    trade_date_df['trade_date'] = pd.to_datetime(trade_date_df['trade_date']).dt.strftime('%Y%m%d')    # 过滤指定日期范围    mask = (trade_date_df['trade_date'] >= start_date) & (trade_date_df['trade_date'] <= end_date)    trade_dates = trade_date_df.loc[mask, 'trade_date'].tolist()    return trade_datesdef init_historical_data():    """    初始化历史数据(20200102 - 20250719)    按交易日获取全市场数据,避免个股循环    """    # 获取交易日历    trade_dates = get_trade_dates('20200101''20250719')    print(f"📅 共获取 {len(trade_dates)} 个交易日数据")    for i, date in enumerate(trade_dates):        # 频率控制:每分钟不超过500次        if i % 400 == 0 and i > 0:            print(f"⏳ 已完成 {i} 个交易日,暂停60秒避免超频...")            time.sleep(60)        try:            # 单次获取当日全市场数据(约4800条)[6](@ref)            df = pro.daily(trade_date=date)            # 数据存储            df.to_sql('stock_daily', engine, if_exists='append', index=False)            print(f"✅ {date} 写入 {len(df)} 条")        except Exception as e:            print(f"❌ {date} 获取失败: {e}")            # 失败重试逻辑可扩展def daily_update():    """每日16:00执行的更新任务"""    today = datetime.now().strftime("%Y%m%d")    # 检查今日数据是否存在    if pd.read_sql(f"SELECT 1 FROM stock_daily WHERE trade_date='{today}'", engine).empty:        try:            df = pro.daily(trade_date=today)            if not df.empty:                df.to_sql('stock_daily'


    
, engine, if_exists='append', index=False)                print(f"🔄 更新{today}数据: {len(df)}条")            else:                print(f"⚠️ 今日{today}无交易数据(可能为节假日)")        except Exception as e:            print(f"🔥 更新失败: {e}")    else:        print(f"⏩ {today}数据已存在,跳过更新")# 主程序逻辑if __name__ == '__main__':    print("=" * 50)    print("📊 股票数据初始化系统启动")    print("=" * 50)    # 首次初始化历史数据    start_time = time.time()    init_historical_data()    duration = time.time() - start_time    print(f"⏱️ 历史数据初始化完成,耗时: {duration // 60:.0f}{duration % 60:.0f}秒")    # 设置每日定时任务    schedule.every().day.at("16:00").do(daily_update)    print("\n🚀 定时任务守护进程已启动 (每日16:00执行)")    print("=" * 50)    while True:        schedule.run_pending()        time.sleep(60)


如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

如果对每日复盘感兴趣,欢迎关注我另外一个号。重在市场理解(大盘情绪、 热门概念板块、 热门个股), 技术只是辅助。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/184726
 
9 次点击