我试图在我的应用程序中应用一些基于rabbitmq的流控制。
关于我的系统的一个非常狭隘的简介:
-
有一些BlueWorker扫描和输入目录,并将消息发布到Exchange。
-
还有其他的red worker使用这个交换(基于路由密钥)并对数据执行任何操作,而不是删除它。
在交换中“存储”的数据相当大,工作人员查看这些数据所需的时间是显而易见的。一段时间后,我从rabbitmq收到一个内存警告,指出内存使用率过高,所有发布操作都已停止。
我试图增加rabbitmq正在使用的内存量,但它只是将问题推迟了几个小时(运行时)。我也使队列基于磁盘而不是基于ram,但我的磁盘却已满。
因为我的输入不是那么大,所以我可以使用一个“大”的输入队列,蓝色的工作人员阅读他们的输入。所以我想试着在蓝工和交易所之间的联系上设置一些“最大长度”。我相信我不会在这里失去任何东西,因为我的系统的真正瓶颈是红色工人(顺便说一句,我声明了红色工人和交易所之间的联系
prefech_count=2
)
说了这么多…我没能做到这么长:(
我正在使用
Pika
声明我的队列并使用通道。
我读过这个(
https://www.rabbitmq.com/maxlength.html
)但是没有在我的代码中实现它,我希望看到一个使用这个最大大小标志的示例。