Py学习  »  Python

通达信全推数据完美实现:Python编程自动化

贝叶斯技术科技 • 昨天 • 15 次点击  

书接上回,通达信本身并不带全推数据,这导致每次我们要获取5分钟数据都要重新点击下载,否则的话无法快速获得数据,这样对于我们日常使用中造成了不少麻烦。

那么本文就是为了解决通达信全推的关键痛点而写的。

下面就通过5分钟数据来做个详细的讲解,这是一个通过Python数据库获得数据方法,并且增量追加本地.lc5的历史数据。

首先、我们要指定获取的数据例如

sh510050.lc5sh510300.lc5sh510500.lc5sz300059.lc5

这部分可以自定义,注意区分市场上海是sh,深圳是sz

然后我们通过遍历contract_codes,解析市场标志(上海市场=1,深圳市场=0)和原始代码(去掉前缀)。

调用api.get_security_bars(0,market,raw_code,0,240):

0表示5分钟K线周期(通达信周期代码:0-5分钟,1-15分钟,2-30分钟,3-60分钟,4-日线,5-周线,6-月线等)。

240表示请求最近240根5分钟K线(约5个交易日的分钟数据)。

对返回数据进行有效性过滤,存入字典all_latest_5m_data[code]=valid_bars。

每次请求后time.sleep(0.05)避免请求过快被限制。

其次、我们要确定自己文件的目录路径

这部分也是自定义的

上海:C:/tdx/vipdoc/sh/fzline/{code}.lc5深圳:C:/tdx/vipdoc/sz/fzline/{code}.lc5

需要找到自己电脑的对应文件位置

然后读取本地最后一条记录

本地 .lc5 文件每条记录固定32字节,结构(小端序):

