Py学习  »  MQ

symfony messenger/rabbitmq中的使用者错误处理

thomaskonrad • 4 年前 • 662 次点击  

我用的是新的 Symfony Messenger Component 4.1和 RabbitMQ 3.6.10-1从我的symfony 4.1 Web应用程序排队并异步发送电子邮件和短信通知。我的Messenger配置( messenger.yaml )如下所示:

framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN_NOTIFICATIONS)%'

        routing:
            'App\NotificationBundle\Entity\NotificationQueueEntry': amqp

当要发送新通知时,我会这样排队:

use Symfony\Component\Messenger\MessageBusInterface;
// ...
$notificationQueueEntry = new NotificationQueueEntry();
// [Set notification details such as recipients, subject, and message]
$this->messageBus->dispatch($notificationQueueEntry);

然后我在命令行上这样启动消费者:

$ bin/console messenger:consume-messages

我已经实现了 SendNotificationHandler 实际交付的服务。服务配置:

App\NotificationBundle\MessageHandler\SendNotificationHandler:
    arguments:
        - '@App\NotificationBundle\Service\NotificationQueueService'
    tags: [ messenger.message_handler ]

课程:

class SendNotificationHandler
{
    public function __invoke(NotificationQueueEntry $entry): void
    {
        $this->notificationQueueService->sendNotification($entry);
    }
}

在这之前,所有的工作都很顺利,通知也会送达。

现在我的问题 :由于(临时)网络故障,可能无法发送电子邮件或短信息。在这种情况下,我希望系统在指定的时间段后重试传递,最多可重试指定的最大重试次数。 实现这一目标的方法是什么?

我读过 Dead Letter Exchanges 但是,我找不到任何关于如何将其与symfony messenger组件集成的文档或示例。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/38513
 
662 次点击  
文章 [ 1 ]  |  最新文章 4 年前
Nek
Reply   •   1 楼
Nek    5 年前

您需要做的是告诉rabbitmq,消息被拒绝而不是被确认。默认情况下,通讯器将在 AmqpReceiver . 如您所见,如果抛出实现 RejectMessageExceptionInterface 在处理程序内部,消息将自动被拒绝。

您还可以使用定制中间件“模拟”这种行为。我在一个小的演示应用程序中创建了类似的东西。该机制由一个中间件组成,该中间件将(序列化)原始消息包装在新的 RetryMessage 并通过自定义消息总线将其发送到另一个队列,用作死信交换。然后,该消息的处理程序将解包retrymessage(获取原始消息并对其进行反序列化),并通过默认总线进行传输:

见:

这是一个基本设置,它拒绝消息并允许您立即再次使用它!!)。您可能希望在延迟消耗以改进这一点时添加额外信息,例如时间戳的标题。为此,您应该考虑编写自己的接收器、中间件和/或处理程序。