今天来讲讲如何使用activeMq消息队列。
示例实现:queue,双向队列、发布/订阅、参数设置
整体结构
doublequeue包下:双向队列使用;
queue包下:队列的使用;
topic包下:topic的使用;
config包下:mq的基础配置;
pom配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.0.3</version>
</dependency>
核心代码
ChapterActivemqApplication类中加入
@EnableJms
注解
ActiveMqConfig为配置类,设置了queue和topic的名称,设置订阅消息,设置重试策略,设置JmsTemplate。
Producer中,使用jmsTemplate.convertAndSend方法来发送消息,当然你也可以使用它的其他方法。
Consumer中,在方法上添加注解,@JmsListener(destination = Constants.DEMO_QUEUE) ,表示这个方法监听这个队列,有消息发送过来,就可以监听到,然后进行处理。
这个方法里面还可以手动处理是否收到消息,是否需要重试等。
示例如:
@JmsListener(destination = Constants.DEMO_TOPIC, containerFactory = "topicContainerFactory")
public void receiveTopic(TextMessage text, Session session) throws Exception{
try{
log.info("======{}====={}======",text,session);
//使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
text.acknowledge();
}catch(Exception e){
// 此不可省略 重发信息使用
session.recover();
}
}
配置
application.properties配置
spring.activemq.broker-url=tcp://ip:61616
#消息持久化
spring.activemq.in-memory=false
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.cache.session-cache-size=5
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载