Py学习  »  MQ

使用Flask Server时RabbitMQ崩溃的PIKA

Miguel • 5 年前 • 1497 次点击  

所以我们有一个运行的单线程flask服务器,在那里我们接收来自Python应用程序客户机的请求。在这个flask服务器中,我们使用rabbitmq和pika库将消息分发给其他客户机。 发生的情况是,在get函数中,程序因错误而崩溃:

pika.exceptions.connectionClosed:(505,'意外的帧-应为' 类60的内容头,改为获取非内容头框架')

我在堆栈溢出和其他方面搜索了很多关于这个的主题,但它们都解决了多线程的问题,而事实并非如此。烧瓶只能与一个线程一起使用,除非在app.run中调用它(threaded=yes)。

当短时间内发送多条消息(例如每秒5条消息)时,程序通常会崩溃,同时需要注意的是,每秒钟都会收到一条消息,并请求此功能:

@app.route('/api/users/getMessages', methods=['POST'])  
def get_Messages():  
    data = json.loads(request.data)
    token = data['token']

    payload = jwt.decode(token, 'SECRET', algorithms=['HS256'])
    istid = payload['istid']
    print('istid: '+istid)

    messages = []

    queue = channel.queue_declare(queue=istid)
    for i in range(queue.method.message_count):
        method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True)
        if method_frame:
            #print(method_frame, header_frame, body)
            messages.append(body)
        else:
            print('No message returned')

    res = {'messages':messages, 'error':0}
    return jsonify(res)

在该代码中,它通常在行中崩溃:

queue = channel.queue_declare(queue=istid)

但我们也尝试将代码更改为使用一段时间,而不是用于当主体为空且在行中崩溃时结束的位置:
method_frame, header_frame, body = channel.basic_get(queue=istid, no_ack=True) 那样的话。 同样重要的是,崩溃是随机的,它可以工作几次,然后在发送消息时在GET请求后随机崩溃。如果有人知道与此有关的任何事情,我们将感谢任何帮助。

另一个注意事项是,我们考虑将基本的\u消费与回调一起使用,而不是使用基本的\u获取,但是我们没有找到一种方法来实现这一点,因为我们必须将消息发送回来,并让几个用户向同一功能发出请求。

编辑第1页: 在rabbitmq文档中 rabbitmq 如果搜索“def basic_get”函数,您会注意到有一些todo注释,还引用了该注释。

由于实现的详细信息,无法再次调用 直到执行回调。

所以我怀疑这可能是发生的事情,但即使是这样,我也不知道如何解决。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/30889
 
1497 次点击  
文章 [ 1 ]  |  最新文章 5 年前
Miguel
Reply   •   1 楼
Miguel    5 年前

对于任何对该解决方案感兴趣的人,正如在其他注释中一样,该程序不是线程安全的,因为1.0版的flask使用threaded=true作为默认值。
解决方案是:
1)使用app.run运行烧瓶(螺纹=假)
2)通过在访问PIKA的通道/连接时实现锁,使程序线程安全。