社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  MQ

symfony messenger/rabbitmq中的使用者错误处理

thomaskonrad • 5 年前 • 815 次点击  

我用的是新的 Symfony Messenger Component 4.1和 RabbitMQ 3.6.10-1从我的symfony 4.1 Web应用程序排队并异步发送电子邮件和短信通知。我的Messenger配置( messenger.yaml )如下所示:

framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN_NOTIFICATIONS)%'

        routing:
            'App\NotificationBundle\Entity\NotificationQueueEntry': amqp

当要发送新通知时,我会这样排队:

use Symfony\Component\Messenger\MessageBusInterface;
// ...
$notificationQueueEntry = new NotificationQueueEntry();
// [Set notification details such as recipients, subject, and message]
$this->messageBus->dispatch($notificationQueueEntry);

然后我在命令行上这样启动消费者:

$ bin/console messenger:consume-messages

我已经实现了 SendNotificationHandler 实际交付的服务。服务配置:

App\NotificationBundle\MessageHandler\SendNotificationHandler:
    arguments:
        - '@App\NotificationBundle\Service\NotificationQueueService'
    tags: [ messenger.message_handler ]

课程:

class SendNotificationHandler
{
    public function __invoke(NotificationQueueEntry $entry): void
    {
        $this->notificationQueueService->sendNotification($entry);
    }
}

在这之前,所有的工作都很顺利,通知也会送达。

现在我的问题 :由于(临时)网络故障,可能无法发送电子邮件或短信息。在这种情况下,我希望系统在指定的时间段后重试传递,最多可重试指定的最大重试次数。 实现这一目标的方法是什么?

我读过 Dead Letter Exchanges 但是,我找不到任何关于如何将其与symfony messenger组件集成的文档或示例。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/38513
 
815 次点击  
文章 [ 1 ]  |  最新文章 5 年前
Nek
Reply   •   1 楼
Nek    6 年前

您需要做的是告诉rabbitmq,消息被拒绝而不是被确认。默认情况下,通讯器将在 AmqpReceiver . 如您所见,如果抛出实现 RejectMessageExceptionInterface 在处理程序内部,消息将自动被拒绝。

您还可以使用定制中间件“模拟”这种行为。我在一个小的演示应用程序中创建了类似的东西。该机制由一个中间件组成,该中间件将(序列化)原始消息包装在新的 RetryMessage 并通过自定义消息总线将其发送到另一个队列,用作死信交换。然后,该消息的处理程序将解包retrymessage(获取原始消息并对其进行反序列化),并通过默认总线进行传输:

见:

这是一个基本设置,它拒绝消息并允许您立即再次使用它!!)。您可能希望在延迟消耗以改进这一点时添加额外信息,例如时间戳的标题。为此,您应该考虑编写自己的接收器、中间件和/或处理程序。