Spring Boot 为 AcitveMQ 提供了自动配置,可以直接使用jmsTemplate
,自动开启了消息监听注解@EnableJms
。
更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件。
ActiveMQ 简介
ActiveMQ 速度快,支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等多种跨语言客户端和协议。 具有易于使用的企业集成模式和许多高级特性,同时完全支持JMS 1.1 和 J2EE 1.4,支持 AMQP v1.0协议,支持 MQTT v3.1允许在物联网环境中连接。
ActiveMQ 可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置,Spring Boot 为 ActiveMQ 提供了自动配置。
Apache ActiveMQ 官网, Download, Getting Started,Using Apache ActiveMQ。
AcitveMQ 安装
- 从官网下载软件包,这里以 Linux 系统为例。
在 Windows 下载 ActiveMQ 软件包上传到 Linux,或 Linux 服务器直接下载(前提是可连外网) 。 解压软件包
tar zxvf activemq-x.x.x-bin.tar.gz
以 AcitveMQ v5.15.6 为例
tar zxvf apache-activemq-5.15.6-bin.tar.gz运行 AcitveMQ
进入 AcitveMQ 解压目录,启动 AcitveMQcd :/usr/local/apache-activemq-5.15.6/bin/
./activemq start若出现无法启动,查看日志:
cd apache-activemq-5.15.6/data/
cat activemq.log查看最近的异常信息,若出现如下异常:
Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.net.URISyntaxException: Illegal character in hostname at index 9: ws://dev_linux:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.xbean.XBeanBrokerFactory$1 | main
这里面的 dev_linux 服务器名,在 activemq.xml 配置文件里默认配置的是 0.0.0.0 的IP地址,估计是ws协议不支持这种转换。修改配置文件:activemq.xml
cd apache-activemq-5.15.6/conf/
vim activemq.xml
搜索: /ws://0.0.0.0
修改 ws://0.0.0.0 为 ws://127.0.0.1/ 保存再启动
AcitveMQ 启停命令
# AcitveMQ 启停命令 ./activemq tstart | stop | restart # 查看 AcitveMQ 相关信息和命令 ./activemq 不带任何参数
AcitveMQ 自带了 Web 管理端,通过浏览器访问,Web 访问端口默认是 8162。
Web 页面是基于 JSP 实现,Servlet 容器是 Jetty ,Jetty 配置文件在 /conf/jetty.xml 中。
浏览器输入:http://ip:8162 , 默认账号密码是 admin/admin。
若要修改端口号,打开 /conf/jetty.xml 文件,找到 bean 的 id="jettyPort " 项,修改 port 属性
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
Web 页面登录密码
AcitveMQ 自带的 Web 管理页面的默认开启了登录认证,默认账号密码是:admin / admin
若要修改登记录认配置,打开 /conf/jetty.xml 文件,打到 bean id = "securityConstraint" 项
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="user,admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean> <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean>
上面默认配置,有两个角色,分别是:user 和 admin,authenticate 属性值为 true 表示开启登录认证,若要关闭登录认证则修改值为 false。可以只对 admin 角色定制化是否开关登录认证。
修改登录密码,打开 /conf/jetty-realm.properties 文件,默认内容如下,修改密码字段的内容。
# username: password [,rolename ...] # 用户名: 密码 [, 角色名] admin: 123456, admin user: user, user
配置 TCP 连接 ActiveMQ 密码,供 Spring Boot 集成访问
默认的 TCP 连接端口是 61616。端口号配置在 /conf/activemq.xml 文件中,连接器名为 openwire,uri 以 tcp 开头的配置项,如下,可修改端口号。
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
修改 conf 目录中 credentials.properties 文件进行密码设置,此配置被 activemq.xml 引用。
activemq.username=root activemq.password=123456 guest.password=123456
也可以直接使用 AcitveMQ 的 Docker 镜像,做好端口映射;省略手动安装 AcitveMQ 服务。
AcitveMQ 集成
添加依赖
pom.xml 文件添加依赖。如果要配置连接池,必须添加activemq-pool
依赖,否则报错:找不到 JmsTemplate Bean
<!--ActiveMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq-pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
连接配置
application.properties 配置文件添加配置 ActiveMQ 连接配置。
Spring Boot 自动配置默认开启的消息模式是Queue
队列点对点模式;如果要使用Topic
发布/订阅模式(Pub/Sub),则将 spring.jms.pub-sub-domain=
改为true
。
#---------Spring JMS-----------------------
##----默认为false,queue(点对点)模式; 修为true,则是topic(发布/订阅模式)
#spring.jms.pub-sub-domain=false
#---------ActiveMQ-------------------------
spring.activemq.broker-url=tcp://10.0.3.4:61616
spring.activemq.user=root
spring.activemq.password=123456
spring.activemq.in-memory=false
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=10
spring.activemq.pool.idle-timeout=30
Topic 配置
ActiveMQ 配置类
自动配置的方式不支持Queue
和Topic
同时使用,若在一个项目里要同时使用这两种模式,则需要自定义一个 JmsListenerContainerFactory
Bean,设置 pub-sub-domain
为true
,Topic
监听注解添加containerFactory
属性,指向自定义开启Topic的 JmsListenerContainerFactory。
/**
* @name: ActiveMQConfig
* @desc: 配置类
**/
@Configuration
public class ActiveMQConfig {
@Bean
public JmsListenerContainerFactory<?> topicListenerContainer(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory();
topicListenerContainer.setPubSubDomain(true);
topicListenerContainer.setConnectionFactory(activeMQConnectionFactory);
return topicListenerContainer;
}
}
消息生产者
消息生产者:producer
/**
* @name: MQSendServiceImpl
* @desc: TODO
**/
@Service
public class MQProducerServiceImpl implements MQProducerService {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 发送queue消息
*
* @param msg
* @throws JMSException
*/
@Override
public void activeMQSend(String msg) throws JMSException {
MessageCreator messageCreator = session -> session.createTextMessage(msg);
//发布queue
Destination queueDestination = new ActiveMQQueue("my-queue");
jmsTemplate.send(queueDestination, messageCreator);
jmsTemplate.convertAndSend(queueDestination, "Hello Queue");
//发布topic
Destination topicDestination = new ActiveMQTopic("my-topic");
jmsTemplate.send(topicDestination, messageCreator);
jmsTemplate.convertAndSend(queueDestination, "Hello Topic");
}
}
消息消费者
消费者监听消息:consumer
/**
* @name: MQConsumerServiceImpl
* @desc: TODO
**/
@Service
public class MQConsumerServiceImpl implements MQConsumerService {
/**
* 监听queue消息
*
* @param message
*/
@Override
@JmsListener(destination = "my-queue")
public void activeMQQueueReceive(String message) {
System.out.println("监听收到my-queue消息:" + message);
}
/**
* 监听topic消息
* 一个项目同时使用 Queue 和 Topic, Topic 监听注解添加`containerFactory`属性,
* 指向自定义开启**Topic**的 **JmsListenerContainerFactory**。
* @param message
*/
@Override
@JmsListener(destination = "my-topic", containerFactory = "topicListenerContainer")
public void activeMQTopicReceive(String message) {
System.out.println("监听收到my-topic消息:" + message);
}
}
参考资料
注意:本文归作者所有,未经作者允许,不得转载