原文地址: https://my.oschina.net/u/992559/blog/1819948
作者: moyiguke
前言
在消息处理过程中,除了 Flink 程序本身的逻辑(operator),我们还需要和外部系统进行交互,例如本地磁盘文件,HDFS,Kafka,MySQL 等。虽然 Flink 本身支持 Exactly-Once 语义,但是对于完整的数据处理系统来说,最终呈现出来的语义和外部系统是相关的。
我们先总览一下 Flink 不同 connector 的消息传递语义 。
在 Guarantees 这一列,我们可以发现以下 3 种语义:
- at most once : 至多一次。可能导致消息丢失。
- at least once : 至少一次。可能导致消息重复。
- exactly once : 刚好一次。不丢失也不重复。
语义分析
我们结合 Kafka connector 来介绍这 3 中不同的语义,以及分析它是如何产生的。
Kafka Producer 的语义
Producer 的 at-most-once 和 at-least-once 语义主要由“retries”控制(在 callback 中实现异常重发也相当于 retry)。
如果配置值是一个大于 0 的整数,Producer 在收到 error 的 callback 后,Producer 将重新发送消息。考虑这一种情况,收到消息后,Broker 正确的保存了消息,只是在返回 ack 时出现 broker 故障或者网络异常。这时候,producer 收到 error 的 callback,它不能确认异常原因,只能重新发送消息,这样就导致了消息重复。
如果配置值等于 0,Producer 在收到 error 的 callback 后,不重新发送消息。如果异常时由于 broker 没有正确保存消息导致,那么将导致消息丢失。
Producer 的 Exactly-Once 语义,主要由“enable.idempotence”控制,如果该参数为 true,将会确保消息最终只会被 broker 保存一次。同样的 Producer 在接收到 error 的 callback 后,它需要重发数据,只是在 0.11 以及更新的版本中,Producer 会为每一批消息生成一个序列号,通过这个序列号 Broker 可以过滤重复消息。并且由于序列号是保存在 topic 上的,即使主分片失败了,新的 broker 也能知道消息是否需要过滤。这里还有一个细节需要注意,“acks”不能被设置为 0 或者 1,因为万一主分片(leader replication)异常下线,将导致数据丢失,这样语义被破坏了。
NOTE : Kafka 有两个概念很容易被混淆。一个是 Durable,另一个是 Message Delivery Semantics。这两个地方都存在消息丢失的可能性,但是机制完全不同。
Durable 主要描述软件或者服务器故障后,数据是否仍能保留。Durable 丢失消息主要是没有持久化:主分片收到数据后没有及时刷新到磁盘,副本没有及时复制以及持久化到磁盘。
Durable 主要通过“acks”控制,最强的级别是“all”,在 broker 返回 ack 之前,它会确认每一个副本都已经保存了该消息。这样它能在 n-1 个副本宕机后,仍保留完整数据。最弱的级别是“0”,broker 收到消息不确认持久化就返回,如果后续持久化失败,消息会丢失。当“acks”设置为“1”的时候,broker 会确认主分片(leader replication)已经保存了消息,同时副本会主动向主分片同步,消息丢失风险较小。但是存在这种情况,消息到达主分片并且返回了 success 的 ack,这时主分片 fail 并且副本未来得及同步这条消息,消息会丢失。
Message Delivery Semantics 主要是描述在消息系统中,消息实际被处理的次数。 要区别这两点,可以简单的认为,Durable 关注消息的持久化,Message Delivery Semantics 关注消息的发送。
Kafka Consumer 的语义
Consumer 的 at-most-once 和 at-least-once 语义主要通过“offset”控制。offset 的可配置为自动提交和手动提交。若配置“enable.auto.commit”为 true,在 Consumer fetch 数据后,后台会自动提交 offset。若配置“enable.auto.commit”为 false,需要主动调用 commitSync()或者 commitAsync()来提交 offset。
在自动提交的情形下,Consumer 表现为 at-most-once 语义。在主动提交的情形下,根据用户对异常处理的不同,可表现为 at-most-once 或者 at-least-once。
假设 Consumer 在 fetch 完数据后,后续的处理步骤出现了异常。
如果 offset 是自动提交的,那么 Consumer 将不能再次消费这些数据(除非重启 Consumer,并通过 seek(TopicPartition, long)重置 offset)。它表现出 at-most-once 语义。
在捕获异常后,如果手动提交 offset,表现出 at-most-once 语义。如果不提交 offset,Consumer 可重复消费该消息,表现出 at-least-once 语义。
在 Consumer 中,没有配置可以保证 Exactly-Once 语义。若要达到这个目标,需要在 at-least-once 的基础上实现幂等。这点和 Producer 是类似的,区别是 Consumer 的幂等性需要用户自己来完成。
编码准备
前面的篇幅主要介绍了 Kafka 的 3 种语义(Message Delivery Semantics),通过上述内容,我们可以得出,想要 Flink 和 Kafka 达成端到端 Exactly-Once 语义,首先我们需要 0.11 版本或者更新的 Kafka 、Producer 和 Consumer,其次使用幂等的 Producer 发送数据以及实现幂等的 Consumer 消费。
前置条件
- JDK8
- Maven 3
- Git
- IDE
- Kafka
创建基本工程
数据以及部分代码来自 http://training.data-artisans.com/ 。
- 本文地址:Flink 实战: 结合 Kafka 构建端到端的 Exactly-Once 处理程序
- 本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出
Flink 提供了通过 mvn 生成的精简 Flink 工程的方式,使用起来非常方便。在 pom 文件中,也包含了 shade 打包的方式,因为提交到集群上运行,需要 jar-with-dependencies。
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.4.2
-DgroupId=org.apache.flink.quickstart
-DartifactId=flink-java-project
-Dversion=0.1
-Dpackage=org.apache.flink.quickstart
-DinteractiveMode=false
按实际需要修改 DgroupId,DartifactId,Dversion。
下载另一个依赖工程(示例代码需要),执行 install
Git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean install
在建立的工程中加入上一步打包的 jar 作为依赖
<dependency> <groupId>com.data-artisansgroupId> <artifactId>flink-training-exercisesartifactId> <version>0.15.2version> dependency>
下载样例数据
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
wget http://training.data-artisans.com/trainingData/nycTaxiFares.gz
导入项目到 IDE,编写 FlinkKafkaProducerEOSDemo
public static void main(String[] args) throws Exception {
final int maxEventDelay = 60; // events are out of order by max 60 seconds
final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String rideInput = "./taxi-data/nycTaxiRides.gz";
String taxiRideTopicId = "taxi-ride";
// start the data generator
DataStream<TaxiRide> rides = env.addSource(
new CheckpointedTaxiRideSource(rideInput, servingSpeedFactor));
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
SerializationSchema<TaxiRide> taxiRideSerializationSchema = new TaxiRideSchema();
rides.addSink(new FlinkKafkaProducer011<TaxiRide>(taxiRideTopicId,
new KeyedSerializationSchemaWrapper(taxiRideSerializationSchema),
properties,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // 开启Kafka EOS
));
env.execute("send taxi ride to kafka ");
}
上述代码的主体逻辑是读取 nycTaxiRides.gz,并将数据发往 Kafka。 主要使用了 CheckpointedTaxiRideSource 以及 FlinkKafkaProducer011。 接下来,说明为什么它们能达成 End-To-End 的 Exactly-Once 语义。
CheckpointedTaxiRideSource:这是一个拥有状态的文件流,它在 Checkpoint 的时候记录数据读取位置(相当于 Kafka 的 offset),Flink 错误恢复后会重新定位到 checkpoint 记录的位置,它在整个系统上表现出来的是 at-least-once。考虑这样一个场景,checkpoint 成功,但是某一个 commit 失败,原则上本次所有的提交都要回滚。如果后续的 Sink 处理不当或者不支持回滚,这些数据会被提交到 Sink 中。在 Flink 恢复后,这部分数据被重新计算,导致 Sink 中出现了重复的数据。
FlinkKafkaProducer011 : 提供了幂等以及事务提交。Producer 的幂等性参照文章开头的语义说明,这里不再介绍。 Sink 中的幂等性主要是通过两阶段提交协议来支持的(注意区分 Kafka Producer 本身的幂等性和依靠事务实现的幂等性)。Kafka 0.11 及更新的版本提供了事务支持,可以结合 Flink 的两阶段提交协议使用。为了保证 Sink 中的数据的唯一性,将两次 checkpoint 之间的数据放在一个事务中,一起预提交,如果 commit 成功,则进入下一个 checkpoint;若失败,终止事务并回滚数据。
FlinkKafkaProducer011 两阶段提交代码
protected void preCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) throws FlinkKafka011Exception {
switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) {
case 1:
case 2:
this.flush(transaction);
case 3:
this.checkErroneous();
return;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
protected void commit(FlinkKafkaProducer011.KafkaTransactionState transaction) {
switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) {
case 1:
transaction.producer.commitTransaction();
this.recycleTransactionalProducer(transaction.producer);
case 2:
case 3:
return;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
protected void abort(FlinkKafkaProducer011.KafkaTransactionState transaction) {
switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) {
case 1:
transaction.producer.abortTransaction();
this.recycleTransactionalProducer(transaction.producer);
case 2:
case 3:
return;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
上面是两阶段提交的主要代码。
preCommit:将本次 checkpoint 中未发往 Broker 的数据 flush 到 Kafka Broker。这时数据已经在 Kafka Broker 中,但是由于事务的隔离性,Consumer 暂时不会读取到这些数据(除非配置了“read_uncommitted”)。
TIPS :为什么需要调用 flush?
在 Flink processElement 的时候,调用 KafkaProducer 的 send 来发送数据,但是 Kafka 为了更高的性能,send 并不立即发送数据,而是缓存在 buffer 中,到一定的消息量才发往 Kafka Broker。这里通过 flush 可以强制将数据发往 Kafka Broker。
commit:提交事务,这时 Consumer 可以读到这些数据。
abort: 如果事务失败,终止事务。
FlinkKafkaConsumerEOSDemo
FlinkKafkaConsumerEOSDemo 的分析流程可以参照 FlinkKafkaProducerEOSDemo,这里不做细致分析。
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100);//
String taxiRideTopicId = "taxi-ride";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
DataStreamSource<TaxiRide> taxiRideDataStreamSource = env.addSource(new FlinkKafkaConsumer011<TaxiRide>(taxiRideTopicId, new TaxiRideSchema(), properties),
"kafka-source-ride");
String filePath = "./taxi-data/taxi-ride.txt";
WriteFormat format = new WriteFormatAsText();
long period = 200;
taxiRideDataStreamSource.filter(new RideCleansing.NYCFilter()).addSink(
new WriteSinkFunctionByMillis<TaxiRide>(filePath,format,period)
);
env.execute("print taxride ");
}
需要注意的是 FlinkKafkaConsumer011 的 Exactly-Once 语义通过用户配置自动设置,如果不确定 Flink 的语义,可以在 FlinkKafkaConsumer09 中打断点,断点位置:
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
自动配置相关代码:
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) { if (enableCheckpointing) { // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else { // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
总结
Flink 通过 checkpoint 和两阶段提交协议,为端到端的 Exactly-Once 的实现提供了可能,如果在项目中确实需要这种语义,不妨一试。
注意:本文归作者所有,未经作者允许,不得转载