外观
SpringBoot RabbitMq集成配置示例
简介
rabbitmq主要有三大类交换机:fanout,direct,topic,headers,他们从名字上分别是扇区交换机、直连交换机和主题交换机。
fanout交换机也叫无路由交换机,就是它直接与交换机exchange发生关联,不用routingKey。
direct和topic都加入了routingKey的概念,发送数据的时候,它只跟交换机和路由发生关系,不关心最终的队列queue。
就是生产者这里只需要将消息绑定到exchange和routingKey上,如果是springboot与rabbitmq关联,
这里也引出了本文的主要内容:direct类型和topic类型到底有什么区别呢?区别就是:direct类型从字面上看就是直连的意思,从生产者到消费者这条线上,routingKey只能一一对应,而topic主题类型生产者到消费者routingKey可以多对一。可以用如下直观的图形表示:
SpringBoot集成
1. 添加AMQ maven依赖
若父工程添加了spring-boot-starter-parent依赖,直接添加如下坐标,无需指定版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置AMQ 工具类
SpringBoot添加amq-starter依赖后默认配置RabbitTemplate 到容器,若没有异步发送消息的需求当前步骤可以跳过。如下注册了异步消息工具类AsyncRabbitTemplate到容器。
@Configuration
public class AmqConfig {
public static final String MQ_QUEUE_CHECK_APPLY = "patientCheckApply";
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate() {
AsyncRabbitTemplate async = new AsyncRabbitTemplate(rabbitTemplate);
async.setReceiveTimeout(10 * 1000);
return async;
}
}
3.添加AMQ服务器配置
提前在RabbitMQ管理端创建好用户、virtual-host、相关队列。
spring:
rabbitmq:
username: yizhen
password: yizhen@321
addresses: 192.168.10.37
port: 5672
virtual-host: yizhen-cloud
4. 生成者发送消息
在Spring的组件Controller、service 等bean中注入工具类RabbitTemplate 或 AsyncRabbitTemplate,以下代码做简单演示。
@RestController
@RequestMapping("/webService")
@Slf4j
public class MqDemoController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AsyncRabbitTemplate asyncRabbitTemplate;
@Autowired
private IHospitalInfoService hospitalInfoService;
// 按已配置的交换机路由规则匹配队列(管理界面配置)
@GetMapping("/sendMsg")
public Response sendMessage(String exchange, String routeKey,String content){
Message msg = rabbitTemplate.sendAndReceive(exchange, routeKey, new Message(content.getBytes(StandardCharsets.UTF_8)));
return Response.success(new String(msg.getBody()));
}
// 按已配置的交换机路由规则匹配队列(管理界面配置)
@GetMapping("/sendObj")
public Response sendMessage(String exchange, String routeKey){
HospitalInfoEntity info = hospitalInfoService.getById(100101);
Object msg = rabbitTemplate.convertSendAndReceive(exchange, routeKey, info);
return Response.success(msg);
}
// 按已配置的交换机路由规则匹配队列(管理界面配置)
@GetMapping("/asyncSendObj")
public Response asyncSendMessage(String exchange, String routeKey){
HospitalInfoEntity info = hospitalInfoService.getById(100101);
// 发送异步消息,不等待返回,在回调方法实现响应逻辑
AsyncRabbitTemplate.RabbitConverterFuture<String> msg = asyncRabbitTemplate.convertSendAndReceive(exchange, routeKey, info);
msg.addCallback(new SuccessCallback<String>() {
@Override
public void onSuccess(String result) {
log.info("asyncSendObj 发送成功!收到回复内容:{}", result);
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.info("asyncSendObj 发送失败!", ex);
}
});
return Response.success(msg);
}
@RabbitListener(queues = "messageQueue")
public String receive_message(Message message){
log.info("==>messageQueue收到消息:{}",new String(message.getBody()));
return "messageQueue over!";
}
@RabbitListener(queues = "messageQueue2")
public String receive_message2(Message message){
log.info("==>messageQueue2收到消息:{}",new String(message.getBody()));
return "messageQueue2 over!";
}
// 自定义交换机路由规则,程序启动后会添加到qmq
@RabbitListener(bindings = { @QueueBinding(
value = @Queue(value = "messageQueue3"),
exchange= @Exchange(value = "amq.direct"), key = "test3"),
})
public String receive_defined(Message message){
log.info("==>defined收到消息:{}",new String(message.getBody()));
return "messageQueue3 over!";
}
// 自定义交换机路由规则,程序启动后会添加到qmq
@RabbitListener(bindings = { @QueueBinding(
value = @Queue(value = "messageQueue3"),
exchange= @Exchange(value = "amq.direct"), key = "test4"),
})
public String receive_defined_4(Message message){
log.info("==>defined-test4收到消息:{}",new String(message.getBody()));
return "messageQueue3-test4 over!";
}
@RabbitListener(bindings = { @QueueBinding(
value = @Queue(value = "messageDirectHospital"),
exchange= @Exchange(value = "amq.direct"), key = "hospital"),
})
public String receive_hospital(@Payload HospitalInfoEntity entity){
log.info("==>hospital收到消息:{}",entity);
return "messageDirectHospital-hospital over!";
}
}