Skip to content

SpringBoot RabbitMq集成配置示例

902字约3分钟

amqjava

2023-12-06

简介

rabbitmq主要有三大类交换机:fanout,direct,topic,headers,他们从名字上分别是扇区交换机、直连交换机和主题交换机。

fanout交换机也叫无路由交换机,就是它直接与交换机exchange发生关联,不用routingKey。

direct和topic都加入了routingKey的概念,发送数据的时候,它只跟交换机和路由发生关系,不关心最终的队列queue。

就是生产者这里只需要将消息绑定到exchange和routingKey上,如果是springboot与rabbitmq关联,

这里也引出了本文的主要内容:direct类型和topic类型到底有什么区别呢?区别就是:direct类型从字面上看就是直连的意思,从生产者到消费者这条线上,routingKey只能一一对应,而topic主题类型生产者到消费者routingKey可以多对一。可以用如下直观的图形表示: img

SpringBoot集成

1. 添加AMQ maven依赖

若父工程添加了spring-boot-starter-parent依赖,直接添加如下坐标,无需指定版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置AMQ 工具类

SpringBoot添加amq-starter依赖后默认配置RabbitTemplate 到容器,若没有异步发送消息的需求当前步骤可以跳过。如下注册了异步消息工具类AsyncRabbitTemplate到容器。

@Configuration
public class AmqConfig {
	public static final String MQ_QUEUE_CHECK_APPLY = "patientCheckApply";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate() {
        AsyncRabbitTemplate async = new AsyncRabbitTemplate(rabbitTemplate);
        async.setReceiveTimeout(10 * 1000);
        return async;
    }
}

3.添加AMQ服务器配置

提前在RabbitMQ管理端创建好用户、virtual-host、相关队列。

spring:
  rabbitmq:
    username: yizhen
    password: yizhen@321
    addresses: 192.168.10.37
    port: 5672
    virtual-host: yizhen-cloud

4. 生成者发送消息

在Spring的组件Controller、service 等bean中注入工具类RabbitTemplate 或 AsyncRabbitTemplate,以下代码做简单演示。

@RestController
@RequestMapping("/webService")
@Slf4j
public class MqDemoController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private AsyncRabbitTemplate asyncRabbitTemplate;

    @Autowired
    private IHospitalInfoService hospitalInfoService;
    
    // 按已配置的交换机路由规则匹配队列(管理界面配置)
    @GetMapping("/sendMsg")
    public Response sendMessage(String exchange, String routeKey,String content){
        Message msg = rabbitTemplate.sendAndReceive(exchange, routeKey, new Message(content.getBytes(StandardCharsets.UTF_8)));
        return Response.success(new String(msg.getBody()));
    }

    // 按已配置的交换机路由规则匹配队列(管理界面配置)
    @GetMapping("/sendObj")
    public Response sendMessage(String exchange, String routeKey){
        HospitalInfoEntity info = hospitalInfoService.getById(100101);
        Object msg = rabbitTemplate.convertSendAndReceive(exchange, routeKey, info);
        return Response.success(msg);
    }

    // 按已配置的交换机路由规则匹配队列(管理界面配置)
    @GetMapping("/asyncSendObj")
    public Response asyncSendMessage(String exchange, String routeKey){
        HospitalInfoEntity info = hospitalInfoService.getById(100101);
        // 发送异步消息,不等待返回,在回调方法实现响应逻辑
        AsyncRabbitTemplate.RabbitConverterFuture<String> msg = asyncRabbitTemplate.convertSendAndReceive(exchange, routeKey, info);
        msg.addCallback(new SuccessCallback<String>() {
            @Override
            public void onSuccess(String result) {
                log.info("asyncSendObj 发送成功!收到回复内容:{}", result);
            }
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("asyncSendObj 发送失败!", ex);
            }
        });
        return Response.success(msg);
    }
    
    @RabbitListener(queues = "messageQueue")
    public String receive_message(Message message){
        log.info("==>messageQueue收到消息:{}",new String(message.getBody()));
        return "messageQueue over!";
    }

    @RabbitListener(queues = "messageQueue2")
    public String receive_message2(Message message){
        log.info("==>messageQueue2收到消息:{}",new String(message.getBody()));
        return "messageQueue2 over!";
    }

    // 自定义交换机路由规则,程序启动后会添加到qmq
    @RabbitListener(bindings = { @QueueBinding(
            value = @Queue(value = "messageQueue3"), 
            exchange= @Exchange(value = "amq.direct"), key = "test3"),
    })
    public String receive_defined(Message message){
        log.info("==>defined收到消息:{}",new String(message.getBody()));
        return "messageQueue3 over!";
    }

    // 自定义交换机路由规则,程序启动后会添加到qmq
    @RabbitListener(bindings = { @QueueBinding(
            value = @Queue(value = "messageQueue3"),
            exchange= @Exchange(value = "amq.direct"), key = "test4"),
    })
    public String receive_defined_4(Message message){
        log.info("==>defined-test4收到消息:{}",new String(message.getBody()));
        return "messageQueue3-test4 over!";
    }

    @RabbitListener(bindings = { @QueueBinding(
            value = @Queue(value = "messageDirectHospital"),
            exchange= @Exchange(value = "amq.direct"), key = "hospital"),
    })
    public String receive_hospital(@Payload HospitalInfoEntity entity){
        log.info("==>hospital收到消息:{}",entity);
        return "messageDirectHospital-hospital over!";
    }
}

参考资料

rabbitmq中交换机类型direct和topic的区别_rabbitmq topic direct

陕ICP备2021014644号-1