前2天写了一篇文章介绍获取分时均线的例子, 星球群在讨论如何根据短线分时均线做决策。对于我个人做短线交易来说, 分时均线买入卖出大概分为几种情况。我自己写了一个例子,并不一定对提供的代码只是示例,存在一些瑕疵。比如持仓如果已有个股或已提交委托,正常不应该继续买入。 改造思路,你可以通过通达信自选股或预警得到对应的股票池,然后对对应的股票池进行监控。写的内容偏短线,对于中长线价值投资没啥帮助。 其实买入卖出更精准的方式还是通过L2 接口数据来做分析。09:31:00 金叉触发!梭哈买入!
09:32:15 死叉降临!割肉跑路!
09:33:30 触发金叉!梭哈买入!
09:34:45 死叉降临!割肉跑路!
...(循环10次后)...
14:55:00 策略崩溃:Error 429 - 您的操作过于频繁,已被券商服务器拉黑
结局:
- 券商客户经理来电:"兄弟,你比高频策略还猛,来我们机房当服务器压力测试员吧!"
- 小明望着分时均线感慨:"原来这线是韭菜收割机自带的进度条..."
这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:
如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。import time
import math
import threading
import pandas as pd
from datetime import datetime, time as dt_time
from xtquant import xtdata, xtconstant
from xtquant.xttype import StockAccount
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
class VWAPStrategy:
def __init__(self, stock_code, capital_per_order=10000):
self.stock_code = stock_code
self.capital_per_order = capital_per_order
self.position = 0
self.vwap = 0
self.last_trade_time = None
self.position_entry_price = 0
self.position_entry_time = None
def fetch_stock_data(self):
"""获取实时数据并计算分时均线(VWAP)"""
tick_data = xtdata.get_full_tick([self.stock_code])
if self.stock_code in tick_data:
stock_tick = tick_data[self.stock_code]
total_volume = stock_tick['volume']
total_amount = stock_tick['amount']
current_price = stock_tick['lastPrice']
self.vwap = total_amount / (total_volume * 100) if total_volume > 0 else current_price
return {
'price': current_price,
'vwap': self.vwap,
'volume': total_volume,
'amount': total_amount,
'time': datetime.now().strftime("%H:%M:%S")
}
return None
def generate_signal(self, data):
"""生成交易信号"""
current_price = data['price']
vwap = data['vwap']
deviation = (current_price - vwap) / vwap * 100
buy_condition1 = (current_price > vwap and
(vwap - current_price) / vwap <= 0.005)
buy_condition2 = deviation < -5
sell_condition1 = (self.position > 0 and
current_price < vwap)
sell_condition2 = (self.position > 0 and
deviation > 5)
sell_condition3 = (self.position > 0 and
self.position_entry_time and
(datetime.now() - self.position_entry_time).days >= 3)
if buy_condition1:
return 'BUY', f"回落未破均线({deviation:.2f}%)"
elif buy_condition2:
return 'BUY', f"均线下偏离({deviation:.2f}%)"
elif sell_condition1:
return 'SELL', f"跌破均线止损({deviation:.2f}%)"
elif sell_condition2:
return 'SELL', f"偏离止盈({deviation:.2f}%)"
elif sell_condition3:
return 'SELL', "超时持仓(≥3天)"
return 'HOLD', ""
def execute_trade(self, xt_trader, account, signal, reason, current_price):
"""执行交易指令"""
if self.last_trade_time and (datetime.now() - self.last_trade_time).seconds 30:
return False
volume = math.floor(self.capital_per_order / current_price / 100) * 100
if signal == 'BUY' and volume > 0:
order_id = xt_trader.order_stock(
account, self.stock_code,
xtconstant.STOCK_BUY, volume,
xtconstant.LATEST_PRICE, 0,
f"策略买入:{reason}"
)
if order_id:
self.position += volume
self.position_entry_price = current_price
self.position_entry_time = datetime.now()
print(f"[{datetime.now().strftime('%H:%M:%S')}] 买入信号触发: {reason}")
return True
elif signal == 'SELL' and self.position > 0:
sell_volume = min(volume, self.position)
order_id = xt_trader.order_stock(
account, self.stock_code,
xtconstant.STOCK_SELL, sell_volume,
xtconstant.LATEST_PRICE, 0,
f"策略卖出:{reason}"
)
if order_id:
self.position = max(0, self.position - sell_volume)
if self.position == 0:
self.position_entry_price = 0
self.position_entry_time = None
print(f"[{datetime.now().strftime('%H:%M:%S')}] 卖出信号触发: {reason}")
return True
self.last_trade_time = datetime.now()
return False
class MyXtQuantTraderCallback(XtQuantTraderCallback):
"""交易回调处理类"""
def on_disconnected(self):
print(f"[{datetime.now().strftime('%H:%M:%S')}] 连接断开")
def on_stock_order(self, order):
print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托更新: {order.stock_code} "
f"状态={self._get_order_status(order.order_status)}")
def
on_stock_trade(self, trade):
print(f"[{datetime.now().strftime('%H:%M:%S')}] 成交: {trade.stock_code} "
f"{trade.traded_volume}股@{trade.traded_price:.2f}")
def on_order_error(self, order_error):
print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托错误: {order_error.error_msg}")
@staticmethod
def _get_order_status(status):
"""转换订单状态码为可读文本"""
status_map = {
48: '未报', 49: '待报', 50: '已报',
51: '已报待撤', 52: '部分待撤', 53: '部撤',
54: '已撤', 55: '部成', 56: '已成', 57: '废单'
}
return status_map.get(status, '未知状态')
def is_trading_time():
"""判断是否在交易时段"""
now = datetime.now()
return (dt_time(9, 30) <= now.time() <= dt_time(15, 0)) and now.weekday() 5
def main():
QMT_PATH = r"D:/国金QMT交易端模拟/userdata_mini"
ACCOUNT_ID = "XXXXXXX"
STOCK_CODE = "XXXXXX.SZ"
CAPITAL_PER_ORDER = 20000
session_id = int(time.time())
xt_trader = XtQuantTrader(QMT_PATH, session_id)
account = StockAccount(ACCOUNT_ID, 'STOCK')
callback = MyXtQuantTraderCallback()
xt_trader.register_callback(callback)
xt_trader.start()
connect_result = xt_trader.connect()
if connect_result != 0:
print(f"交易连接失败,错误代码: {connect_result}")
return
subscribe_result = xt_trader.subscribe(account)
if subscribe_result != 0:
print(f"账户订阅失败,错误代码: {subscribe_result}")
return
print("=" * 50)
print(f"启动分时均线策略监控: {STOCK_CODE}")
print(f"交易账号: {ACCOUNT_ID} | 单笔金额: {CAPITAL_PER_ORDER}元")
print("=" * 50)
strategy = VWAPStrategy(STOCK_CODE, CAPITAL_PER_ORDER)
try:
while True:
if not is_trading_time():
print(f"\r[{datetime.now().strftime(
'%H:%M:%S')}] 非交易时段,等待开盘...", end="")
time.sleep(30)
continue
data = strategy.fetch_stock_data()
if not data:
time.sleep(3)
continue
signal, reason = strategy.generate_signal(data)
status_line = (f"[{data['time']}] 价格:{data['price']:.2f} | "
f"均线:{data['vwap']:.2f} | "
f"偏离:{((data['price'] - data['vwap']) / data['vwap'] * 100):.2f}% | "
f"持仓:{strategy.position}股")
if signal != 'HOLD':
print(f"{status_line} → 信号: {signal}({reason})")
strategy.execute_trade(xt_trader, account, signal, reason, data['price'])
else:
print(status_line)
time.sleep(3)
except KeyboardInterrupt:
print("\n策略终止")
finally:
xt_trader.stop()
if __name__ == "__main__":
main()
如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。如果对每日复盘感兴趣,欢迎关注我另外一个号。重在市场理解(大盘情绪、 热门概念板块、 热门个股), 技术只是辅助。