Spring Boot 为 AcitveMQ 提供了自动配置,可以直接使用jmsTemplate,自动开启了消息监听注解@EnableJms。
更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件。
ActiveMQ 简介
ActiveMQ 速度快,支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等多种跨语言客户端和协议。 具有易于使用的企业集成模式和许多高级特性,同时完全支持JMS 1.1 和 J2EE 1.4,支持 AMQP v1.0协议,支持 MQTT v3.1允许在物联网环境中连接。
ActiveMQ 可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置,Spring Boot 为 ActiveMQ 提供了自动配置。
Apache ActiveMQ 官网, Download, Getting Started,Using Apache ActiveMQ。
AcitveMQ 安装
- 从官网下载软件包,这里以 Linux 系统为例。
 在 Windows 下载 ActiveMQ 软件包上传到 Linux,或 Linux 服务器直接下载(前提是可连外网) 。
- 解压软件包 - tar zxvf activemq-x.x.x-bin.tar.gz - 以 AcitveMQ v5.15.6 为例 
 tar zxvf apache-activemq-5.15.6-bin.tar.gz
- 运行 AcitveMQ 
 进入 AcitveMQ 解压目录,启动 AcitveMQ- cd :/usr/local/apache-activemq-5.15.6/bin/ 
 ./activemq start- 若出现无法启动,查看日志: - cd apache-activemq-5.15.6/data/ 
 cat activemq.log- 查看最近的异常信息,若出现如下异常: - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.net.URISyntaxException: Illegal character in hostname at index 9: ws://dev_linux:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.xbean.XBeanBrokerFactory$1 | main 
 这里面的 dev_linux 服务器名,在 activemq.xml 配置文件里默认配置的是 0.0.0.0 的IP地址,估计是ws协议不支持这种转换。- 修改配置文件:activemq.xml - cd apache-activemq-5.15.6/conf/ 
 vim activemq.xml
 搜索: /ws://0.0.0.0
 修改 ws://0.0.0.0 为 ws://127.0.0.1/ 保存- 再启动 
- AcitveMQ 启停命令 - # AcitveMQ 启停命令 ./activemq tstart | stop | restart # 查看 AcitveMQ 相关信息和命令 ./activemq 不带任何参数
- AcitveMQ 自带了 Web 管理端,通过浏览器访问,Web 访问端口默认是 8162。 - Web 页面是基于 JSP 实现,Servlet 容器是 Jetty ,Jetty 配置文件在 /conf/jetty.xml 中。 - 浏览器输入:http://ip:8162 , 默认账号密码是 admin/admin。 - 若要修改端口号,打开 /conf/jetty.xml 文件,找到 bean 的 id="jettyPort " 项,修改 port 属性 - <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
- Web 页面登录密码 - AcitveMQ 自带的 Web 管理页面的默认开启了登录认证,默认账号密码是:admin / admin - 若要修改登记录认配置,打开 /conf/jetty.xml 文件,打到 bean id = "securityConstraint" 项 - <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="user,admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean> <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean>- 上面默认配置,有两个角色,分别是:user 和 admin,authenticate 属性值为 true 表示开启登录认证,若要关闭登录认证则修改值为 false。可以只对 admin 角色定制化是否开关登录认证。 - 修改登录密码,打开 /conf/jetty-realm.properties 文件,默认内容如下,修改密码字段的内容。 - # username: password [,rolename ...] # 用户名: 密码 [, 角色名] admin: 123456, admin user: user, user
- 配置 TCP 连接 ActiveMQ 密码,供 Spring Boot 集成访问 - 默认的 TCP 连接端口是 61616。端口号配置在 /conf/activemq.xml 文件中,连接器名为 openwire,uri 以 tcp 开头的配置项,如下,可修改端口号。 - <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>- 修改 conf 目录中 credentials.properties 文件进行密码设置,此配置被 activemq.xml 引用。 - activemq.username=root activemq.password=123456 guest.password=123456
- 也可以直接使用 AcitveMQ 的 Docker 镜像,做好端口映射;省略手动安装 AcitveMQ 服务。 
AcitveMQ 集成
添加依赖
pom.xml 文件添加依赖。如果要配置连接池,必须添加activemq-pool依赖,否则报错:找不到 JmsTemplate Bean
<!--ActiveMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq-pool-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
连接配置
application.properties 配置文件添加配置 ActiveMQ 连接配置。
Spring Boot 自动配置默认开启的消息模式是Queue队列点对点模式;如果要使用Topic发布/订阅模式(Pub/Sub),则将 spring.jms.pub-sub-domain=改为true。
#---------Spring JMS-----------------------
##----默认为false,queue(点对点)模式; 修为true,则是topic(发布/订阅模式)
#spring.jms.pub-sub-domain=false
#---------ActiveMQ-------------------------
spring.activemq.broker-url=tcp://10.0.3.4:61616
spring.activemq.user=root
spring.activemq.password=123456
spring.activemq.in-memory=false
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=10
spring.activemq.pool.idle-timeout=30
Topic 配置
ActiveMQ 配置类
自动配置的方式不支持Queue和Topic同时使用,若在一个项目里要同时使用这两种模式,则需要自定义一个 JmsListenerContainerFactory Bean,设置 pub-sub-domain为true,Topic监听注解添加containerFactory属性,指向自定义开启Topic的 JmsListenerContainerFactory。
/**
 * @name: ActiveMQConfig
 * @desc: 配置类
 **/
@Configuration
public class ActiveMQConfig {
    @Bean
    public JmsListenerContainerFactory<?> topicListenerContainer(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory();
        topicListenerContainer.setPubSubDomain(true);
        topicListenerContainer.setConnectionFactory(activeMQConnectionFactory);
        return topicListenerContainer;
    }
}
消息生产者
消息生产者:producer
/**
 * @name: MQSendServiceImpl
 * @desc: TODO
 **/
@Service
public class MQProducerServiceImpl implements MQProducerService {
    @Autowired
    private JmsTemplate jmsTemplate;
    /**
     * 发送queue消息
     *
     * @param msg
     * @throws JMSException
     */
    @Override
    public void activeMQSend(String msg) throws JMSException {
        MessageCreator messageCreator = session -> session.createTextMessage(msg);
        //发布queue
        Destination queueDestination = new ActiveMQQueue("my-queue");
        jmsTemplate.send(queueDestination, messageCreator);
        jmsTemplate.convertAndSend(queueDestination, "Hello Queue");
        //发布topic
        Destination topicDestination = new ActiveMQTopic("my-topic");
        jmsTemplate.send(topicDestination, messageCreator);
        jmsTemplate.convertAndSend(queueDestination, "Hello Topic");
    }
}
消息消费者
消费者监听消息:consumer
/**
 * @name: MQConsumerServiceImpl
 * @desc: TODO
 **/
@Service
public class MQConsumerServiceImpl implements MQConsumerService {
    /**
     * 监听queue消息
     * 
     * @param message
     */
    @Override
    @JmsListener(destination = "my-queue")
    public void activeMQQueueReceive(String message) {
        System.out.println("监听收到my-queue消息:" + message);
    }
    /**
     * 监听topic消息
     * 一个项目同时使用 Queue 和 Topic, Topic 监听注解添加`containerFactory`属性,
     * 指向自定义开启**Topic**的 **JmsListenerContainerFactory**。
     * @param message
     */
    @Override
    @JmsListener(destination = "my-topic", containerFactory = "topicListenerContainer")
    public void activeMQTopicReceive(String message) {
        System.out.println("监听收到my-topic消息:" + message);
    }
}
参考资料
注意:本文归作者所有,未经作者允许,不得转载
 
 
            