struct.unpack(', data)字段	类型	说明date_val	unsigned short	日期编码: (year-2004)*2048 + month*100 + daytime_val	unsigned short	时间编码: hour*60 + minuteopen_p	float	开盘价high_p	float	最高价low_p	float	最低价close_p	float	收盘价amount_val	float	成交额volume_val	unsigned int	成交量(手)reserved	unsigned int	保留字段(写入0

读取最后一条记录,解码得到最后K线的时间 (year,month,day,hour,minute),作为增量查找的基准。

用Python取数后自动后台刷新

寻找增量起始索引

在从服务器获取的 minute_list(已按时间升序)中,从后往前查找与本地最后记录时间匹配的索引 start_idx。

若找不到或本地文件为空,start_idx = 0,即全量重写(但不会删除旧数据,只是从第一条追加,可能重复)。

追加新记录

从 start_idx 开始遍历 minute_list,对每条有效K线:

编码 date_val 和 time_val。

按顺序写入32字节结构(成交额清理,成交量取整,保留字段填0)。

每写入一条打印一次更新时间(生产环境可关闭打印)。

最后 f.flush() 确保数据落盘。

完整Python代码

from pytdx.hq import TdxHq_APIimport structimport timeimport osfrom datetime import datetime# ------------------------------# 合约代码列表(根据提供的 .lc5 文件名生成)# ------------------------------# 给定的文件名列表(多行字符串)lc5_files = """sh510050.lc5sh510300.lc5sh510500.lc5sz300059.lc5"""contract_codes = [line.strip().replace('.lc5'''for line in lc5_files.split() if line.strip()]print(f"总合约数量: {len(contract_codes)}")TDX_SERVERS = [    {'ip''183.201.231.84''port'7709}]def connect_to_tdx():    """尝试连接到可用的通达信服务器"""    for server in TDX_SERVERS:        try:            api = TdxHq_API()            print(f"尝试连接服务器 {server['ip']}:{server['port']}...")            if api.connect(server['ip'], server['port'], time_out=5):                print(f"成功连接到 {server['ip']}:{server['port']}")                return api            else:                api.disconnect()        except Exception as e:            print(f"连接 {server['ip']}:{server['port']} 失败: {e}")            continue    return Nonedef is_valid_kline(k):    """检查单条数据是否有效"""    if not isinstance(k, dict):        return False    try:        year = k.get('year'0)        month = k.get('month'0)        day = k.get('day'0)        hour = k.get('hour'0)        minute = k.get('minute'0)        if year 2004 or year > 2040return False        if month 1 or month > 12

return False        if day 1 or day > 31return False        if hour 0 or hour > 23return False        if minute 0 or minute > 59return False        if k.get('open'0) <= 0return False        if k.get('high'0) <= 0return False        if k.get('low'0) <= 0return False        if k.get('close'0) <= 0return False        return True    except:        return Falsedef sanitize_amount(amount):    """清理成交额,防止溢出"""    try:        val = float(amount)        if val > 1e18 or val 0 or val != val:            return 0.0        return val    except:        return 0.0def GetLatest5MData(contract_codes):    api = connect_to_tdx()    if api is None:        print("无法连接到任何通达信服务器")        return {}        all_latest_5m_data = {}    total = len(contract_codes)        try:        for idx, code in enumerate(contract_codes, 1):            # 判断市场:sh -> 1(上海), sz -> 0(深圳)            if code.startswith('sh'):                market = 1                raw_code = code[2:]   # 去掉 'sh'            elif code.startswith('sz'):                market = 0                raw_code = code[2:]   # 去掉 'sz'            else:                print(f"未知市场前缀: {code}")                continue            try:                # category=0 表示5分钟K线                bars = api.get_security_bars(0, market, raw_code, 0240)                if bars:                    valid_bars = [b for b in bars if is_valid_kline(b)]                    print(f"[{idx}/{total}] 获取合约 {code} 数据成功,原始 {len(bars)} 条,有效 {len(valid_bars)} 条")                    all_latest_5m_data[code] = valid_bars                else:                    print(f"[{idx}/{total}] 获取合约 {code} 数据为空")                    all_latest_5m_data[code] = []                time.sleep(0.05)            except Exception as e:                print(f"获取合约 {code}


    
 数据失败: {e}")                all_latest_5m_data[code] = []    finally:        api.disconnect()            return all_latest_5m_datadef Append5MData(contract_codes):    record_size = 32        print("\n正在获取最新5分钟数据...")    latest_all = GetLatest5MData(contract_codes)        if not latest_all:        print("没有获取到任何数据")        return    for code in contract_codes:        # 根据前缀确定文件存放目录        if code.startswith('sh'):            base_dir = "C:/tdx/vipdoc/sh/fzline"        elif code.startswith('sz'):            base_dir = "C:/tdx/vipdoc/sz/fzline"        else:            print(f"跳过未知市场代码: {code}")            continue                file_path = os.path.join(base_dir, f"{code}.lc5")        os.makedirs(os.path.dirname(file_path), exist_ok=True)        minute_list = latest_all.get(code, [])        if not minute_list:            print(f"合约 {code} 没有有效数据,跳过")            continue        # 打开文件进行读写        with open(file_path, 'a+b'as f:            # 获取文件大小            f.seek(0, os.SEEK_END)            file_size = f.tell()            if file_size < record_size:                last_key = None                print(f"合约 {code} 文件为空或损坏,将从头开始写入")            else:                # 读取最后一条记录                f.seek(-record_size, os.SEEK_END)                data = f.read(record_size)                try:                    (date_val, time_val, open_p, high_p, low_p, close_p,                     amount_val, volume_val, reserved) = struct.unpack(', data)                    year = date_val // 2048 + 2004                    month = (date_val % 2048) // 100                    day = (date_val % 2048) % 100                    hour = time_val // 60                    minute = time_val % 60                    last_key = (year, month, day, hour, minute)                    print(f"合约 {code} 最后一条记录时间: {year:04d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}")                except:                    last_key = None                    print(f"合约 {code} 读取最后记录失败,将从头开始写入")            # 寻找增量起始索引            if last_key is None:                start_idx = 0            else:                start_idx = None                for idx in range(len(minute_list)-1, -1, -1):                    m = minute_list[idx]                    key = (m['year'], m['month'], m['day'], m['hour'], m['minute'])                    if key == last_key:                        start_idx = idx + 1                        break                if start_idx is None:


    
                    start_idx = 0                    print(f"合约 {code} 未找到匹配记录,将从头开始写入")            # 写入新增的K线            new_records_count = 0            for i in range(start_idx, len(minute_list)):                m = minute_list[i]                if not is_valid_kline(m):                    print(f"跳过无效K线: {m}")                    continue                print(f"更新合约 {code}{m['year']}-{m['month']:02d}-{m['day']:02d} {m['hour']:02d}:{m['minute']:02d}")                encode_date = (m['year'] - 2004) * 2048 + m['month'] * 100 + m['day']                encode_time = m['hour'] * 60 + m['minute']                amount_clean = sanitize_amount(m.get('amount'0))                f.write(struct.pack(', encode_date))                f.write(struct.pack(', encode_time))                f.write(struct.pack(', m['open']))                f.write(struct.pack(', m['high']))                f.write(struct.pack(', m['low']))                f.write(struct.pack(', m['close']))                f.write(struct.pack(', amount_clean))                f.write(struct.pack('int(m.get('volume'0))))                f.write(struct.pack('0))   # 保留                new_records_count += 1            if new_records_count > 0:                print(f"合约 {code} 新增 {new_records_count} 条记录")            else:                print(f"合约 {code} 无新增有效数据")                            f.flush()if __name__ == "__main__":    try:        print("开始运行5分钟数据更新程序(支持上海+深圳)...")        print(f"共需更新 {len(contract_codes)} 个合约")        print(f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")                Append5MData(contract_codes)    except KeyboardInterrupt:        print("\n用户按 Ctrl+C 退出程序")    except Exception as e:        print(f"运行失败。异常信息: {e}")        import traceback        traceback.print_exc()

通达信编程实战,本文仅作为通达信公式编程语言(TDX)的学习案例分享。内容涉及数据处理逻辑与条件判断代码的编写技巧,旨在帮助编程爱好者理解多条件嵌套算法。市场有风险,代码需谨慎。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/196641