今天来讲讲如何使用activeMq消息队列。
示例实现:queue,双向队列、发布/订阅、参数设置

整体结构

imagepng
doublequeue包下:双向队列使用;
queue包下:队列的使用;
topic包下:topic的使用;
config包下:mq的基础配置;

pom配置

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
	<groupId>org.messaginghub</groupId>
	<artifactId>pooled-jms</artifactId>
	<version>1.0.3</version>
</dependency>

核心代码

ChapterActivemqApplication类中加入

@EnableJms

注解

ActiveMqConfig为配置类,设置了queue和topic的名称,设置订阅消息,设置重试策略,设置JmsTemplate。

Producer中,使用jmsTemplate.convertAndSend方法来发送消息,当然你也可以使用它的其他方法。
Consumer中,在方法上添加注解,@JmsListener(destination = Constants.DEMO_QUEUE) ,表示这个方法监听这个队列,有消息发送过来,就可以监听到,然后进行处理。
这个方法里面还可以手动处理是否收到消息,是否需要重试等。
示例如:

@JmsListener(destination = Constants.DEMO_TOPIC, containerFactory = "topicContainerFactory")
public void receiveTopic(TextMessage text, Session session) throws Exception{
    try{
        log.info("======{}====={}======",text,session);
  //使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
  text.acknowledge();
  }catch(Exception e){
        // 此不可省略 重发信息使用
  session.recover();
  }
}

配置

application.properties配置

spring.activemq.broker-url=tcp://ip:61616
#消息持久化
spring.activemq.in-memory=false
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.cache.session-cache-size=5
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50

源码地址

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

注意:本文归作者所有,未经作者允许,不得转载