39

flink exactly-once 系列之事务性输出实现

 4 years ago
source link: https://www.tuicool.com/articles/emMnQvf
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

flink exactly-once系列目录:

一、两阶段提交概述

二、两阶段提交实现分析

三、StreamingFileSink分析

四、事务性输出实现

五、最终一致性实现

前几篇分析到Flink 是可以通过状态与checkpoint机制实现内部Exactly-Once 的语义,对于端到端的Exactly-Once语义,Flink 通过两阶段提交方式提供了对Kafka/HDFS输出支持,两阶段提交实现是结合checkpoint流程提供的hook来实现的,实现CheckpointedFunction与CheckpointListener接口:

1. initializeState 方法里面做事务状态的恢复与重新提交

2. snapshotState 方法里面开启事务与将需要输出的数据写到状态中容错

3. notifyCheckpointComplete方法提交事务

使用flink自带的实现要求继承TwoPhaseCommitSinkFunction类,并且实现beginTransaction、preCommit、commit、abort这几个方法,虽然说使用起来很方便,但是其有一个限制那就是所提供的事务hook(比喻Connection)能够被序列化,并且反序列化之后能够继续提交之前的事务,这个对于很多事务性的数据库是无法做到的,所以需要实现一套特有的事务提交。

之前分析到两阶段提交的主要问题是在第二阶段,commit有可能会存在部分成功与部分失败,所以才有了事务容错恢复,提交失败的重启继续提交,提交成功的重启再次提交是幂等的不会影响数据的结果,现在没有了这样一个可序列化的事务hook,另外需要提交的数据也做了状态容错。但是Flink 在checkpoint机制中提供了一个唯一的标识checkpointId, 它是用户可访问的、单调递增的、容错的,任务失败之后会从最近一次成功点继续递增,那么就可以使用checkpointId 来作为事务提交的句柄,首先看一下逻辑流程:

iu2y2qn.png!web

1. invoke 方法:将需要提交的数据添加到内存List中

2. snapshotState方法:将checkpointId与list存放在状态中

3. notifyCheckpointComplete方法:将list与checkpointId做事务性提交,并且使用checkpointId做CAS机制

4. initializeState方法:从状态中恢复checkpointId与list数据,同样做事务性提交

代码实现:

public abstract class CommonTwoPhaseCommit<IN extends Serializable> extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
 
 
private long checkpointId;
private List<IN> dataList;
 
private ListState<IN> dataListState;
private ListState<Long> checkpointIdState;
 
 
@Override public void initializeState(FunctionInitializationContext context) throws Exception {
 
dataList=new ArrayList<>();
dataListState=context.getOperatorStateStore().getSerializableListState("listdata");
checkpointIdState=context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("checkpointI",Long.class));
if(context.isRestored())
{
dataListState.get().forEach(x->{
dataList.add(x);
});
Iterator<Long> ckIdIter=checkpointIdState.get().iterator();
checkpointId=ckIdIter.next();
commit(dataList,checkpointId);
}
}
 
 
@Override public void invoke(IN value, Context context) throws Exception {
dataList.add(value);
}
 
 
@Override public void snapshotState(FunctionSnapshotContext context) throws Exception {
 
dataListState.clear();
dataListState.addAll(dataList);
dataList.clear();
 
checkpointIdState.clear();
checkpointId=context.getCheckpointId();
checkpointIdState.addAll(Collections.singletonList(checkpointId));
}
 
@Override public void notifyCheckpointComplete(long checkpointId) throws Exception {
commit(dataListState.get(),checkpointId);
}
 
/**
* 使用checkpoint与数据库已经存在值进行比较,要求正好比其大1
* @param data
* @param checkpointId
*/
public abstract void commit(Iterable<IN> data,long checkpointId);
 
}

那么只需要继承CommonTwoPhaseCommit 类,实现其commit方法,做事务提交即可。目前该方案用于对window窗口聚合的延时补偿处理中,输出端为MySql,后期将会研究对Redis等其他数据库如何做一致性处理。

RRV3Yj6.jpg!web

关注回复【Flink】获取更多信息~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK