ActiveMq集成配置指导
ActiveMQ介绍
ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的、面向消息的、能够跨越多语言和多系统的应用集成消息通信中间件。
它为企业应用中消息传递提供高可用、出色性能、可扩展、稳定和安全保障。
ActiveMQ实现JMS规范并在此之上提供大量额外的特性。ActiveMQ支持队列和订阅两种模式的消息发送。
ActiveMQ的两种消息传递类型:
(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。
(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的。
两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。
Spring Boot集成ActiveMQ
1. 添加activemq maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2. application.yml文件中配置ActiveMQ的连接信息
app:
amq:
aiStoreTopic: ai_store_topic # 定义项目默认主题(非必须)
spring:
activemq:
broker-url: tcp://192.168.1.xx:61616
user: xxxUser
password: xxxxPassword
in-memory: off # 关闭内存模式,采用生产模式activeMq组件
jms:
pub-sub-domain: true ## 默认采用发布订阅模式即Topic模式
template:
default-destination: ${accurad.amq.aiStoreTopic}
time-to-live: 15000
listener:
acknowledge-mode: auto
receive-timeout: 5000
3. 创建消息生成者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private JmsTemplate jmsTemplate;
public void sendMessage(String destination, String message) {
jmsTemplate.convertAndSend(destination, message);
}
}
4.创建消息消费者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@JmsListener(destination = "${app.amq.aiStoreTopic}")
// @JmsListener(destination = "#{@amqConfig.aiStoreTopicName}") 调用config类中的熟悉,要有getter方法
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
项目案例--默认MessageListener
1.配置类
@Configuration
public class AmqConfig {
@Value("${accurad.amq.aiStoreTopic}")
public String aiStoreTopicName;
@Bean("amqAiStoreTopic")
public Topic amqAiStoreTopic(){
return new ActiveMQTopic(aiStoreTopicName);
}
public String getAiStoreTopicName() {
return aiStoreTopicName;
}
}
2.消息客户端,整合消费和生产逻辑
@Component
@Slf4j
public class MessageClient {
@Resource
private JmsMessagingTemplate messagingTemplate;
// private JmsTemplate jmsTemplate;
@Resource(name = "amqAiStoreTopic")
private Topic amqAiStoreTopic;
/**
* 监听消息事件
*/
// @JmsListener(destination = "#{@amqConfig.aiStoreTopicName}")
@JmsListener(destination = "${accurad.amq.aiStoreTopic}")
public void onMessage(String message){
log.debug("==>>onMessage: {}", message);
}
/**
* 发送消息事件
*/
public void sendMessage(MessageEvent event, String message){
messagingTemplate.convertAndSend(amqAiStoreTopic, getMessageEvent(event, message));
}
public JSONObject getMessageEvent(MessageEvent event, String body){
JSONObject message = new JSONObject();
message.put("type", event.getType());
message.put("body", body);
return message;
}
}
3.消息事件类型枚举
public enum MessageEvent {
/** 应用创建 **/
APP_CREATE("message_event_app_create"),
/** 应用删除 **/
APP_UPDATE("message_event_app_delete"),
/** 应用更新 **/
APP_DELETE("message_event_app_update"),
/** 订单创建 **/
APP_ORDER("message_event_app_order"),
/** 订单支付 **/
APP_ORDER_PAY("message_event_order_pay"),
/** 订单取消 **/
APP_ORDER_CANCEL("message_event_order_cancel"),
/** 引用消费 **/
APP_ORDER_CONSUME("message_event_app_consume");
private final String type;
MessageEvent(String _type) {
this.type = _type;
}
public String getType() {
return type;
}
public static MessageEvent create(String eventCode) {
if (StringUtils.equalsIgnoreCase(eventCode, APP_CREATE.type)) {
return MessageEvent.APP_CREATE;
} else if (StringUtils.equalsIgnoreCase(eventCode, APP_DELETE.type)) {
return MessageEvent.APP_DELETE;
} else if (StringUtils.equalsIgnoreCase(eventCode, APP_UPDATE.type)) {
return MessageEvent.APP_UPDATE;
} else if (StringUtils.equalsIgnoreCase(eventCode, APP_ORDER_ADD.type)) {
return MessageEvent.APP_ORDER_ADD;
} else if (StringUtils.equalsIgnoreCase(eventCode, APP_ORDER_PAY.type)) {
return MessageEvent.APP_ORDER_PAY;
} else if (StringUtils.equalsIgnoreCase(eventCode, APP_ORDER_REMOVE.type)) {
return MessageEvent.APP_ORDER_REMOVE;
}
return null;
}
}
项目案例--自定义MessageListener
SpringBoot默认消息监听工厂只有一个,默认为Queue模式,我们可以通过上面的配置修改成Topic模式。但是如果项目同时还有使用Queue模式消息就需要自定义MessageListener。实现项目同时支持两种模式消息监听。
1.配置Qmq监听工厂
@Configuration
@EnableJms
public class AmqConfig {
@Value("${accurad.amq.aiStoreTopic}")
public String aiStoreTopicName;
@Value("${spring.jms.listener.receive-timeout}")
private Integer messageReceiveTimeout;
@Value("${spring.jms.template.time-to-live}")
private Integer messageTimeToLive;
// 注意:如果是配置文件已经修改成topic模式,可以不用创建Topic对应的factory,直接使用Springboot默认即可,以下Bean可以不创建
@Bean("topicListenerContainerFactory")
public JmsListenerContainerFactory<?> JmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 监听发布订阅模式TOPIC
factory.setPubSubDomain(true);
return factory;
}
// 注意:如果采用JmsTemplate类发送消息,可以不用定义一下Bean
@Bean("jmsMessagingTemplate")
public JmsMessagingTemplate jmsMessagingTemplate(ConnectionFactory connectionFactory) {
JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(connectionFactory);
messagingTemplate.setDefaultDestination(new ActiveMQTopic(aiStoreTopicName));
JmsTemplate template = messagingTemplate.getJmsTemplate();
if (template != null) {
template.setReceiveTimeout(messageReceiveTimeout);
template.setTimeToLive(messageTimeToLive);
template.setDeliveryMode(DeliveryMode.PERSISTENT);
}
return messagingTemplate;
}
// 注意:配置文件修改成Topic模式后,自定义一个Queue模式的监听工厂,监听Queue消息时指定当前的Listener
@Bean("queueListenerContainerFactory")
public JmsListenerContainerFactory<?> JmsListenerContainerFactoryQueue(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 监听点对点模式QUEUE
factory.setPubSubDomain(false);
return factory;
}
}
2.消息监听
/**
* 监听Topic消息,containerFactory可以不指定,采用springboot默认的工厂
*/
@JmsListener(destination = "${accurad.amq.aiStoreTopic}", containerFactory = "topicListenerContainerFactory")
public void onMessage(String message){
log.debug("==>>onMessage: {}", message);
}
/**
* 监听Queue消息,containerFactory必须指定我们创建的监听工厂,否则无法收到Queue模式消息。
*/
@JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
public void onQueueMessage1(TextMessage message) throws JMSException {
log.debug("==>>aiStoreQueue1 : {}", message.getText());
//消息应答:对应需要答复的消息tmpelate按如下方式向请求者创建的临时队列中发送消息。
jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue1收到,谢谢!"));
}
@JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
public void onQueueMessage2(TextMessage message) throws JMSException {
log.debug("==>>aiStoreQueue2: {}", message.getText());
jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue2收到,谢谢!"));
}
3.消息发送并等待答复
/**
* 向队列中发送消息并得到响应
*
* @param destinationName 队列名称
* @param message 消息体
* @return 响应体
*/
public String sendAndReceive(String destinationName, String message) {
try {
// 消息发送后会堵塞,等待返回
org.springframework.messaging.Message<?> response = jmsTemplate.sendAndReceive(destinationName, new GenericMessage<>(message));
if (response != null) {
return String.valueOf(response.getPayload());
} else {
throw new RuntimeException("JmsMessage Timeout! destination: " + destinationName);
}
} catch (MessagingException ex) {
log.error("sendAndReceive 失败!目标队列:{}, 消息体:{}。", destinationName, message, ex);
throw new RuntimeException("JmsMessage exception! destination: " + destinationName);
}
}
4.完整发送接收示例
package com.xxxx.amq;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
* amq 客户端
*
* @author: xxxx
* @date: 2024/8/8 11:40:54
* @version: v1.0
*/
@Component
@Slf4j
public class MessageClient {
@Resource(name = "jmsMessagingTemplate")
private JmsMessagingTemplate jmsTemplate;
/**
* 监听消息事件
*/
@JmsListener(destination = "${accurad.amq.aiStoreTopic}", containerFactory = "topicListenerContainerFactory")
public void onMessage(String message){
log.debug("==>>onMessage: {}", message);
}
@JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
public void onQueueMessage1(TextMessage message) throws JMSException {
log.debug("==>>aiStoreQueue1 : {}", message.getText());
jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue1收到,谢谢!"));
}
@JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
public void onQueueMessage2(TextMessage message) throws JMSException {
log.debug("==>>aiStoreQueue2: {}", message.getText());
jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue2收到,谢谢!"));
}
/**
* 广播AiStore 事件消息
*/
public void broadcastAiStoreMessage(MessageEvent event, String message) {
// ActiveMQTopic topic = new ActiveMQTopic(aiStoreTopicName);
jmsTemplate.convertAndSend(getMessageEvent(event, message));
}
/**
* 广播AiStore 事件消息
*/
public void broadcastAiStoreMessage(String event, String message) {
MessageEvent msgEvent = MessageEvent.create(event);
broadcastAiStoreMessage(msgEvent, message);
}
/**
* 使用指定的主题发送消息事件
*/
public void sendMessage(String destinationName, String message) {
if (destinationName.indexOf("topic") > 0) {
sendTopicMessage(destinationName, message);
} else {
sendQueueMessage(destinationName, message);
}
}
/**
* 向指定的队列中发送消息
*
* @param destinationName 队列名称
* @param message 消息体
*/
public void sendQueueMessage(String destinationName, String message) {
jmsTemplate.convertAndSend(new ActiveMQQueue(destinationName), message);
}
/**
* 向指定的主题中发送消息
*
* @param destinationName 主题名称
* @param message 消息体
*/
public void sendTopicMessage(String destinationName, String message) {
jmsTemplate.convertAndSend(new ActiveMQTopic(destinationName), message);
}
/**
* 向队列中发送消息并得到响应
*
* @param destinationName 队列名称
* @param message 消息体
* @return 响应体
*/
public String sendAndReceive(String destinationName, String message) {
try {
org.springframework.messaging.Message<?> response = jmsTemplate.sendAndReceive(destinationName, new GenericMessage<>(message));
if (response != null) {
return String.valueOf(response.getPayload());
} else {
throw new RuntimeException("JmsMessage Timeout! destination: " + destinationName);
}
} catch (MessagingException ex) {
log.error("sendAndReceive 失败!目标队列:{}, 消息体:{}。", destinationName, message, ex);
throw new RuntimeException("JmsMessage exception! destination: " + destinationName);
}
}
public JSONObject sendAndReceiveJson(String destinationName, String message) {
String response = sendAndReceive(destinationName, message);
if (JSON.isValid(response)) {
return JSONObject.parseObject(response);
} else {
return JSONObject.of("key", response);
}
}
public String getMessageEvent(MessageEvent event, String body) {
JSONObject message = new JSONObject();
message.put("type", event.getType());
message.put("body", JSON.parse(body));
return message.toJSONString();
}
}