SpringCloud系列(十五):消息总线之SpringCloudBus

star2017 1年前 ⋅ 455 阅读

在微服务分布式架构中,通常多个服务需要订阅同一个消息主题来做一些相同的操作,例如配置更新等。对于这类需求,通常会使用轻量级的消息代理来构建一个共用的消息主题,让微服务实例连接上来,该主题中的消息会被所有订阅的实例消费,所以称之为 消息总线

Spring Cloud Bus 使用轻量级消息代理连接分布式系统的节点。 此代理也可用于广播状态更改(例如:配置更改)或其它和管理指令,也可以用作应用程序之间的通信通道。Spring Cloud Bus 为 AMQP 或 Kafka 作为消息代理提供了 starter 支持。

Spring Cloud Bus 官方文档GitHub > Spring Cloud Bus

Bus 介绍

Spring Cloud Bus 如果在类路径上检测到自己( spring-cloud-bus ),就会通过添加 Spring Boot 自动配置来工作。

要启用 Bus,需要添加 spring-cloud-starter-bus-amqpspring-cloud-starter-bus-kafka 依赖。确保消息代理(RabbitMQ 或 Kafka)可用可配置,Spring Cloud 会负责余下部分。

Bus 支持向所有的监听节点或指定服务(由 Euresk 定义)的所有节点发送消息。

/bus/* actuator 有名称空间有一个 HTTP 端点,目前有两项是已实现的:

  1. /bus/env:发送 key/value(键值对)来更新每个节点的 Spring 环境。
  2. /bus/refresh:重新加载每个应用程序的配置,就好像对应用的 /refresh 端点发了请求。

Bus 端点

Spring Cloud Bus 提供两个端点,/actuator/bus-refresh/actuator/bus-env,分别对应 Spring Cloud Commons 中的 /actuator/refresh/actuator/env 的 actuator 端点。

Bus Refresh 端点

/actuator/bus-refresh 端点会清空 RefreshScope 缓存,重新绑定 @ConfigurationProperties更多请参见 Refresh Scope

要暴露 /actuator/bus-refresh 端点,需在配置文件中添加如下设置:

application.properties

management.endpoints.web.exposure.include=bus-refresh

Bus Env 端点

/actuator/bus-env 端点会在多个实例中使用指定的 key/value(键/值)对更新每个实例的环境。

要暴露 /actuator/bus-env 端点,需在配置文件中添加如下设置:

management.endpoints.web.exposure.include=bus-env

/actuator/bus-env 端点接收如下形式的 POST 请求:

{
    "name": "key1",
    "value": "value1"
}

实例寻址

寻址定义

每个应用实例都有一个 service Id,该值可通过 spring.cloud.bus.id 设置,值是以冒号分隔的字符列表,从大类到具体。

默认值是从 environment 中构建,是 spring.application.nameserver.port (或者 spring.application.index 如果有设置)的组合。默认值的构建格式是 app:index:id,如下:

  • appvcap.application.name 如果有设置,或者 spring.application.name
  • indexvcap.application.instance_index 如果有设置,或者是 spring.application.index,或者 local.server.port, 或是 server.port, 或是 0
  • idvcap.application.instance_id 如果有设置,或者是随机者。

HTTP 端点接受目标路径参数 /bus-refresh/{destination},例如 /bus-refresh/customer:9000 ,customer:9000 是 service Id。如果该 ID 由总线上的实例拥有,则此实例处理消息,其他实例忽略此消息。

寻址匹配

HTTP 端点接受的目标路径参数支持模式匹配,可以使用 Spring PathMatcher(路径分隔符为冒号 :),用于确定某些实例是否处理该消息。

前面示例:/bus-env/customers:** 对于 customers 的所有实例有效,而不考虑 service Id 的其余部分。

服务ID 唯一

Bus 会尝试两次清除对事件(Event)的处理,一次从原始的 ApplicationEvent,一次从队列。为此,它将概据当前服务ID 检查发送服务ID,如果多个实例具有相同的 ID,则不会处理事件,如果多个实例是在同一台机器上运行,则每个实例都在不同的端口上,端口是 ID 的一部分。

Cloud Foundry 提供了一个索引来区分,若没有使用 Cloud Foundry 而要确保唯一,需为每个服务的实例设置唯一的 spring.application.index

消息代理

Spring Cloud Bus 使用 Spring Cloud Stream 来广播消息。 因此,要使消息流动,只需要在类路径中包含所选择的绑定器实现。AMQP(RabbitMQ)和 Kafka(spring-cloud-starter-bus- [amqp | kafka])为 Bus 提供了非常方便的 starts 组件。

一般来说,Spring Cloud Stream 依赖于Spring Boot 自动配置约定来配置中间件。例如,可以使用 spring.rabbitmq.* 属性来配置 AMQP 代理地址。

Spring Cloud Bus 在 spring.cloud.bus.* 中有一些原生配置属性(例如,spring.cloud.bus.destination 是用作外部中间件的主题的名称)。 通常,默认值就足够了。

要了解有关如何自定义消息代理设置的更多信息,请参阅 Spring Cloud Stream 文档。

Bus 事件

跟踪 Bus 事件

Bus events(RemoteApplicationEvent 的子类) 通过设置 spring.cloud.bus.trace.enabled=true 来进行跟踪。如果这样做了,Spring Boot TraceRepository(如果存在) 会显示发送的每个事件以及来自每个服务实例的所有 ACK。

以下示例来自 /trace 端点:

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}

上面示例显示了一个从 customer:9000 发送的 RefreshRemoteApplicationEvent 事件,广播给所有服务,并由 customer:9000stores:8081 接收(acked:确认)。

若要自己处理 ACK 信号,可以为应用程序添加 AckRemoteApplicationEventSentApplicationEvent 类型的 @EventListener(并启用跟踪)。或者,利用 TraceRepository 从数据中挖掘信息。

广播自定义事件

Bus 可以承载 RemoteApplicationEvent 类型的任何事件。 默认传输是 JSON,反序列化时需要提前知道将使用哪些类型。 要注册新类型,必须将其放在 org.springframework.cloud.bus.event 的子包中。

要自定义事件名称,可以在自定义类上使用 @JsonTypeName,或依赖默认策略,即使用类的简单名称。

自定义包中注册事件

如果不想或不能使用 org.springframework.cloud.bus.event 的子包下自定义事件,就必须使用 @RemoteApplicationEventScan 注解并指定扫描 RemoteApplicationEvent 类型的事件所在的包。@RemoteApplicationEventScan 指定的包包含子包。

如下示例,自定义了 MyEvent 事件:

package com.acme;

public class MyEvent extends RemoteApplicationEvent {
    ...
}

可以通过以下方式向反序列化器注册该事件:

package com.acme;

@Configuration
@RemoteApplicationEventScan
public class BusConfiguration {
    ...
}

@RemoteApplicationEventScan 注解在不指定值的情况下,将使用其所在类的包,示例中,使用 BusConfiguration 包注册 com.acme。

还可以确切地为 @RemoteApplicationEventScan 注解使用 valuebasePackagesbasePackageClasses 属性来指定包。如下示例:

package com.acme;

@Configuration
//@RemoteApplicationEventScan({"com.acme", "foo.bar"})
//@RemoteApplicationEventScan(basePackages = {"com.acme", "foo.bar", "fizz.buzz"})
@RemoteApplicationEventScan(basePackageClasses = BusConfiguration.class)
public class BusConfiguration {
    ...
}

整合 Spring Cloud Bus

Spring Cloud Bus 是一个中间组件,被整合到其它项目中使用。这里以 Spring Cloud Config Client 项目整合 Bus 为例,实现集群实例自动更新配置。

Config Server 和 Client 项目使用之前的项目(Spring Cloud系列(十):分布式配置管理 Config 之 Git 实现)。

RabbitMq

使用 RabbitMq 作为 Spring Cloud Bus 的消息代理,需要 RabbitMq 服务,如果没有则需要安装。

涉及 RabbitMq 安装配置相关操作可参考文章 Spring Cloud系列(十三):分布式服务链路跟踪 Sleuth 中关于 RabbitMq 描述,或可参考文章 Spring Boot 2实践系列(三十五):集成 RabbitMQ 消息中间件 中关于 RabbitMq 的安装和与集成。

添加依赖

Config Client 项目(例子中的项目是 orderService) 添加 spring-cloud-starter-bus-amqp 依赖,该依赖包含了 spring-cloud-starter-stream-rabbitspring-cloud-streamspring-boot-starter-amqp 等依赖。

注意spring-boot-starter-actuator 依赖不可缺,HTTP 端点的暴露依赖此组件,在这里提供刷新端点。

pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

连接 RabbitMq

Config Client 项目(例子中的项目是 orderService) 的引导配置文件添加连接 RabbitMq Server 的配置。

bootstrap.properties

# Rabbitmq
spring.rabbitmq.host=10.0.3.6
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 暴露端点
management.endpoints.web.exposure.include=bus-refresh,bus-env

启动服务

  1. 启动 Config Server 项目服务。

  2. 远程配置文件设置自定义属性

    common.properties.appId=ABCDEFG123456
    common.properties.secret=qwertyuiop
    common.properties.key=112233
    
  3. 启动 Config Client 项目启动两个实例,端口分别是 8010 和 8011,项目中有个 /config/property 接口来获取远程配置文件中的属性值。

    项目中有个 CommonProperties 属性实体类,使用了 @ConfigurationProperties 注解来注入配置文件中属性的值,使用了 @RefreshScope 注解来自动刷新属性值。
    CommonProperties.java

    @Configuration
    @ConfigurationProperties(prefix = "common.properties")
    @RefreshScope
    public class CommonProperties {
    
       private String appId;
       private String secret;
       private String key;
    
       //--省略 set/get/toString 方法
    }
    

    PropertyController.java

    @RestController
    @RequestMapping("/config")
    public class PropertyController {
    
       @Autowired
       private CommonProperties commonProperties;
       @Autowired
       private Environment environment;
    
       @GetMapping("/property")
       public String refreshProperties(){
           String envAppId = environment.getProperty("common.properties.app-id");
           return commonProperties + "-----" + envAppId;
       }
    }
    
  4. 打开 RabbitMq Web 控制台,查看连接和队列

    启动了两个 Config Client 实例,在 Connection 项里可以看到多了两个连接,在 Queues 项里看到多个两个以 springCloudBus 开头的队列名称。

    springcloud-15-bus-rabbitmq-queue

更新配置

  1. 分别向端口 8010 和 8011 两个实例的接口(/config/property) 发送请求,返回远程配置文件中属性的值。

  2. 修改远程配置文件中属性的值,例如修改:common.properties.appId=HELLO CHINA

  3. 将其中一个实例(8010) 的 Bus 刷新端点发送请求:http://localhost:8010/actuator/bus-refresh

    查看 8010 和 8011 两个实例的控制台打印输出,可以看到实例重新拉取并加载了远程配置文件。

  4. 执行第一步,再次向两个实例的接口发送请求,可以看到都返回的是更新后的值。

    即请求的是 8010 实例的刷新端点,8011 实例会同步刷新配置。

架构优化

前面的所有操作都是在 Config Client 端执行,请求刷新端口也是发送给每一个 Config Client 实例,在实例生产环境中,这个 Config Client 实例就存在不确定性,就不同于集群中的其它实例,这样是不利于运维。

做如下优化改动,让集群中的各个实例是对等的。

  1. 在 Config Server 端引入 Spring Cloud Bus,将配置服务器加入到消息总线中。
  2. 刷新端点(/actuator/bus-refresh) 请求不再发送给某个具体实例,而是发送给 Config Server,并通过路径参数( /bus-refresh/{destination} )指定需要更新的服务或实例。

通过如上改动后,应用实例不再负责触发配置更新的职责,只需针对 Config Server 即可触发服务的所有实例或指定的实例更新配置。

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: