在微服务分布式架构中,通常多个服务需要订阅同一个消息主题来做一些相同的操作,例如配置更新等。对于这类需求,通常会使用轻量级的消息代理来构建一个共用的消息主题,让微服务实例连接上来,该主题中的消息会被所有订阅的实例消费,所以称之为 消息总线。
Spring Cloud Bus 使用轻量级消息代理连接分布式系统的节点。 此代理也可用于广播状态更改(例如:配置更改)或其它和管理指令,也可以用作应用程序之间的通信通道。Spring Cloud Bus 为 AMQP 或 Kafka 作为消息代理提供了 starter 支持。
Spring Cloud Bus 官方文档,GitHub > Spring Cloud Bus。
Bus 介绍
Spring Cloud Bus 如果在类路径上检测到自己( spring-cloud-bus ),就会通过添加 Spring Boot 自动配置来工作。
要启用 Bus,需要添加 spring-cloud-starter-bus-amqp 或 spring-cloud-starter-bus-kafka 依赖。确保消息代理(RabbitMQ 或 Kafka)可用可配置,Spring Cloud 会负责余下部分。
Bus 支持向所有的监听节点或指定服务(由 Euresk 定义)的所有节点发送消息。
/bus/* actuator 有名称空间有一个 HTTP 端点,目前有两项是已实现的:
- /bus/env:发送 key/value(键值对)来更新每个节点的 Spring 环境。
- /bus/refresh:重新加载每个应用程序的配置,就好像对应用的 /refresh 端点发了请求。
Bus 端点
Spring Cloud Bus 提供两个端点,/actuator/bus-refresh 和 /actuator/bus-env,分别对应 Spring Cloud Commons 中的 /actuator/refresh 和 /actuator/env 的 actuator 端点。
Bus Refresh 端点
/actuator/bus-refresh 端点会清空 RefreshScope 缓存,重新绑定 @ConfigurationProperties。更多请参见 Refresh Scope。
要暴露 /actuator/bus-refresh 端点,需在配置文件中添加如下设置:
application.properties
management.endpoints.web.exposure.include=bus-refresh
Bus Env 端点
/actuator/bus-env 端点会在多个实例中使用指定的 key/value(键/值)对更新每个实例的环境。
要暴露 /actuator/bus-env 端点,需在配置文件中添加如下设置:
management.endpoints.web.exposure.include=bus-env
/actuator/bus-env 端点接收如下形式的 POST 请求:
{
"name": "key1",
"value": "value1"
}
实例寻址
寻址定义
每个应用实例都有一个 service Id,该值可通过 spring.cloud.bus.id 设置,值是以冒号分隔的字符列表,从大类到具体。
默认值是从 environment 中构建,是 spring.application.name 和 server.port (或者 spring.application.index 如果有设置)的组合。默认值的构建格式是 app:index:id,如下:
- app:vcap.application.name 如果有设置,或者 spring.application.name。
- index:vcap.application.instance_index 如果有设置,或者是 spring.application.index,或者 local.server.port, 或是 server.port, 或是 0 。
- id:vcap.application.instance_id 如果有设置,或者是随机者。
HTTP 端点接受目标路径参数 /bus-refresh/{destination},例如 /bus-refresh/customer:9000 ,customer:9000 是 service Id。如果该 ID 由总线上的实例拥有,则此实例处理消息,其他实例忽略此消息。
寻址匹配
HTTP 端点接受的目标路径参数支持模式匹配,可以使用 Spring PathMatcher(路径分隔符为冒号 :),用于确定某些实例是否处理该消息。
前面示例:/bus-env/customers:**
对于 customers 的所有实例有效,而不考虑 service Id 的其余部分。
服务ID 唯一
Bus 会尝试两次清除对事件(Event)的处理,一次从原始的 ApplicationEvent,一次从队列。为此,它将概据当前服务ID 检查发送服务ID,如果多个实例具有相同的 ID,则不会处理事件,如果多个实例是在同一台机器上运行,则每个实例都在不同的端口上,端口是 ID 的一部分。
Cloud Foundry 提供了一个索引来区分,若没有使用 Cloud Foundry 而要确保唯一,需为每个服务的实例设置唯一的 spring.application.index 。
消息代理
Spring Cloud Bus 使用 Spring Cloud Stream 来广播消息。 因此,要使消息流动,只需要在类路径中包含所选择的绑定器实现。AMQP(RabbitMQ)和 Kafka(spring-cloud-starter-bus- [amqp | kafka])为 Bus 提供了非常方便的 starts 组件。
一般来说,Spring Cloud Stream 依赖于Spring Boot 自动配置约定来配置中间件。例如,可以使用 spring.rabbitmq.*
属性来配置 AMQP 代理地址。
Spring Cloud Bus 在 spring.cloud.bus.*
中有一些原生配置属性(例如,spring.cloud.bus.destination 是用作外部中间件的主题的名称)。 通常,默认值就足够了。
要了解有关如何自定义消息代理设置的更多信息,请参阅 Spring Cloud Stream 文档。
Bus 事件
跟踪 Bus 事件
Bus events(RemoteApplicationEvent 的子类) 通过设置 spring.cloud.bus.trace.enabled=true 来进行跟踪。如果这样做了,Spring Boot TraceRepository(如果存在) 会显示发送的每个事件以及来自每个服务实例的所有 ACK。
以下示例来自 /trace 端点:
{
"timestamp": "2015-11-26T10:24:44.411+0000",
"info": {
"signal": "spring.cloud.bus.ack",
"type": "RefreshRemoteApplicationEvent",
"id": "c4d374b7-58ea-4928-a312-31984def293b",
"origin": "stores:8081",
"destination": "*:**"
}
},
{
"timestamp": "2015-11-26T10:24:41.864+0000",
"info": {
"signal": "spring.cloud.bus.sent",
"type": "RefreshRemoteApplicationEvent",
"id": "c4d374b7-58ea-4928-a312-31984def293b",
"origin": "customers:9000",
"destination": "*:**"
}
},
{
"timestamp": "2015-11-26T10:24:41.862+0000",
"info": {
"signal": "spring.cloud.bus.ack",
"type": "RefreshRemoteApplicationEvent",
"id": "c4d374b7-58ea-4928-a312-31984def293b",
"origin": "customers:9000",
"destination": "*:**"
}
}
上面示例显示了一个从 customer:9000 发送的 RefreshRemoteApplicationEvent 事件,广播给所有服务,并由 customer:9000 和 stores:8081 接收(acked:确认)。
若要自己处理 ACK 信号,可以为应用程序添加 AckRemoteApplicationEvent 和 SentApplicationEvent 类型的 @EventListener(并启用跟踪)。或者,利用 TraceRepository 从数据中挖掘信息。
广播自定义事件
Bus 可以承载 RemoteApplicationEvent 类型的任何事件。 默认传输是 JSON,反序列化时需要提前知道将使用哪些类型。 要注册新类型,必须将其放在 org.springframework.cloud.bus.event 的子包中。
要自定义事件名称,可以在自定义类上使用 @JsonTypeName,或依赖默认策略,即使用类的简单名称。
自定义包中注册事件
如果不想或不能使用 org.springframework.cloud.bus.event 的子包下自定义事件,就必须使用 @RemoteApplicationEventScan 注解并指定扫描 RemoteApplicationEvent 类型的事件所在的包。@RemoteApplicationEventScan 指定的包包含子包。
如下示例,自定义了 MyEvent 事件:
package com.acme;
public class MyEvent extends RemoteApplicationEvent {
...
}
可以通过以下方式向反序列化器注册该事件:
package com.acme;
@Configuration
@RemoteApplicationEventScan
public class BusConfiguration {
...
}
@RemoteApplicationEventScan 注解在不指定值的情况下,将使用其所在类的包,示例中,使用 BusConfiguration 包注册 com.acme。
还可以确切地为 @RemoteApplicationEventScan 注解使用 value,basePackages 或 basePackageClasses 属性来指定包。如下示例:
package com.acme;
@Configuration
//@RemoteApplicationEventScan({"com.acme", "foo.bar"})
//@RemoteApplicationEventScan(basePackages = {"com.acme", "foo.bar", "fizz.buzz"})
@RemoteApplicationEventScan(basePackageClasses = BusConfiguration.class)
public class BusConfiguration {
...
}
整合 Spring Cloud Bus
Spring Cloud Bus 是一个中间组件,被整合到其它项目中使用。这里以 Spring Cloud Config Client 项目整合 Bus 为例,实现集群实例自动更新配置。
Config Server 和 Client 项目使用之前的项目(Spring Cloud系列(十):分布式配置管理 Config 之 Git 实现)。
RabbitMq
使用 RabbitMq 作为 Spring Cloud Bus 的消息代理,需要 RabbitMq 服务,如果没有则需要安装。
涉及 RabbitMq 安装配置相关操作可参考文章 Spring Cloud系列(十三):分布式服务链路跟踪 Sleuth 中关于 RabbitMq 描述,或可参考文章 Spring Boot 2实践系列(三十五):集成 RabbitMQ 消息中间件 中关于 RabbitMq 的安装和与集成。
添加依赖
Config Client 项目(例子中的项目是 orderService) 添加 spring-cloud-starter-bus-amqp 依赖,该依赖包含了 spring-cloud-starter-stream-rabbit 、spring-cloud-stream、spring-boot-starter-amqp 等依赖。
注意:spring-boot-starter-actuator 依赖不可缺,HTTP 端点的暴露依赖此组件,在这里提供刷新端点。
pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
连接 RabbitMq
Config Client 项目(例子中的项目是 orderService) 的引导配置文件添加连接 RabbitMq Server 的配置。
bootstrap.properties
# Rabbitmq
spring.rabbitmq.host=10.0.3.6
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 暴露端点
management.endpoints.web.exposure.include=bus-refresh,bus-env
启动服务
启动 Config Server 项目服务。
远程配置文件设置自定义属性
common.properties.appId=ABCDEFG123456 common.properties.secret=qwertyuiop common.properties.key=112233
启动 Config Client 项目启动两个实例,端口分别是 8010 和 8011,项目中有个 /config/property 接口来获取远程配置文件中的属性值。
项目中有个 CommonProperties 属性实体类,使用了 @ConfigurationProperties 注解来注入配置文件中属性的值,使用了 @RefreshScope 注解来自动刷新属性值。
CommonProperties.java@Configuration @ConfigurationProperties(prefix = "common.properties") @RefreshScope public class CommonProperties { private String appId; private String secret; private String key; //--省略 set/get/toString 方法 }
PropertyController.java
@RestController @RequestMapping("/config") public class PropertyController { @Autowired private CommonProperties commonProperties; @Autowired private Environment environment; @GetMapping("/property") public String refreshProperties(){ String envAppId = environment.getProperty("common.properties.app-id"); return commonProperties + "-----" + envAppId; } }
打开 RabbitMq Web 控制台,查看连接和队列
启动了两个 Config Client 实例,在 Connection 项里可以看到多了两个连接,在 Queues 项里看到多个两个以 springCloudBus 开头的队列名称。
更新配置
分别向端口 8010 和 8011 两个实例的接口(/config/property) 发送请求,返回远程配置文件中属性的值。
修改远程配置文件中属性的值,例如修改:common.properties.appId=HELLO CHINA
将其中一个实例(8010) 的 Bus 刷新端点发送请求:http://localhost:8010/actuator/bus-refresh
查看 8010 和 8011 两个实例的控制台打印输出,可以看到实例重新拉取并加载了远程配置文件。
执行第一步,再次向两个实例的接口发送请求,可以看到都返回的是更新后的值。
即请求的是 8010 实例的刷新端点,8011 实例会同步刷新配置。
架构优化
前面的所有操作都是在 Config Client 端执行,请求刷新端口也是发送给每一个 Config Client 实例,在实例生产环境中,这个 Config Client 实例就存在不确定性,就不同于集群中的其它实例,这样是不利于运维。
做如下优化改动,让集群中的各个实例是对等的。
- 在 Config Server 端引入 Spring Cloud Bus,将配置服务器加入到消息总线中。
- 刷新端点(/actuator/bus-refresh) 请求不再发送给某个具体实例,而是发送给 Config Server,并通过路径参数( /bus-refresh/{destination} )指定需要更新的服务或实例。
通过如上改动后,应用实例不再负责触发配置更新的职责,只需针对 Config Server 即可触发服务的所有实例或指定的实例更新配置。
注意:本文归作者所有,未经作者允许,不得转载