Py学习  »  Python

【Python技术】量化之miniqmt根据分时均线买入卖出示例

子晓聊技术 • 4 周前 • 101 次点击  
前2天写了一篇文章介绍获取分时均线的例子, 星球群在讨论如何根据短线分时均线做决策。
对于我个人做短线交易来说, 分时均线买入卖出大概分为几种情况。我自己写了一个例子,并不一定对
1、买入条件1:价格回落未破分时均线 (当前价>VWAP且回落≤0.5%)
2、买入条件2:分时均线下方偏离超过X%
3、卖出条件1:跌破分时均线止损
4、卖出条件2:分时价格超过均线X%止盈
5、卖出条件3:超短线持仓超过3天

提供的代码只是示例,存在一些瑕疵。比如持仓如果已有个股或已提交委托,正常不应该继续买入。    
改造思路,你可以通过通达信自选股或预警得到对应的股票池,然后对对应的股票池进行监控。
写的内容偏短线,对于中长线价值投资没啥帮助。 其实买入卖出更精准的方式还是通过L2 接口数据来做分析。

相关文章推荐:
【Python技术】miniqmt获取分时均线简单例子
量化交易之miniqmt入门指南新手篇
利用问财、miniqmt打造一款属于自己的量化交易系统
【Python技术】miniqmt交易常用操作封装类
[Python技术] 量化之miniqmt下载A股数据到本地

写个段子乐呵乐呵。

09:31:00 金叉触发!梭哈买入!  
09:32:15 死叉降临!割肉跑路!  
09:33:30 触发金叉!梭哈买入!  
09:34:45 死叉降临!割肉跑路!  
...(循环10次后)...  
14:55:00 策略崩溃:Error 429 - 您的操作过于频繁,已被券商服务器拉黑



结局

  • 账户余额:-50%(手续费贡献冠军)
  • 券商客户经理来电:"兄弟,你比高频策略还猛,来我们机房当服务器压力测试员吧!"
  • 小明望着分时均线感慨:"原来这线是韭菜收割机自带的进度条..."


