SpringBoot2实践系列(四十四):集成Kafka消息中间件

star2017 1年前 ⋅ 527 阅读

  spring-kafka 为支持 Apache Kafka 提供了自动配置。Spring Boot 集成 Kafka 的配置由 spring.kafka.* 属性控制。

Kafka 服务

Kafka 服务安装与运行参考 [Kafka系列(一):Kafka 介绍和安装运行、发布订阅]http://www.gxitsky.com/article/1604456375060817)。

集成 Kafka

添加依赖

pom.xml 导入 spring-kafka 包

<!--Kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

添加配置

application.properties 配置文件中添加连接 kafka 服务器的配置。

spring.kafka.bootstrap-servers=10.0.3.4:9092
# 必须,消费者监听需要指定 group-id
spring.kafka.consumer.group-id=myGroup

创建主题

创建一个 NewTopic 类型的 Bean,如果 topic 已存在,则会忽略。

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic(){
        return new NewTopic("NBA",1, (short) 1);
    }
}

发送消息

Spring Boot 为 Kafka 提供了 KafkaTemplate 自动配置,可以直接注入使用。

@RestController
@RequestMapping("/kafka")
public class SendController {

    private static final Logger logger = LogManager.getLogger(SendController.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping("/topic/{msg}")
    public void sendMsg(@PathVariable String msg) {
        ListenableFuture result = kafkaTemplate.send("NBA", msg);
        result.addCallback(o -> System.out.println("send msg success"),
                throwable -> System.out.println("send msg fail"));
    }
}

如果定义了 spring.kafka.producer.transaction-id-prefix 属性,则会自动配置 KafkaTransactionManager 。如果自定义了 RecordMessageConverter ,则会自动关联到自动配置的 KafkaTemplate

接收消息

如果是个完整的 spring-kafka,则任何 Bean 上可以使用 @KafkaListener 注解创建一个监听端点。

@Component
public class MyConsumer {

    @KafkaListener(topics = "NBA", groupId = "${spring.kafka.consumer.group-id}")
    public void processMessage(String content) {
        System.out.println(content);
    }
}

如果定义了 KafkaTransactionManager Bean,则会自动关联到容器工厂(ContainerFactory)。同样,如果自定义了 RecordMessageConverterErrorHandlerAfterRollbackProcessor Bean,也会自动关联到默认工厂。

自定义 ChainedKafkaTransactionManager 必须添加 @primary 注解,因为它通常引用自动配置的 kafktransactionmanager Bean。

Kafka Streams

Spring 为 Kafka 提供了工厂 Bean 来创建 StreamsBuilder 对象来管理其生命周期。

只要在类路径下存在 kafka-streams 依赖,并且使用 @EnableKafkaStreams 注解启用了 Kafka Streams,Spring Boot 就会自动配置必要的 KafkaStreamsConfiguration Bean。

<!--kafka streams-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>

启用了 Kafka Streams,意味着必须设置 application idbootstrap server ,前者可通过 spring.kafka.streams.application-id 设置,如果未设置则默认使用 spring.application.name;后者可以全局设置或忖为流重写。

要使用 Factory Bean,只需在自定义的 KStream 类型的 Bean,使用 StreamsBuilder 构建,如下示例:

@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }
}

默认情况下,StreamBuilder 对象在应用启动中就被创建,就会自动接管 streams。也可使用 spring.kafka.streams.auto-startup 属性来自定义此行为。

其它属性

请参考官方文档 Additional Kafka Properties

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: