Skip to content

SpringBoot ActiveMq 集成配置示例

1790字约6分钟

amqjava

2024-09-19

ActiveMQ介绍

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的、面向消息的、能够跨越多语言和多系统的应用集成消息通信中间件。

它为企业应用中消息传递提供高可用、出色性能、可扩展、稳定和安全保障。

ActiveMQ实现JMS规范并在此之上提供大量额外的特性。ActiveMQ支持队列和订阅两种模式的消息发送。

ActiveMQ的两种消息传递类型:

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的。

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

Spring Boot集成ActiveMQ

1. 添加activemq maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2. application.yml文件中配置ActiveMQ的连接信息

app:
  amq:
    aiStoreTopic: ai_store_topic  # 定义项目默认主题(非必须)
    
  spring:
      activemq:
      broker-url: tcp://192.168.1.xx:61616
      user: xxxUser
      password: xxxxPassword
      in-memory: off  # 关闭内存模式,采用生产模式activeMq组件

    
  jms:
    pub-sub-domain: true   ## 默认采用发布订阅模式即Topic模式
    template:
      default-destination: ${accurad.amq.aiStoreTopic}
      time-to-live: 15000
    listener:
      acknowledge-mode: auto
      receive-timeout: 5000

3. 创建消息生成者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage(String destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }
}

4.创建消息消费者

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "${app.amq.aiStoreTopic}")
    // @JmsListener(destination = "#{@amqConfig.aiStoreTopicName}") 调用config类中的熟悉,要有getter方法
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

项目案例--默认MessageListener

1.配置类

@Configuration
public class AmqConfig {

    @Value("${accurad.amq.aiStoreTopic}")
    public String aiStoreTopicName;

    @Bean("amqAiStoreTopic")
    public Topic amqAiStoreTopic(){
        return new ActiveMQTopic(aiStoreTopicName);
    }

    public String getAiStoreTopicName() {
        return aiStoreTopicName;
    }

}

2.消息客户端,整合消费和生产逻辑

@Component
@Slf4j
public class MessageClient {

    @Resource
    private JmsMessagingTemplate messagingTemplate;
    // private JmsTemplate jmsTemplate;

    @Resource(name = "amqAiStoreTopic")
    private Topic amqAiStoreTopic;


    /**
     * 监听消息事件
     */
    // @JmsListener(destination = "#{@amqConfig.aiStoreTopicName}")
    @JmsListener(destination = "${accurad.amq.aiStoreTopic}")
    public void onMessage(String message){
        log.debug("==>>onMessage: {}", message);
    }

    /**
     * 发送消息事件
     */
    public void sendMessage(MessageEvent event, String message){
        messagingTemplate.convertAndSend(amqAiStoreTopic, getMessageEvent(event, message));
    }

    public JSONObject getMessageEvent(MessageEvent event, String body){
        JSONObject message = new JSONObject();
        message.put("type", event.getType());
        message.put("body", body);
        return message;
    }
}

3.消息事件类型枚举

public enum MessageEvent {

    /** 应用创建 **/
    APP_CREATE("message_event_app_create"),

    /** 应用删除 **/
    APP_UPDATE("message_event_app_delete"),

    /** 应用更新 **/
    APP_DELETE("message_event_app_update"),

    /** 订单创建 **/
    APP_ORDER("message_event_app_order"),

    /** 订单支付 **/
    APP_ORDER_PAY("message_event_order_pay"),

    /** 订单取消 **/
    APP_ORDER_CANCEL("message_event_order_cancel"),

    /** 引用消费 **/
    APP_ORDER_CONSUME("message_event_app_consume");

    private final String type;

    MessageEvent(String _type) {
        this.type = _type;
    }

    public String getType() {
        return type;
    }
    