这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注: 如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。
import timeimport mathimport threadingimport pandas as pdfrom datetime import datetime, time as dt_timefrom xtquant import xtdata, xtconstantfrom xtquant.xttype import StockAccountfrom xtquant.xttrader import XtQuantTrader, XtQuantTraderCallbackclass VWAPStrategy:    def __init__(self, stock_code, capital_per_order=10000):        self.stock_code = stock_code        self.capital_per_order = capital_per_order        self.position = 0        self.vwap = 0        self.last_trade_time = None        self.position_entry_price = 0  # 持仓成本价        self.position_entry_time = None  # 持仓时间    def fetch_stock_data(self):        """获取实时数据并计算分时均线(VWAP)"""        tick_data = xtdata.get_full_tick([self.stock_code])        if self.stock_code in tick_data:            stock_tick = tick_data[self.stock_code]            total_volume = stock_tick['volume']  # 总成交量(手)            total_amount = stock_tick['amount']  # 总成交金额(元)            current_price = stock_tick['lastPrice']  # 最新价            # 计算分时均线(VWAP)            self.vwap = total_amount / (total_volume * 100if total_volume > 0 else current_price            return {                'price': current_price,                'vwap': self.vwap,                'volume': total_volume,                'amount': total_amount,                'time': datetime.now().strftime("%H:%M:%S")            }        return None    def generate_signal(self, data):        """生成交易信号"""        current_price = data['price']        vwap = data['vwap']        deviation = (current_price - vwap) / vwap * 100  # 计算偏离度        # 买入条件1:价格回落未破分时均线 (当前价>VWAP且回落≤0.5%)        buy_condition1 = (current_price > vwap and                          (vwap - current_price) / vwap <= 0.005)        # 买入条件2:分时均线下方偏离超过5%        buy_condition2 = deviation < -5        # 卖出条件1:跌破分时均线止损        sell_condition1 = (self.position > 0 and                           current_price < vwap)        # 卖出条件2:分时价格超过均线5%止盈        sell_condition2 = (self.position > 0 and                           deviation > 5)        # 卖出条件3:超短线持仓超过3天        sell_condition3 = (self.position > 0 and                           self.position_entry_time and                           (datetime.now() - self.position_entry_time).days >= 3)        if buy_condition1:            return 'BUY'f"回落未破均线({deviation:.2f}%)"        elif buy_condition2:            return 'BUY'f"均线下偏离({deviation:.2f}%)"        elif sell_condition1:


    
            return 'SELL'f"跌破均线止损({deviation:.2f}%)"        elif sell_condition2:            return 'SELL'f"偏离止盈({deviation:.2f}%)"        elif sell_condition3:            return 'SELL'"超时持仓(≥3天)"        return 'HOLD'""    def execute_trade(self, xt_trader, account, signal, reason, current_price):        """执行交易指令"""        # 30秒内禁止重复交易        if self.last_trade_time and (datetime.now() - self.last_trade_time).seconds 30:            return False        # 计算交易数量 (按固定资金计算)        volume = math.floor(self.capital_per_order / current_price / 100) * 100  # 整手交易        if signal == 'BUY' and volume > 0:            # 调用MiniQMT买入接口            order_id = xt_trader.order_stock(                account, self.stock_code,                xtconstant.STOCK_BUY, volume,                xtconstant.LATEST_PRICE, 0,                f"策略买入:{reason}"            )            if order_id:                self.position += volume                self.position_entry_price = current_price                self.position_entry_time = datetime.now()                print(f"[{datetime.now().strftime('%H:%M:%S')}] 买入信号触发: {reason}")                return True        elif signal == 'SELL' and self.position > 0:            sell_volume = min(volume, self.position)            # 调用MiniQMT卖出接口            order_id = xt_trader.order_stock(                account, self.stock_code,                xtconstant.STOCK_SELL, sell_volume,                xtconstant.LATEST_PRICE, 0,                f"策略卖出:{reason}"            )            if order_id:                self.position = max(0, self.position - sell_volume)                if self.position == 0:                    self.position_entry_price = 0                    self.position_entry_time = None                print(f"[{datetime.now().strftime('%H:%M:%S')}] 卖出信号触发: {reason}")                return True        self.last_trade_time = datetime.now()        return Falseclass MyXtQuantTraderCallback(XtQuantTraderCallback):    """交易回调处理类"""    def on_disconnected(self):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 连接断开")    def on_stock_order(self, order):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托更新: {order.stock_code} "              f"状态={self._get_order_status(order.order_status)}")    def


    
 on_stock_trade(self, trade):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 成交: {trade.stock_code} "              f"{trade.traded_volume}股@{trade.traded_price:.2f}")    def on_order_error(self, order_error):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托错误: {order_error.error_msg}")    @staticmethod    def _get_order_status(status):        """转换订单状态码为可读文本"""        status_map = {            48'未报'49'待报'50'已报',            51'已报待撤'52'部分待撤'53'部撤',            54'已撤'55'部成'56'已成'57'废单'        }        return status_map.get(status, '未知状态')def is_trading_time():    """判断是否在交易时段"""    now = datetime.now()    return (dt_time(930) <= now.time() <= dt_time(150)) and now.weekday() 5def main():    # ========== 配置参数 ==========    QMT_PATH = r"D:/国金QMT交易端模拟/userdata_mini"  # 替换为你的MiniQMT路径    ACCOUNT_ID = "XXXXXXX"  # 替换为你的资金账号    STOCK_CODE = "XXXXXX.SZ"  # 监控的股票代码    CAPITAL_PER_ORDER = 20000  # 单笔交易金额(元)    # ========== 初始化交易接口 ==========    session_id = int(time.time())  # 唯一会话ID    xt_trader = XtQuantTrader(QMT_PATH, session_id)    account = StockAccount(ACCOUNT_ID, 'STOCK')    # 创建并注册回调    callback = MyXtQuantTraderCallback()    xt_trader.register_callback(callback)    # 启动交易线程    xt_trader.start()    # 建立交易连接    connect_result = xt_trader.connect()    if connect_result != 0:        print(f"交易连接失败,错误代码: {connect_result}")        return    # 订阅账户    subscribe_result = xt_trader.subscribe(account)    if subscribe_result != 0:        print(f"账户订阅失败,错误代码: {subscribe_result}")        return    print("=" * 50)    print(f"启动分时均线策略监控: {STOCK_CODE}")    print(f"交易账号: {ACCOUNT_ID} | 单笔金额: {CAPITAL_PER_ORDER}元")    print("=" * 50)    # ========== 初始化策略 ==========    strategy = VWAPStrategy(STOCK_CODE, CAPITAL_PER_ORDER)    # ========== 主循环 ==========    try:        while True:            if not is_trading_time():                # 非交易时段每30秒检查一次                print(f"\r[{datetime.now().strftime(


    
'%H:%M:%S')}] 非交易时段,等待开盘...", end="")                time.sleep(30)                continue            # 获取实时数据            data = strategy.fetch_stock_data()            if not data:                time.sleep(3)                continue            # 生成交易信号            signal, reason = strategy.generate_signal(data)            # 输出监控信息            status_line = (f"[{data['time']}] 价格:{data['price']:.2f} | "                           f"均线:{data['vwap']:.2f} | "                           f"偏离:{((data['price'] - data['vwap']) / data['vwap'] * 100):.2f}% | "                           f"持仓:{strategy.position}股")            if signal != 'HOLD':                print(f"{status_line} → 信号: {signal}({reason})")                strategy.execute_trade(xt_trader, account, signal, reason, data['price'])            else:                print(status_line)            time.sleep(3)  # 3秒刷新    except KeyboardInterrupt:        print("\n策略终止")    finally:        xt_trader.stop()if __name__ == "__main__":    main()

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

如果对每日复盘感兴趣,欢迎关注我另外一个号。重在市场理解(大盘情绪、 热门概念板块、 热门个股), 技术只是辅助。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/185118
 
101 次点击