前2天写了一篇文章介绍获取分时均线的例子, 星球群在讨论如何根据短线分时均线做决策。对于我个人做短线交易来说, 分时均线买入卖出大概分为几种情况。我自己写了一个例子,并不一定对提供的代码只是示例,存在一些瑕疵。比如持仓如果已有个股或已提交委托,正常不应该继续买入。 改造思路,你可以通过通达信自选股或预警得到对应的股票池,然后对对应的股票池进行监控。写的内容偏短线,对于中长线价值投资没啥帮助。 其实买入卖出更精准的方式还是通过L2 接口数据来做分析。09:31:00 金叉触发!梭哈买入!
09:32:15 死叉降临!割肉跑路!
09:33:30 触发金叉!梭哈买入!
09:34:45 死叉降临!割肉跑路!
...(循环10次后)...
14:55:00 策略崩溃:Error 429 - 您的操作过于频繁,已被券商服务器拉黑
结局:
- 券商客户经理来电:"兄弟,你比高频策略还猛,来我们机房当服务器压力测试员吧!"
- 小明望着分时均线感慨:"原来这线是韭菜收割机自带的进度条..."
这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。import timeimport mathimport threadingimport pandas as pdfrom datetime import datetime, time as dt_timefrom xtquant import xtdata, xtconstantfrom xtquant.xttype import StockAccountfrom xtquant.xttrader import XtQuantTrader, XtQuantTraderCallbackclass VWAPStrategy: def __init__(self, stock_code, capital_per_order=10000): self.stock_code = stock_code self.capital_per_order = capital_per_order self.position = 0 self.vwap = 0 self.last_trade_time = None self.position_entry_price = 0 self.position_entry_time = None def fetch_stock_data(self): """获取实时数据并计算分时均线(VWAP)""" tick_data = xtdata.get_full_tick([self.stock_code]) if self.stock_code in tick_data: stock_tick = tick_data[self.stock_code] total_volume = stock_tick['volume'] total_amount = stock_tick['amount'] current_price = stock_tick['lastPrice'] self.vwap = total_amount / (total_volume * 100) if total_volume > 0 else current_price return { 'price': current_price, 'vwap': self.vwap, 'volume': total_volume, 'amount': total_amount, 'time': datetime.now().strftime("%H:%M:%S") } return None def generate_signal(self, data): """生成交易信号""" current_price = data['price'] vwap = data['vwap'] deviation = (current_price - vwap) / vwap * 100 buy_condition1 = (current_price > vwap and (vwap - current_price) / vwap <= 0.005) buy_condition2 = deviation < -5 sell_condition1 = (self.position > 0 and current_price < vwap) sell_condition2 = (self.position > 0 and deviation > 5) sell_condition3 = (self.position > 0 and self.position_entry_time and (datetime.now() - self.position_entry_time).days >= 3) if buy_condition1: return 'BUY', f"回落未破均线({deviation:.2f}%)" elif buy_condition2: return 'BUY', f"均线下偏离({deviation:.2f}%)" elif sell_condition1: return 'SELL', f"跌破均线止损({deviation:.2f}%)" elif sell_condition2: return 'SELL', f"偏离止盈({deviation:.2f}%)" elif sell_condition3: return 'SELL', "超时持仓(≥3天)" return 'HOLD', "" def execute_trade(self, xt_trader, account, signal, reason, current_price): """执行交易指令""" if self.last_trade_time and (datetime.now() - self.last_trade_time).seconds 30: return False volume = math.floor(self.capital_per_order / current_price / 100) * 100 if signal == 'BUY' and volume > 0: order_id = xt_trader.order_stock( account, self.stock_code, xtconstant.STOCK_BUY, volume, xtconstant.LATEST_PRICE, 0, f"策略买入:{reason}" ) if order_id: self.position += volume self.position_entry_price = current_price self.position_entry_time = datetime.now() print(f"[{datetime.now().strftime('%H:%M:%S')}] 买入信号触发: {reason}") return True elif signal == 'SELL' and self.position > 0: sell_volume = min(volume, self.position) order_id = xt_trader.order_stock( account, self.stock_code, xtconstant.STOCK_SELL, sell_volume, xtconstant.LATEST_PRICE, 0, f"策略卖出:{reason}" ) if order_id: self.position = max(0, self.position - sell_volume) if self.position == 0: self.position_entry_price = 0 self.position_entry_time = None print(f"[{datetime.now().strftime('%H:%M:%S')}] 卖出信号触发: {reason}")
return True self.last_trade_time = datetime.now() return Falseclass MyXtQuantTraderCallback(XtQuantTraderCallback): """交易回调处理类""" def on_disconnected(self): print(f"[{datetime.now().strftime('%H:%M:%S')}] 连接断开") def on_stock_order(self, order): print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托更新: {order.stock_code} " f"状态={self._get_order_status(order.order_status)}") def on_stock_trade(self, trade): print(f"[{datetime.now().strftime('%H:%M:%S')}] 成交: {trade.stock_code} " f"{trade.traded_volume}股@{trade.traded_price:.2f}") def on_order_error(self, order_error): print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托错误: {order_error.error_msg}") @staticmethod def _get_order_status(status): """转换订单状态码为可读文本""" status_map = { 48: '未报', 49: '待报', 50: '已报', 51: '已报待撤', 52: '部分待撤', 53: '部撤', 54: '已撤', 55: '部成', 56: '已成', 57: '废单' } return status_map.get(status, '未知状态')def is_trading_time(): """判断是否在交易时段""" now = datetime.now() return (dt_time(9, 30) <= now.time() <= dt_time(15, 0)) and now.weekday() 5def main(): QMT_PATH = r"D:/国金QMT交易端模拟/userdata_mini" ACCOUNT_ID = "XXXXXXX" STOCK_CODE = "XXXXXX.SZ" CAPITAL_PER_ORDER = 20000 session_id = int(time.time()) xt_trader = XtQuantTrader(QMT_PATH, session_id) account = StockAccount(ACCOUNT_ID, 'STOCK') callback = MyXtQuantTraderCallback() xt_trader.register_callback(callback) xt_trader.start() connect_result = xt_trader.connect() if connect_result != 0: print(f"交易连接失败,错误代码: {connect_result}
") return subscribe_result = xt_trader.subscribe(account) if subscribe_result != 0: print(f"账户订阅失败,错误代码: {subscribe_result}") return print("=" * 50) print(f"启动分时均线策略监控: {STOCK_CODE}") print(f"交易账号: {ACCOUNT_ID} | 单笔金额: {CAPITAL_PER_ORDER}元") print("=" * 50) strategy = VWAPStrategy(STOCK_CODE, CAPITAL_PER_ORDER) try: while True: if not is_trading_time(): print(f"\r[{datetime.now().strftime('%H:%M:%S')}] 非交易时段,等待开盘...", end="") time.sleep(30) continue data = strategy.fetch_stock_data() if not data: time.sleep(3) continue signal, reason = strategy.generate_signal(data) status_line = (f"[{data['time']}] 价格:{data['price']:.2f} | " f"均线:{data['vwap']:.2f} | " f"偏离:{((data['price'] - data['vwap']) / data['vwap'] * 100):.2f}% | " f"持仓:{strategy.position}股") if signal != 'HOLD': print(f"{status_line} → 信号: {signal}({reason})") strategy.execute_trade(xt_trader, account, signal, reason, data['price']) else: print(status_line) time.sleep(3) except KeyboardInterrupt: print("\n策略终止") finally: xt_trader.stop()if __name__ == "__main__": main()
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。如果对每日复盘感兴趣,欢迎关注我另外一个号。重在市场理解(大盘情绪、 热门概念板块、 热门个股), 技术只是辅助。