    public static MessageEvent create(String eventCode) {
        if (StringUtils.equalsIgnoreCase(eventCode, APP_CREATE.type)) {
            return MessageEvent.APP_CREATE;
        } else if (StringUtils.equalsIgnoreCase(eventCode, APP_DELETE.type)) {
            return MessageEvent.APP_DELETE;
        } else if (StringUtils.equalsIgnoreCase(eventCode, APP_UPDATE.type)) {
            return MessageEvent.APP_UPDATE;
        } else if (StringUtils.equalsIgnoreCase(eventCode, APP_ORDER_ADD.type)) {
            return MessageEvent.APP_ORDER_ADD;
        } else if (StringUtils.equalsIgnoreCase(eventCode, APP_ORDER_PAY.type)) {
            return MessageEvent.APP_ORDER_PAY;
        } else if (StringUtils.equalsIgnoreCase(eventCode, APP_ORDER_REMOVE.type)) {
            return MessageEvent.APP_ORDER_REMOVE;
        }
        return null;
    }
}

项目案例--自定义MessageListener

SpringBoot默认消息监听工厂只有一个,默认为Queue模式,我们可以通过上面的配置修改成Topic模式。但是如果项目同时还有使用Queue模式消息就需要自定义MessageListener。实现项目同时支持两种模式消息监听。

1.配置Qmq监听工厂

@Configuration
@EnableJms
public class AmqConfig {

    @Value("${accurad.amq.aiStoreTopic}")
    public String aiStoreTopicName;

    @Value("${spring.jms.listener.receive-timeout}")
    private Integer messageReceiveTimeout;

    @Value("${spring.jms.template.time-to-live}")
    private Integer messageTimeToLive;

    // 注意:如果是配置文件已经修改成topic模式,可以不用创建Topic对应的factory,直接使用Springboot默认即可,以下Bean可以不创建
    @Bean("topicListenerContainerFactory")
    public JmsListenerContainerFactory<?> JmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 监听发布订阅模式TOPIC
        factory.setPubSubDomain(true);
        return factory;
    }

    // 注意:如果采用JmsTemplate类发送消息,可以不用定义一下Bean
    @Bean("jmsMessagingTemplate")
    public JmsMessagingTemplate jmsMessagingTemplate(ConnectionFactory connectionFactory) {
        JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(connectionFactory);
        messagingTemplate.setDefaultDestination(new ActiveMQTopic(aiStoreTopicName));
        JmsTemplate template = messagingTemplate.getJmsTemplate();
        if (template != null) {
            template.setReceiveTimeout(messageReceiveTimeout);
            template.setTimeToLive(messageTimeToLive);
            template.setDeliveryMode(DeliveryMode.PERSISTENT);
        }
        return messagingTemplate;
    }

    // 注意:配置文件修改成Topic模式后,自定义一个Queue模式的监听工厂,监听Queue消息时指定当前的Listener
    @Bean("queueListenerContainerFactory")
    public JmsListenerContainerFactory<?> JmsListenerContainerFactoryQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 监听点对点模式QUEUE
        factory.setPubSubDomain(false);
        return factory;
    }

}

2.消息监听

/**
 * 监听Topic消息,containerFactory可以不指定,采用springboot默认的工厂
 */
@JmsListener(destination = "${accurad.amq.aiStoreTopic}", containerFactory = "topicListenerContainerFactory")
public void onMessage(String message){
    log.debug("==>>onMessage: {}", message);
}

/**
* 监听Queue消息,containerFactory必须指定我们创建的监听工厂,否则无法收到Queue模式消息。
*/
@JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
public void onQueueMessage1(TextMessage message) throws JMSException {
    log.debug("==>>aiStoreQueue1 : {}", message.getText());
    //消息应答:对应需要答复的消息tmpelate按如下方式向请求者创建的临时队列中发送消息。
    jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue1收到,谢谢!"));
}

@JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
public void onQueueMessage2(TextMessage message) throws JMSException {
    log.debug("==>>aiStoreQueue2: {}", message.getText());
    jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue2收到,谢谢!"));
}

3.消息发送并等待答复

/**
 * 向队列中发送消息并得到响应
 *
 * @param destinationName 队列名称
 * @param message         消息体
 * @return 响应体
 */
