社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  MQ

python中的rabbitmq和rpc

Mad Wombat • 4 年前 • 719 次点击  

我正在使用amqp-rpc(或多或少,正如rabbitmq文档中所描述的那样)根据中央身份验证服务对我的服务进行身份验证

我遇到的问题是,每个身份验证请求都创建自己的独占队列。所以,如果有几百个请求进入,我会突然有几百个队列把rabbitmq管理接口弄乱。因此我认为,由于每个wsgi工作进程一次只处理一个请求,所以我可以使用pid和线程标识生成队列名称,因此每个工作进程/线程只有一个队列。我拥有的django/drf应用程序的身份验证代码如下

class AMQPAuthentication(authentication.BaseAuthentication):
    channel = None
    connection = None

    def __init__(self):
        if not AMQPAuthentication.connection or not AMQPAuthentication.connection.is_open:
            if self.connection is not None:
                logger.error('AMQP connection closed. Reconnecting.')
            self._reconnect()
        self.connection = AMQPAuthentication.connection
        self.channel = self.connection.channel()
        queue_name = "WSGI-PID-{}-thread-{}".format(os.getpid(), threading.get_ident())
        try:
            self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
            self.callback = self.queue.method.queue
            self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)
        except pika.exceptions.ConnectionClosed:
            self._reconnect()
            self.connection = AMQPAuthentication.connection
            self.channel = self.connection.channel()
            self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
            self.callback = self.queue.method.queue
            self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)

    def _reconnect(self):
        creds = pika.credentials.PlainCredentials(username=settings.AMQP_USER, password=settings.AMQP_PASS)
        AMQPAuthentication.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.AMQP_HOST, credentials=creds))
        AMQPAuthentication.channel = AMQPAuthentication.connection.channel()
        logger.info('connected to AMQP broker')

    def _on_response(self, ch, method, props, body):
        if self.msg_id == props.correlation_id:
            self.response = body
            self.channel.close()

    def authenticate(self, request):
        if request.META.get('HTTP_AUTHORIZATION'):
            header = request.META.get('HTTP_AUTHORIZATION')
            self.response = None
            self.msg_id = str(uuid.uuid4())
            props = pika.BasicProperties(reply_to=self.callback, correlation_id=self.msg_id)
            self.channel.basic_publish(exchange='', routing_key=settings.AUTH_QUEUE_NAME, properties=props, body=str(header))

            def authentication_timeout():
                raise exceptions.AuthenticationFailed("Authentication timed out.")
            # add timeout for response from the auth service hard-code 15 second timeout for now
            tid = self.connection.add_timeout(15, authentication_timeout)
            while self.response is None:
                self.connection.process_data_events()
            # once we have a response, cancel the timeout
            self.connection.remove_timeout(tid)
            try:
                data = json.loads(self.response)
                if 'id' in data:
                    user = User.objects.get(pk=int(data['id']))
                else:
                    raise exceptions.AuthenticationFailed(json.dumps({"error": "Authentication failed"}))
            except Exception as e:
                raise exceptions.AuthenticationFailed(e)
            return (user, api_key)

但由于某种原因,每当我这样做时,对同一个worker的第一个请求就可以进行身份验证,但随后的每个请求都会超时。我试图使队列非广告和非独占,我尝试添加消息确认,上面的代码版本在每次身份验证完成时关闭通道,但仍然不起作用。我做错什么了?似乎我应该能够一遍又一遍地绑定到同一个队列并从中获取身份验证消息,但是出于某种原因,只有第一个请求获得回调消息。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/47898
 
719 次点击