Py学习  »  MQ

Spring Rabbitmq获取对扇出消息的所有答复

Ben • 6 年前 • 1766 次点击  

中的以下类包含在几个使用者应用程序中:

@Component
@Configuration
public class HealthListener {

    public static final String HEALTH_CHECK_QUEUE_NAME = "healthCheckQueue";
    public static final String HEALTH_CHECK_FANOUT_EXCHANGE_NAME = "health-check-fanout";


    @Bean
    public Binding healthListenerBinding(
            @Qualifier("healthCheckQueue") Queue queue,
            @Qualifier("instanceFanoutExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public FanoutExchange instanceFanoutExchange() {
        return new FanoutExchange(HEALTH_CHECK_FANOUT_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue healthCheckQueue() {
        return new Queue(HEALTH_CHECK_QUEUE_NAME);
    }

    @RabbitListener(queues = HEALTH_CHECK_QUEUE_NAME)
    public String healthCheck() {
        return "some result";
    }

}

我正在尝试向Fanout Exchange发送消息,并接收所有回复,以了解哪些消费者正在运行。

我可以发送一条消息并得到这样的第一个回复:

@Autowired
RabbitTemplate template;

// ...
String firstReply = template.convertSendAndReceiveAsType("health-check-fanout", "", "", ParameterizedTypeReference.forType(String.class));

不过,我需要对这条信息做最全面的回复,而不仅仅是第一条。我需要设置一个应答侦听器,但我不确定如何设置。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/30885
 
1766 次点击  
文章 [ 1 ]  |  最新文章 6 年前
Gary Russell
Reply   •   1 楼
Gary Russell    6 年前

这个 (convertS|s)endAndReceive.*() 方法不是用来处理多个答复的;它们严格来说是一个请求/一个答复方法。

你需要使用 (convertAndS|s)end() 方法发送请求,并实现您自己的答复机制,可能使用侦听器容器进行答复,以及一些组件来聚合答复。

您可以使用类似Spring集成聚合器这样的工具,但是您需要一些机制( ReleaseStrategy )这将知道何时收到所有预期的答复。

或者,您可以简单地接收离散的答复并单独处理它们。

编辑

@SpringBootApplication
public class So54207780Application {

    public static void main(String[] args) {
        SpringApplication.run(So54207780Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("fanout", "", "foo", m -> {
            m.getMessageProperties().setReplyTo("replies");
            return m;
        });
    }

    @RabbitListener(queues = "queue1")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    @RabbitListener(queues = "queue2")
    public String listen2(String in) {
        return in + in;
    }

    @RabbitListener(queues = "replies")
    public void replyHandler(String reply) {
        System.out.println(reply);
    }

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("fanout");
    }

    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(fanout());
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(fanout());
    }

    @Bean
    public Queue replies() {
        return new Queue("replies");
    }

}

FOO
foofoo