Spring AMQP 默认支持 RabbitMQ 作为 AMQP 协议的实现,因为...RabbitMQ 和 Spring 是同一家软件公司开发的。
Spring Boot 对 RabbitMQ 的支持也是基于 Spring AMQP。为 RabbitMQ 提供了自动配置,可以直接使用rabbitTemplate
,自动开启了消息监听注解@EnableRabbit
。
更多关于消息服务概念和支持的组件可阅读Spring Boot 2实践系列(三十三):JMS 和 AMQP 消息服务及支持的消息组件。
RabbitMQ 简介
RabbitMQ 是一个基于 AMQP 协议的轻量级,可靠,可扩展且可移植的消息代理; 支持多种消息传输协议、消息队列、传输确认、到队列的灵活路由、多种交换类型等。
支持多种语言开发的客户端,可集群部署以实现高可用和大吞吐量。是部署最广泛、最受欢迎的开源消息代理【- -译自官方描述】。
Spring AMQP 参考,RabbitMQ 官网,RabbitMQ Download,RabbitMQ Doc,RabbitMQ Tutorials, Management Plugin。
RabbitMQ 安装
以 Ubuntu 18.x 安装 RabbitMQ 为例,根据官方安装说明来执行安装操作。
下载
下载软件包前先在服务器添加用于签署 RabbitMQ 版本的密钥,否则会报警告
apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA" wget -O - "https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc" | sudo apt-key add -
RabbitMQ 依赖面向高并发的语言 Erlang,所以需要安装 Erlang 运行环境。
官方有提供安装 Erlang 运行环境的安装步骤。但个人不建议手动安装 Erlang 运行环境,因为后续安装完 RabbitMQ 后,各种依赖和版本冲突问题,也不好定位,rabbitmq-server 无法启动,所以建议下载 RabbitMQ 全量包(rabbitmq-server_3.7.8-1_all.deb),会自动下载 RabbitMQ 和所有依赖。下载 RabbitMQ 全量包,以rabbitmq-server_3.7.8-1_all.deb为例。
执行安装:
dpkg -i rabbitmq-server_3.7.8-1_all.deb会报缺少依赖,执行全量安装,会自动下载所有依赖并安装
apt-get install -f
安装
安装完后,rabbitmq-server 服务会自动运行, 查看 rabbitmq-server 进程和状态
ps -ef|grep rabbitmq
service rabbitmq-server status //查看状态,显示 Active 表示活动运行中。服务启动、停止、重启、查看状态操作
service rabbitmq-server start
service rabbitmq-server status
service rabbitmq-server restart
service rabbitmq-server stop查看 rabbitmq 状态,包括当前的配置信息
rabbitmqctl status安装 Rabbitmq 管理插件,提供基于 HTTP API 对 Rabbitmq 节点和集群进行管理和监控。管理插件包括基于浏览器的 UI 和命令行工具 rabbitmqadmin。
启用管理插件
rabbitmq-plugins enable rabbitmq_management
Web 管理
登录 Rabbitmq Web 管理
Web 管理的端口是 15672,为本地登录提供了guest/guest
账号密码,若要远程登录管理,需要添加账号。添加个 admin 账号用于远程登录
添加账号密码
rabbitmqctl add_user admin 123456查看添加的账号
rabbitmqctl list_users添加权限为管理员,管理员账号可用于 Spring Boot 的连接
rabbitmqctl set_user_tags admin administrator分配权限(赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
关于 rabbitmqctl 命令的使用,可使用 man 命令来查看
man rabbitmqctl
也可以直接使用 Rabbitmq 的 Docker 镜像,做好端口映射;省略手动安装 Rabbitmq 服务。
RabbitMQ 集成
RabbitMQ 的具体使用可参考官方的RabbitMQ Tutorials。RabbitMQ 通信有六种模式: Simplest(一对一的简单队列模式)、Work queues(一对多的队列模式)、Publish/Subscribe(发布-订阅模式)、Routing(路由模式)、Topics(主题模式)、RPC(RPC调用模式)。
此集成演示最常见的Queue
队列模式和Topic
主题模式。RabbitMQ的功能非常强大,可以单独开个系列文章来分析和实践。
基本配置
前提是 RabbitMQ 服务器正常运行中。
pom.xml 添加依赖
<!--AMQP--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.properties 配置
#---------RabbitMQ------------------------- spring.rabbitmq.host=192.168.220.128 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/
Queues
添加 Queue Bean 配置
/** * @name: MQComponent * @desc: 消息组件 **/ @Configuration public class QueueRabbitMQConfig { private static final String QUEUE_NAME_1 = "my-queue"; private static final String QUEUE_NAME_2 = "object"; /** * 声明队列 * * @return */ @Bean public Queue strQueue() { return new Queue(QUEUE_NAME_1); } /** * 声明队列 * * @return */ @Bean public Queue objQueue() { return new Queue(QUEUE_NAME_2); } /** * 声明交换器 * * @return */ @Bean public DirectExchange directExchange() { DirectExchange directExchange = new DirectExchange("direct.exchange", false, true); return directExchange; } /** * 队列绑定交换器 * * @param strQueue * @param directExchange * @return */ @Bean public Binding strQueueBind(Queue strQueue, DirectExchange directExchange) { //使用队列名作为路由 Binding binding = BindingBuilder.bind(strQueue).to(directExchange).withQueueName(); return binding; } @Bean public Binding objQueueBind(Queue objQueue, DirectExchange directExchange) { //使用队列名作为路由 Binding binding = BindingBuilder.bind(objQueue).to(directExchange).withQueueName(); return binding; } }
消息生产者
/** * @name: MQSendServiceImpl **/ @Service public class QueueProducerServiceImpl implements QueueProducerService { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送普通消息到队列-queue */ @Override public void rabbitMQSendStr(String msg) throws InterruptedException { for (int i = 0; i < 100; i++) { rabbitTemplate.convertAndSend("my-queue", System.nanoTime() + "__" + i); System.out.println(i); Thread.sleep(1000); } } /** * 发送对象,对象必须实现序列化接口 */ @Override public void rabbitMQSendObj(User user) { this.rabbitTemplate.convertAndSend("object", user); } }
消息消费者
/** * @name: MQConsumerServiceImpl * @desc: 消息消费者 **/ @Service public class QueueConsumerServiceImpl { /** * 多个消费者订阅 queue 类型同一个渠道的消息, * 接收消息的顺序与发送消息的顺序并不相同, * 多个消费者平均消费消息。 */ /** * 监听my-queue 文本消息 */ @RabbitListener(queues = "my-queue") public void rabbitMQReceive1(String msg) { System.out.println("client1 receive my-queue msg:" + msg); } @RabbitListener(queues = "my-queue") public void rabbitMQReceive2(String msg) { System.out.println("client2 receive my-queue msg:" + msg); } @RabbitListener(queues = "my-queue") public void rabbitMQReceive3(String msg) { System.out.println("client3 receive my-queue msg:" + msg); } /** * 监听user 对象消息消息 */ @RabbitListener(queues = "object") public void rabbitMQReceiveUser(User user) { System.out.println("receive user msg:" + user); } }
多个消费者订阅 queue
类型同一个渠道的消息,多个消费者平均消费消息(轮询),接收消息的顺序与发送消息的顺序并不相同。
Spring Boot 集成 RabbitMQ,只要对象实现了序列化接口,可直接发布对象消息。
Topics
Topic主题模式在消息生产者和消息消费者之间增加了Exchange
(交换器),将消息生产者和消息消费者进行了解耦,消息生产者将消息发送给交换器,交换器根据调度策略把消息再给到目标队列。
Exchange
用于转发消息,不做存储。若没有将 Queue bind 到 Exchange,交换器将直接丢弃消息,若开启了 ack 模式,在找不到队列时会返回错误。
将 Queue 绑定到 Exchange ,涉及三个参数的关联,分别是queue、exchange、routing_key,如BindingBuilder.bind(queueNewsNba).to(exchange).with("topic.news.nba")
。
Queue 绑定到 Exchang 状态可在 Web 管理界面 Queues 选项点击队列名称,在队列概述界面有个 Bindings 项,点击展开可查看到当前 queue 绑定到的路由键。
routing_key:路由密钥由点分隔的多个单词组成,最多可达 255 个字节。例:news.nba,sys.log.error等。同样 queue 名也采用相同的形式。此路由键支持两种模式匹配:
*(start)可匹配一个单词。
#(hash)可替换零个或多个单词。
Topic 配置(注册 Queue,TopicExchange,Binding)
/** * @name: TopicRabbitMQConfig * @desc: Topic 配置 **/ @Configuration public class TopicRabbitMQConfig { //新闻主题 private static final String TOPIC_NEWS = "topic.news"; //NBA新闻主题 private static final String TOPIC_NEWS_NBA = "topic.news.nba"; @Bean public Queue queueNews() { return new Queue(TopicRabbitMQConfig.TOPIC_NEWS); } @Bean public Queue queueNewsNba() { return new Queue(TopicRabbitMQConfig.TOPIC_NEWS_NBA); } /** * 注册交换器 * * @return */ @Bean public TopicExchange exchange() { TopicExchange exchange = new TopicExchange("exchange"); return exchange; } /** * 队列绑定到交换器,设置匹配的routing_key * / /** * 订阅新闻主题可以收到所有新闻包括NBA * @param queueNews * @param exchange * @return */ @Bean public Binding bindingExchangeNews(Queue queueNews, TopicExchange exchange) { return BindingBuilder.bind(queueNews).to(exchange).with("topic.news.#"); } /** * 订阅新闻下的NBA主题只可以收到NBA新闻 * @param queueNewsNba * @param exchange * @return */ @Bean public Binding bindingExchangeNewsNba(Queue queueNewsNba, TopicExchange exchange) { return BindingBuilder.bind(queueNewsNba).to(exchange).with("topic.news.nba"); } }
消息生产者
/** * @name: TopicProducerServiceImpl * @desc: Topic 生产者 */ @Service public class TopicProducerServiceImpl implements TopicProducerService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private TopicExchange topicExchange; @Override public void rabbitMQSendStr(String msg) { rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.news", "Top Ten News"); } @Override public void rabbitMQSendObj(User andy) { rabbitTemplate.convertAndSend(topicExchange.getName(), "topic.news.nba", "2018-2019 NBA First Battle Start"); } }
消息消费者
/** * @name: TopicConsumerServiceImpl * @desc: Topic 消费者 **/ @Service public class TopicConsumerServiceImpl { @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = "topic.news") public void rabbitMQReceive1(String msg) { System.out.println("client1 receive topic.news msg: " + msg); } @RabbitListener(queues = "topic.news.nba") public void rabbitMQReceive2(String msg) { System.out.println("client2 receive topic.news.nba msg: " + msg); } }
注意:本文归作者所有,未经作者允许,不得转载