public String sendAndReceive(String destinationName, String message) {
    try {
    	// 消息发送后会堵塞,等待返回
        org.springframework.messaging.Message<?> response = jmsTemplate.sendAndReceive(destinationName, new GenericMessage<>(message));
        if (response != null) {
            return String.valueOf(response.getPayload());
        } else {
            throw new RuntimeException("JmsMessage Timeout! destination: " + destinationName);
        }
    } catch (MessagingException ex) {
        log.error("sendAndReceive 失败!目标队列:{}, 消息体:{}。", destinationName, message, ex);
        throw new RuntimeException("JmsMessage exception! destination: " + destinationName);
    }
}

4.完整发送接收示例

package com.xxxx.amq;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * amq 客户端
 *
 * @author: xxxx
 * @date: 2024/8/8 11:40:54
 * @version: v1.0
 */
@Component
@Slf4j
public class MessageClient {

    @Resource(name = "jmsMessagingTemplate")
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 监听消息事件
     */
    @JmsListener(destination = "${accurad.amq.aiStoreTopic}", containerFactory = "topicListenerContainerFactory")
    public void onMessage(String message){
        log.debug("==>>onMessage: {}", message);
    }

    @JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
    public void onQueueMessage1(TextMessage message) throws JMSException {
        log.debug("==>>aiStoreQueue1 : {}", message.getText());
        jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue1收到,谢谢!"));
    }

    @JmsListener(destination = "${accurad.amq.aiStoreQueue}", containerFactory = "queueListenerContainerFactory")
    public void onQueueMessage2(TextMessage message) throws JMSException {
        log.debug("==>>aiStoreQueue2: {}", message.getText());
        jmsTemplate.send(message.getJMSReplyTo(), new GenericMessage<>("aiStoreQueue2收到,谢谢!"));
    }

    /**
     * 广播AiStore 事件消息
     */
    public void broadcastAiStoreMessage(MessageEvent event, String message) {
        // ActiveMQTopic topic = new ActiveMQTopic(aiStoreTopicName);
        jmsTemplate.convertAndSend(getMessageEvent(event, message));
    }

    /**
     * 广播AiStore 事件消息
     */
    public void broadcastAiStoreMessage(String event, String message) {
        MessageEvent msgEvent = MessageEvent.create(event);
        broadcastAiStoreMessage(msgEvent, message);
    }


    /**
     * 使用指定的主题发送消息事件
     */
    public void sendMessage(String destinationName, String message) {
        if (destinationName.indexOf("topic") > 0) {
            sendTopicMessage(destinationName, message);
        } else {
            sendQueueMessage(destinationName, message);
        }
    }

    /**
     * 向指定的队列中发送消息
     *
     * @param destinationName 队列名称
     * @param message         消息体
     */
    public void sendQueueMessage(String destinationName, String message) {
        jmsTemplate.convertAndSend(new ActiveMQQueue(destinationName), message);
    }

    /**
     * 向指定的主题中发送消息
     *
     * @param destinationName 主题名称
     * @param message         消息体
     */
    public void sendTopicMessage(String destinationName, String message) {
        jmsTemplate.convertAndSend(new ActiveMQTopic(destinationName), message);
    }

    /**
     * 向队列中发送消息并得到响应
     *
     * @param destinationName 队列名称
     * @param message         消息体
     * @return 响应体
     */
    public String sendAndReceive(String destinationName, String message) {
        try {
            org.springframework.messaging.Message<?> response = jmsTemplate.sendAndReceive(destinationName, new GenericMessage<>(message));
            if (response != null) {
                return String.valueOf(response.getPayload());
            } else {
                throw new RuntimeException("JmsMessage Timeout! destination: " + destinationName);
            }
        } catch (MessagingException ex) {
            log.error("sendAndReceive 失败!目标队列:{}, 消息体:{}。", destinationName, message, ex);
            throw new RuntimeException("JmsMessage exception! destination: " + destinationName);
        }
    }

    public JSONObject sendAndReceiveJson(String destinationName, String message) {
        String response = sendAndReceive(destinationName, message);
        if (JSON.isValid(response)) {
            return JSONObject.parseObject(response);
        } else {
            return JSONObject.of("key", response);
        }
    }

    public String getMessageEvent(MessageEvent event, String body) {
        JSONObject message = new JSONObject();
        message.put("type", event.getType());
        message.put("body", JSON.parse(body));
        return message.toJSONString();
    }
}

陕ICP备2021014644号-1