社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

【Python技术】量化之miniqmt根据分时均线买入卖出示例

子晓聊技术 • 3 月前 • 173 次点击  
前2天写了一篇文章介绍获取分时均线的例子, 星球群在讨论如何根据短线分时均线做决策。
对于我个人做短线交易来说, 分时均线买入卖出大概分为几种情况。我自己写了一个例子,并不一定对
1、买入条件1:价格回落未破分时均线 (当前价>VWAP且回落≤0.5%)
2、买入条件2:分时均线下方偏离超过X%
3、卖出条件1:跌破分时均线止损
4、卖出条件2:分时价格超过均线X%止盈
5、卖出条件3:超短线持仓超过3天

提供的代码只是示例,存在一些瑕疵。比如持仓如果已有个股或已提交委托,正常不应该继续买入。    
改造思路,你可以通过通达信自选股或预警得到对应的股票池,然后对对应的股票池进行监控。
写的内容偏短线,对于中长线价值投资没啥帮助。 其实买入卖出更精准的方式还是通过L2 接口数据来做分析。

相关文章推荐:
【Python技术】miniqmt获取分时均线简单例子
量化交易之miniqmt入门指南新手篇
利用问财、miniqmt打造一款属于自己的量化交易系统
【Python技术】miniqmt交易常用操作封装类
[Python技术] 量化之miniqmt下载A股数据到本地

写个段子乐呵乐呵。

09:31:00 金叉触发!梭哈买入!  
09:32:15 死叉降临!割肉跑路!  
09:33:30 触发金叉!梭哈买入!  
09:34:45 死叉降临!割肉跑路!  
...(循环10次后)...  
14:55:00 策略崩溃:Error 429 - 您的操作过于频繁,已被券商服务器拉黑



结局

  • 账户余额:-50%(手续费贡献冠军)
  • 券商客户经理来电:"兄弟,你比高频策略还猛,来我们机房当服务器压力测试员吧!"
  • 小明望着分时均线感慨:"原来这线是韭菜收割机自带的进度条..."


这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。
import timeimport mathimport threadingimport pandas as pdfrom datetime import datetime, time as dt_timefrom xtquant import xtdata, xtconstantfrom xtquant.xttype import StockAccountfrom xtquant.xttrader import XtQuantTrader, XtQuantTraderCallbackclass VWAPStrategy:    def __init__(self, stock_code, capital_per_order=10000):        self.stock_code = stock_code        self.capital_per_order = capital_per_order        self.position = 0        self.vwap = 0        self.last_trade_time = None        self.position_entry_price = 0  # 持仓成本价        self.position_entry_time = None  # 持仓时间    def fetch_stock_data(self):        """获取实时数据并计算分时均线(VWAP)"""        tick_data = xtdata.get_full_tick([self.stock_code])        if self.stock_code in tick_data:            stock_tick = tick_data[self.stock_code]            total_volume = stock_tick['volume']  # 总成交量(手)            total_amount = stock_tick['amount']  # 总成交金额(元)            current_price = stock_tick['lastPrice']  # 最新价            # 计算分时均线(VWAP)            self.vwap = total_amount / (total_volume * 100if total_volume > 0 else current_price            return {                'price': current_price,                'vwap': self.vwap,                'volume': total_volume,                'amount': total_amount,                'time': datetime.now().strftime("%H:%M:%S")            }        return None    def generate_signal(self, data):        """生成交易信号"""        current_price = data['price']        vwap = data['vwap']        deviation = (current_price - vwap) / vwap * 100  # 计算偏离度        # 买入条件1:价格回落未破分时均线 (当前价>VWAP且回落≤0.5%)        buy_condition1 = (current_price > vwap and                          (vwap - current_price) / vwap <= 0.005)        # 买入条件2:分时均线下方偏离超过5%        buy_condition2 = deviation < -5        # 卖出条件1:跌破分时均线止损        sell_condition1 = (self.position > 0 and                           current_price < vwap)        # 卖出条件2:分时价格超过均线5%止盈        sell_condition2 = (self.position > 0 and                           deviation > 5)        


    
# 卖出条件3:超短线持仓超过3天        sell_condition3 = (self.position > 0 and                           self.position_entry_time and                           (datetime.now() - self.position_entry_time).days >= 3)        if buy_condition1:            return 'BUY'f"回落未破均线({deviation:.2f}%)"        elif buy_condition2:            return 'BUY'f"均线下偏离({deviation:.2f}%)"        elif sell_condition1:            return 'SELL'f"跌破均线止损({deviation:.2f}%)"        elif sell_condition2:            return 'SELL'f"偏离止盈({deviation:.2f}%)"        elif sell_condition3:            return 'SELL'"超时持仓(≥3天)"        return 'HOLD'""    def execute_trade(self, xt_trader, account, signal, reason, current_price):        """执行交易指令"""        # 30秒内禁止重复交易        if self.last_trade_time and (datetime.now() - self.last_trade_time).seconds 30:            return False        # 计算交易数量 (按固定资金计算)        volume = math.floor(self.capital_per_order / current_price / 100) * 100  # 整手交易        if signal == 'BUY' and volume > 0:            # 调用MiniQMT买入接口            order_id = xt_trader.order_stock(                account, self.stock_code,                xtconstant.STOCK_BUY, volume,                xtconstant.LATEST_PRICE, 0,                f"策略买入:{reason}"            )            if order_id:                self.position += volume                self.position_entry_price = current_price                self.position_entry_time = datetime.now()                print(f"[{datetime.now().strftime('%H:%M:%S')}] 买入信号触发: {reason}")                return True        elif signal == 'SELL' and self.position > 0:            sell_volume = min(volume, self.position)            # 调用MiniQMT卖出接口            order_id = xt_trader.order_stock(                account, self.stock_code,                xtconstant.STOCK_SELL, sell_volume,                xtconstant.LATEST_PRICE, 0,                f"策略卖出:{reason}"            )            if order_id:                self.position = max(0, self.position - sell_volume)                if self.position == 0:                    self.position_entry_price = 0                    self.position_entry_time = None                print(f"[{datetime.now().strftime('%H:%M:%S')}] 卖出信号触发: {reason}")


    
                return True        self.last_trade_time = datetime.now()        return Falseclass MyXtQuantTraderCallback(XtQuantTraderCallback):    """交易回调处理类"""    def on_disconnected(self):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 连接断开")    def on_stock_order(self, order):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托更新: {order.stock_code} "              f"状态={self._get_order_status(order.order_status)}")    def on_stock_trade(self, trade):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 成交: {trade.stock_code} "              f"{trade.traded_volume}股@{trade.traded_price:.2f}")    def on_order_error(self, order_error):        print(f"[{datetime.now().strftime('%H:%M:%S')}] 委托错误: {order_error.error_msg}")    @staticmethod    def _get_order_status(status):        """转换订单状态码为可读文本"""        status_map = {            48'未报'49'待报'50'已报',            51'已报待撤'52'部分待撤'53'部撤',            54'已撤'55'部成'56'已成'57'废单'        }        return status_map.get(status, '未知状态')def is_trading_time():    """判断是否在交易时段"""    now = datetime.now()    return (dt_time(930) <= now.time() <= dt_time(150)) and now.weekday() 5def main():    # ========== 配置参数 ==========    QMT_PATH = r"D:/国金QMT交易端模拟/userdata_mini"  # 替换为你的MiniQMT路径    ACCOUNT_ID = "XXXXXXX"  # 替换为你的资金账号    STOCK_CODE = "XXXXXX.SZ"  # 监控的股票代码    CAPITAL_PER_ORDER = 20000  # 单笔交易金额(元)    # ========== 初始化交易接口 ==========    session_id = int(time.time())  # 唯一会话ID    xt_trader = XtQuantTrader(QMT_PATH, session_id)    account = StockAccount(ACCOUNT_ID, 'STOCK')    # 创建并注册回调    callback = MyXtQuantTraderCallback()    xt_trader.register_callback(callback)    # 启动交易线程    xt_trader.start()    # 建立交易连接    connect_result = xt_trader.connect()    if connect_result != 0:        print(f"交易连接失败,错误代码: {connect_result}


    
")        return    # 订阅账户    subscribe_result = xt_trader.subscribe(account)    if subscribe_result != 0:        print(f"账户订阅失败,错误代码: {subscribe_result}")        return    print("=" * 50)    print(f"启动分时均线策略监控: {STOCK_CODE}")    print(f"交易账号: {ACCOUNT_ID} | 单笔金额: {CAPITAL_PER_ORDER}元")    print("=" * 50)    # ========== 初始化策略 ==========    strategy = VWAPStrategy(STOCK_CODE, CAPITAL_PER_ORDER)    # ========== 主循环 ==========    try:        while True:            if not is_trading_time():                # 非交易时段每30秒检查一次                print(f"\r[{datetime.now().strftime('%H:%M:%S')}] 非交易时段,等待开盘...", end="")                time.sleep(30)                continue            # 获取实时数据            data = strategy.fetch_stock_data()            if not data:                time.sleep(3)                continue            # 生成交易信号            signal, reason = strategy.generate_signal(data)            # 输出监控信息            status_line = (f"[{data['time']}] 价格:{data['price']:.2f} | "                           f"均线:{data['vwap']:.2f} | "                           f"偏离:{((data['price'] - data['vwap']) / data['vwap'] * 100):.2f}% | "                           f"持仓:{strategy.position}股")            if signal != 'HOLD':                print(f"{status_line} → 信号: {signal}({reason})")                strategy.execute_trade(xt_trader, account, signal, reason, data['price'])            else:                print(status_line)            time.sleep(3)  # 3秒刷新    except KeyboardInterrupt:        print("\n策略终止")    finally:        xt_trader.stop()if __name__ == "__main__":    main()

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

如果对每日复盘感兴趣,欢迎关注我另外一个号。重在市场理解(大盘情绪、 热门概念板块、 热门个股), 技术只是辅助。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/185118