SpringBoot2实践系列(三十四):集成AcitveMQ消息中间件

star2017 1年前 ⋅ 336 阅读

  Spring Boot 为 AcitveMQ 提供了自动配置,可以直接使用jmsTemplate,自动开启了消息监听注解@EnableJms

  更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件

ActiveMQ 简介

ActiveMQ 速度快,支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等多种跨语言客户端和协议。 具有易于使用的企业集成模式和许多高级特性,同时完全支持JMS 1.1 和 J2EE 1.4,支持 AMQP v1.0协议,支持 MQTT v3.1允许在物联网环境中连接。

ActiveMQ 可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置,Spring Boot 为 ActiveMQ 提供了自动配置。

Apache ActiveMQ 官网, Download, Getting Started,Using Apache ActiveMQ

AcitveMQ 安装

  1. 从官网下载软件包,这里以 Linux 系统为例。
    在 Windows 下载 ActiveMQ 软件包上传到 Linux,或 Linux 服务器直接下载(前提是可连外网) 。
  2. 解压软件包

    tar zxvf activemq-x.x.x-bin.tar.gz

    以 AcitveMQ v5.15.6 为例
    tar zxvf apache-activemq-5.15.6-bin.tar.gz

  3. 运行 AcitveMQ
    进入 AcitveMQ 解压目录,启动 AcitveMQ

    cd :/usr/local/apache-activemq-5.15.6/bin/
    ./activemq start

    若出现无法启动,查看日志:

    cd apache-activemq-5.15.6/data/
    cat activemq.log

    查看最近的异常信息,若出现如下异常:

    Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed; nested exception is java.net.URISyntaxException: Illegal character in hostname at index 9: ws://dev_linux:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 | org.apache.activemq.xbean.XBeanBrokerFactory$1 | main
    这里面的 dev_linux 服务器名,在 activemq.xml 配置文件里默认配置的是 0.0.0.0 的IP地址,估计是ws协议不支持这种转换。

    修改配置文件:activemq.xml

    cd apache-activemq-5.15.6/conf/
    vim activemq.xml
    搜索: /ws://0.0.0.0
    修改 ws://0.0.0.0 为 ws://127.0.0.1/ 保存

    再启动

  4. AcitveMQ 启停命令

    # AcitveMQ 启停命令
    ./activemq tstart | stop | restart
    
    # 查看 AcitveMQ 相关信息和命令
    ./activemq 不带任何参数
    
  5. AcitveMQ 自带了 Web 管理端,通过浏览器访问,Web 访问端口默认是 8162

    Web 页面是基于 JSP 实现,Servlet 容器是 Jetty ,Jetty 配置文件在 /conf/jetty.xml 中。

    浏览器输入:http://ip:8162 , 默认账号密码是 admin/admin

    若要修改端口号,打开 /conf/jetty.xml 文件,找到 bean 的 id="jettyPort " 项,修改 port 属性

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
        <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8161"/>
    </bean>
    
  6. Web 页面登录密码

    AcitveMQ 自带的 Web 管理页面的默认开启了登录认证,默认账号密码是:admin / admin

    若要修改登记录认配置,打开 /conf/jetty.xml 文件,打到 bean id = "securityConstraint" 项

    <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="user,admin" />
        <!-- set authenticate=false to disable login -->
        <property name="authenticate" value="true" />
    </bean>
    <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="admin" />
        <!-- set authenticate=false to disable login -->
        <property name="authenticate" value="true" />
    </bean>
    

    上面默认配置,有两个角色,分别是:user 和 admin,authenticate 属性值为 true 表示开启登录认证,若要关闭登录认证则修改值为 false。可以只对 admin 角色定制化是否开关登录认证。

    修改登录密码,打开 /conf/jetty-realm.properties 文件,默认内容如下,修改密码字段的内容。

    # username: password [,rolename ...]
    # 用户名: 密码 [, 角色名]
    admin: 123456, admin
    user: user, user
    
  7. 配置 TCP 连接 ActiveMQ 密码,供 Spring Boot 集成访问

    默认的 TCP 连接端口是 61616。端口号配置在 /conf/activemq.xml 文件中,连接器名为 openwire,uri 以 tcp 开头的配置项,如下,可修改端口号。

    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
    

    修改 conf 目录中 credentials.properties 文件进行密码设置,此配置被 activemq.xml 引用。

    activemq.username=root
    activemq.password=123456
    guest.password=123456
    
  8. 也可以直接使用 AcitveMQ 的 Docker 镜像,做好端口映射;省略手动安装 AcitveMQ 服务。

