我正在使用amqp-rpc(或多或少,正如rabbitmq文档中所描述的那样)根据中央身份验证服务对我的服务进行身份验证
我遇到的问题是,每个身份验证请求都创建自己的独占队列。所以,如果有几百个请求进入,我会突然有几百个队列把rabbitmq管理接口弄乱。因此我认为,由于每个wsgi工作进程一次只处理一个请求,所以我可以使用pid和线程标识生成队列名称,因此每个工作进程/线程只有一个队列。我拥有的django/drf应用程序的身份验证代码如下
class AMQPAuthentication(authentication.BaseAuthentication):
channel = None
connection = None
def __init__(self):
if not AMQPAuthentication.connection or not AMQPAuthentication.connection.is_open:
if self.connection is not None:
logger.error('AMQP connection closed. Reconnecting.')
self._reconnect()
self.connection = AMQPAuthentication.connection
self.channel = self.connection.channel()
queue_name = "WSGI-PID-{}-thread-{}".format(os.getpid(), threading.get_ident())
try:
self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
self.callback = self.queue.method.queue
self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)
except pika.exceptions.ConnectionClosed:
self._reconnect()
self.connection = AMQPAuthentication.connection
self.channel = self.connection.channel()
self.queue = self.channel.queue_declare(queue=queue_name, exclusive=True, auto_delete=True, durable=False)
self.callback = self.queue.method.queue
self.channel.basic_consume(self._on_response, no_ack=True, queue=self.callback)
def _reconnect(self):
creds = pika.credentials.PlainCredentials(username=settings.AMQP_USER, password=settings.AMQP_PASS)
AMQPAuthentication.connection = pika.BlockingConnection(pika.ConnectionParameters(host=settings.AMQP_HOST, credentials=creds))
AMQPAuthentication.channel = AMQPAuthentication.connection.channel()
logger.info('connected to AMQP broker')
def _on_response(self, ch, method, props, body):
if self.msg_id == props.correlation_id:
self.response = body
self.channel.close()
def authenticate(self, request):
if request.META.get('HTTP_AUTHORIZATION'):
header = request.META.get('HTTP_AUTHORIZATION')
self.response = None
self.msg_id = str(uuid.uuid4())
props = pika.BasicProperties(reply_to=self.callback, correlation_id=self.msg_id)
self.channel.basic_publish(exchange='', routing_key=settings.AUTH_QUEUE_NAME, properties=props, body=str(header))
def authentication_timeout():
raise exceptions.AuthenticationFailed("Authentication timed out.")
# add timeout for response from the auth service hard-code 15 second timeout for now
tid = self.connection.add_timeout(15, authentication_timeout)
while self.response is None:
self.connection.process_data_events()
# once we have a response, cancel the timeout
self.connection.remove_timeout(tid)
try:
data = json.loads(self.response)
if 'id' in data:
user = User.objects.get(pk=int(data['id']))
else:
raise exceptions.AuthenticationFailed(json.dumps({"error": "Authentication failed"}))
except Exception as e:
raise exceptions.AuthenticationFailed(e)
return (user, api_key)
但由于某种原因,每当我这样做时,对同一个worker的第一个请求就可以进行身份验证,但随后的每个请求都会超时。我试图使队列非广告和非独占,我尝试添加消息确认,上面的代码版本在每次身份验证完成时关闭通道,但仍然不起作用。我做错什么了?似乎我应该能够一遍又一遍地绑定到同一个队列并从中获取身份验证消息,但是出于某种原因,只有第一个请求获得回调消息。