AcitveMQ 集成

添加依赖

pom.xml 文件添加依赖。如果要配置连接池,必须添加activemq-pool依赖,否则报错:找不到 JmsTemplate Bean

<!--ActiveMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq-pool-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

连接配置

application.properties 配置文件添加配置 ActiveMQ 连接配置。
Spring Boot 自动配置默认开启的消息模式是Queue队列点对点模式;如果要使用Topic发布/订阅模式(Pub/Sub),则将 spring.jms.pub-sub-domain=改为true

#---------Spring JMS-----------------------
##----默认为false,queue(点对点)模式; 修为true,则是topic(发布/订阅模式)
#spring.jms.pub-sub-domain=false
#---------ActiveMQ-------------------------
spring.activemq.broker-url=tcp://10.0.3.4:61616
spring.activemq.user=root
spring.activemq.password=123456
spring.activemq.in-memory=false
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=10
spring.activemq.pool.idle-timeout=30

Topic 配置

ActiveMQ 配置类
自动配置的方式不支持QueueTopic同时使用,若在一个项目里要同时使用这两种模式,则需要自定义一个 JmsListenerContainerFactory Bean,设置 pub-sub-domaintrueTopic监听注解添加containerFactory属性,指向自定义开启TopicJmsListenerContainerFactory

/**
 * @name: ActiveMQConfig
 * @desc: 配置类
 **/
@Configuration
public class ActiveMQConfig {

    @Bean
    public JmsListenerContainerFactory<?> topicListenerContainer(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory();
        topicListenerContainer.setPubSubDomain(true);
        topicListenerContainer.setConnectionFactory(activeMQConnectionFactory);
        return topicListenerContainer;
    }
}

消息生产者

消息生产者:producer

/**
 * @name: MQSendServiceImpl
 * @desc: TODO
 **/
@Service
public class MQProducerServiceImpl implements MQProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 发送queue消息
     *
     * @param msg
     * @throws JMSException
     */
    @Override
    public void activeMQSend(String msg) throws JMSException {

        MessageCreator messageCreator = session -> session.createTextMessage(msg);

        //发布queue
        Destination queueDestination = new ActiveMQQueue("my-queue");
        jmsTemplate.send(queueDestination, messageCreator);
        jmsTemplate.convertAndSend(queueDestination, "Hello Queue");

        //发布topic
        Destination topicDestination = new ActiveMQTopic("my-topic");
        jmsTemplate.send(topicDestination, messageCreator);
        jmsTemplate.convertAndSend(queueDestination, "Hello Topic");

    }
}

消息消费者

消费者监听消息:consumer

/**
 * @name: MQConsumerServiceImpl
 * @desc: TODO
 **/
@Service
public class MQConsumerServiceImpl implements MQConsumerService {

    /**
     * 监听queue消息
     * 
     * @param message
     */
    @Override
    @JmsListener(destination = "my-queue")
    public void activeMQQueueReceive(String message) {
        System.out.println("监听收到my-queue消息:" + message);
    }

    /**
     * 监听topic消息
     * 一个项目同时使用 Queue 和 Topic, Topic 监听注解添加`containerFactory`属性,
     * 指向自定义开启**Topic**的 **JmsListenerContainerFactory**。
     * @param message
     */
    @Override
    @JmsListener(destination = "my-topic", containerFactory = "topicListenerContainer")
    public void activeMQTopicReceive(String message) {
        System.out.println("监听收到my-topic消息:" + message);
    }
}

参考资料

  1. 快速搭建ActiveMQ服务-Docker方式
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: