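After the TaskManager starts the Task thread, it calls StreamTask#invoke() to trigger execution of the StreamOperators inside the current Task. Within it, beforeInvoke() initializes the State of each StreamOperator: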
/**
 * For every StreamOperator in the current StreamTask, initialize its (operator or keyed) State
 * by providing it with a context.
 */
private void initializeStateAndOpen() throws Exception {
    // Fetch all StreamOperators in the OperatorChain
    StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

    // Iterate over these StreamOperators and initialize the state of each one
    for (StreamOperator<?> operator : allOperators) {
        if (null != operator) {
            // "State initialization" of each StreamOperator: provide a StateInitializationContext
            // that contains the various xxxStateStores.
            // In essence, the StateInitializationContext (which owns the xxxStateStores) is handed to the
            // user-defined Function, which initializes its xxxState based on those xxxStateStores.
            operator.initializeState();
            // Call the StreamOperator's open() method to run its initialization logic.
            // For example, a user function that implements RichFunction can call getXXXState in open().
            operator.open();
        }
    }
}
Once the state inside a StreamOperator has been initialized, the open() method of the user-defined Function inside that StreamOperator is executed.
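Because the loop calls initializeState() on an operator before calling that same operator's open(), a function can already rely on its state inside open(). The sketch below models only this ordering. `ModelOperator`, `RecordingOperator` and `ModelChainRunner` are hypothetical names for illustration, not Flink classes, and the real lifecycle methods also declare `throws Exception`, which the model leaves out for brevity.

```java
import java.util.List;

// Hypothetical stand-in for Flink's StreamOperator: only the two lifecycle hooks used here.
interface ModelOperator {
    void initializeState();
    void open();
}

// Records every lifecycle call so the ordering can be inspected afterwards.
class RecordingOperator implements ModelOperator {
    private final String name;
    private final List<String> log;
    private boolean stateReady = false;

    RecordingOperator(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void initializeState() {
        stateReady = true;
        log.add(name + ".initializeState");
    }

    @Override
    public void open() {
        // open() is allowed to rely on whatever initializeState() prepared.
        if (!stateReady) {
            throw new IllegalStateException(name + ": open() called before initializeState()");
        }
        log.add(name + ".open");
    }
}

class ModelChainRunner {
    // Same shape as initializeStateAndOpen(): skip null slots, and for each operator
    // call initializeState() immediately followed by open().
    static void initializeStateAndOpen(ModelOperator[] allOperators) {
        for (ModelOperator operator : allOperators) {
            if (operator != null) {
                operator.initializeState();
                operator.open();
            }
        }
    }
}
```

Running a chain of two recording operators (with a null slot between them) produces the log `a.initializeState, a.open, b.initializeState, b.open`: each operator is fully initialized and opened before the next one starts.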
/**
 * "State initialization" of a StreamOperator: provide a context and determine which StateBackend is used.
 */
@Override
public final void initializeState() throws Exception {
    // Get the key serializer from the StreamConfig
    final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

    // The StreamTask that contains the current StreamOperator
    final StreamTask<?, ?> containingTask =
        Preconditions.checkNotNull(getContainingTask());
    final CloseableRegistry streamTaskCloseableRegistry =
        Preconditions.checkNotNull(containingTask.getCancelables());

    /*
     * StreamTaskStateInitializer exists specifically to create the StreamOperatorStateContext.
     * A StreamOperator has to go through the StreamOperatorStateContext to initialize its
     * StateBackends, its InternalTimeServiceManager, and so on.
     */
    final StreamTaskStateInitializer streamTaskStateManager =
        Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

    /*
     * StreamOperatorStateContext holds the state backends for keyed state and operator state,
     * plus the InternalTimeServiceManager that manages the timer services.
     */
    final StreamOperatorStateContext context =
        // Create the AbstractKeyedStateBackend, the OperatorStateBackend and the
        // InternalTimeServiceManager, and keep them in the context
        streamTaskStateManager.streamOperatorStateContext(
            getOperatorID(),
            getClass().getSimpleName(),
            getProcessingTimeService(),
            this,
            keySerializer,
            streamTaskCloseableRegistry,
            metrics);

    // State backend for operator state
    this.operatorStateBackend = context.operatorStateBackend();
    // State backend for keyed state (null for a non-keyed operator, hence the check below)
    this.keyedStateBackend = context.keyedStateBackend();

    if (keyedStateBackend != null) {
        // Further wrap the KeyedStateBackend in a proxy: the KeyedStateStore
        this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
    }

    // Get the InternalTimeServiceManager (which manages the internal InternalTimerServices),
    // so that timers can be registered and managed in the current StreamOperator
    timeServiceManager = context.internalTimerServiceManager();

    // Input streams of raw keyed state (read when restoring raw keyed state)
    CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
    // Input streams of raw operator state (read when restoring raw operator state)
    CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

    try {
        /*
         * Expose the state backends through their StateStore interfaces (the keyed backend was wrapped
         * in a KeyedStateStore proxy above; the OperatorStateBackend already is an OperatorStateStore)
         * and keep them in a StateInitializationContext, which can therefore manage operator state,
         * keyed state and raw state.
         * The StateInitializationContext is used by the StreamOperator and by user-defined Functions to
         * initialize State (getState, getMapState, ...) through the xxxStateStores it holds.
         */
        StateInitializationContext initializationContext = new StateInitializationContextImpl(
            context.isRestored(), // information whether we restore or start for the first time
            operatorStateBackend, // access to operator state backend
            keyedStateStore,      // access to keyed state backend
            keyedStateInputs,     // access to keyed state stream
            operatorStateInputs); // access to operator state stream

        // AbstractUdfStreamOperator overrides this method to initialize state when the user function
        // is a CheckpointedFunction.
        // In essence: use the StateInitializationContext (which owns the xxxStateStores) to initialize
        // the xxxState in the StreamOperator.
        initializeState(initializationContext);
    } finally {
        closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
        closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
    }
}
The core operation in the method above is "wrapping the context".
Core 1: StreamTaskStateInitializer
StreamTaskStateInitializer exists specifically to create the StreamOperatorStateContext. Only once the StreamOperatorStateContext is exposed to the StreamOperator can the xxxStateBackends be created. In other words, StreamTaskStateInitializer is the bridge between the StreamOperator and the StreamOperatorStateContext.
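Core 2: StreamOperatorStateContext
While StreamTaskStateInitializer creates the StreamOperatorStateContext, it also creates the StateBackends for keyed state and operator state, as well as the InternalTimeServiceManager that manages the timer services. It then wraps all of them into the StreamOperatorStateContext and keeps them there.
As a result, AbstractStreamOperator can fetch the various StateBackends and the InternalTimeServiceManager on demand.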
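The division of labor among the factory, the context and the operator can be sketched with a toy model. Everything here is hypothetical and simplified: maps stand in for real backends and a plain `Object` stands in for a `TypeSerializer`. The one behavior borrowed from Flink is that a missing key serializer means no keyed backend is created, which is why `initializeState()` above guards with `keyedStateBackend != null`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a keyed state backend: state name -> (key -> value).
class ModelKeyedBackend {
    final Map<String, Map<Object, Object>> tables = new HashMap<>();
}

// Hypothetical stand-in for an operator state backend: state name -> list of entries.
class ModelOperatorBackend {
    final Map<String, List<Object>> lists = new HashMap<>();
}

// Plays the role of StreamOperatorStateContext: it only holds what the factory created.
final class ModelStateContext {
    private final boolean restored;
    private final ModelOperatorBackend operatorBackend;
    private final ModelKeyedBackend keyedBackend; // null when the operator is not keyed

    ModelStateContext(boolean restored, ModelOperatorBackend operatorBackend, ModelKeyedBackend keyedBackend) {
        this.restored = restored;
        this.operatorBackend = operatorBackend;
        this.keyedBackend = keyedBackend;
    }

    boolean isRestored() { return restored; }
    ModelOperatorBackend operatorStateBackend() { return operatorBackend; }
    ModelKeyedBackend keyedStateBackend() { return keyedBackend; }
}

// Plays the role of StreamTaskStateInitializer: the factory that builds the context.
class ModelStateInitializer {
    ModelStateContext streamOperatorStateContext(Object keySerializer, boolean restored) {
        // No key serializer means a non-keyed operator, so no keyed backend is created.
        ModelKeyedBackend keyed = (keySerializer == null) ? null : new ModelKeyedBackend();
        return new ModelStateContext(restored, new ModelOperatorBackend(), keyed);
    }
}

// Plays the role of AbstractStreamOperator: pulls the backends out of the context and keeps them.
class ModelStreamOperator {
    ModelOperatorBackend operatorStateBackend;
    ModelKeyedBackend keyedStateBackend;

    void initializeState(ModelStateInitializer initializer, Object keySerializer) {
        ModelStateContext context = initializer.streamOperatorStateContext(keySerializer, false);
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
    }
}
```

The operator never constructs a backend itself; it only asks the factory for a context and keeps references to what it finds there, which is the "fetch on demand" arrangement described above.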
/**
 * Obtain the xxxStateBackends through the "create xxxStateBackend" logic provided by the concrete
 * StateBackend implementation, then initialize the InternalTimeServiceManager.
 */
@Override
public StreamOperatorStateContext streamOperatorStateContext(
        @Nonnull OperatorID operatorID,
        @Nonnull String operatorClassName,
        @Nonnull ProcessingTimeService processingTimeService,
        @Nonnull KeyContext keyContext,
        @Nullable TypeSerializer<?> keySerializer,
        @Nonnull CloseableRegistry streamTaskCloseableRegistry,
        @Nonnull MetricGroup metricGroup) throws Exception {

    // Get the TaskInfo
    TaskInfo taskInfo = environment.getTaskInfo();
    // Description of the StreamOperator's subtask (OperatorID, operator class name, subtask index,
    // parallelism); its final form is a String
    OperatorSubtaskDescriptionText operatorSubtaskDescription =
        new OperatorSubtaskDescriptionText(
            operatorID,
            operatorClassName,
            taskInfo.getIndexOfThisSubtask(),
            taskInfo.getNumberOfParallelSubtasks());

    final String operatorIdentifierText = operatorSubtaskDescription.toString();

    final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
        taskStateManager.prioritizedOperatorState(operatorID);

    AbstractKeyedStateBackend<?> keyedStatedBackend = null;
    OperatorStateBackend operatorStateBackend = null;
    CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
    CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
    InternalTimeServiceManager<?> timeServiceManager;

    try {
        // -------------- Keyed State Backend --------------
        // Create the state backend for keyed state
        keyedStatedBackend = keyedStatedBackend(
            keySerializer,
            operatorIdentifierText,
            prioritizedOperatorSubtaskStates,
            streamTaskCloseableRegistry,
            metricGroup);

        // -------------- Operator State Backend --------------
        // Create the state backend for operator state
        operatorStateBackend = operatorStateBackend(
            operatorIdentifierText,
            prioritizedOperatorSubtaskStates,
            streamTaskCloseableRegistry);

        // -------------- Raw State Streams --------------
        rawKeyedStateInputs = rawKeyedStateInputs(
            prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
        streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

        rawOperatorStateInputs = rawOperatorStateInputs(
            prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
        streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

        // -------------- Internal Timer Service Manager --------------
        // InternalTimeServiceManager instance that manages the internal timer services
        timeServiceManager = internalTimeServiceManager(
            keyedStatedBackend, keyContext, processingTimeService, rawKeyedStateInputs);

        // -------------- Preparing return value --------------
        // Wrap the operator/keyed state backends and the TimeServiceManager into a StreamOperatorStateContextImpl
        return new StreamOperatorStateContextImpl(
            prioritizedOperatorSubtaskStates.isRestored(),
            operatorStateBackend,
            keyedStatedBackend,
            timeServiceManager,
            rawOperatorStateInputs,
            rawKeyedStateInputs);
    } catch (Exception ex) {
        // Some code omitted...
    }
}
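Core 3: StateInitializationContext
The StateBackends created above are not handed to the CheckpointedFunction directly; they are exposed through the StateStore interfaces instead (the keyed backend, for example, is wrapped in the DefaultKeyedStateStore proxy). StateInitializationContext exists to hold these StateStores.
The StateInitializationContext is then passed to the CheckpointedFunction inside AbstractUdfStreamOperator. If you have custom requirements, your implementation of the CheckpointedFunction interface method initializeState() can take the StateStores out of the StateInitializationContext and perform the "state initialization" itself.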
/**
 * Hand the StateInitializationContext, which holds the StateStores, to the CheckpointedFunction,
 * so that it can use this context to initialize its state in the interface method initializeState().
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    // Initialize and restore the xxxState of the user-defined Function; in practice this hands the
    // StateInitializationContext to the user Function (because it needs the xxxStateStores)
    StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}
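What restoreFunctionState() does for a CheckpointedFunction can be pictured as a type check followed by a call: only a user function that opts in by implementing the checkpointed interface receives the context. This is a hypothetical sketch; `ModelInitContext`, `ModelOperatorStateStore`, `ModelCheckpointedFunction` and `ModelFunctionUtils` are invented names, and the real `StreamingFunctionUtils` handles additional cases that the sketch omits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical operator state store: named lists of Long values.
class ModelOperatorStateStore {
    private final Map<String, List<Long>> lists = new HashMap<>();

    // Returns the same list for the same name, creating it on first access.
    List<Long> getListState(String name) {
        return lists.computeIfAbsent(name, n -> new ArrayList<>());
    }
}

// Hypothetical counterpart of StateInitializationContext: the restore flag plus the store.
class ModelInitContext {
    private final boolean restored;
    private final ModelOperatorStateStore operatorStateStore;

    ModelInitContext(boolean restored, ModelOperatorStateStore operatorStateStore) {
        this.restored = restored;
        this.operatorStateStore = operatorStateStore;
    }

    boolean isRestored() { return restored; }
    ModelOperatorStateStore getOperatorStateStore() { return operatorStateStore; }
}

// Hypothetical stand-in for CheckpointedFunction (initialization hook only).
interface ModelCheckpointedFunction {
    void initializeState(ModelInitContext context);
}

final class ModelFunctionUtils {
    private ModelFunctionUtils() {}

    // Only functions that implement the checkpointed interface are given the context;
    // returns whether the function was initialized.
    static boolean restoreFunctionState(ModelInitContext context, Object userFunction) {
        if (userFunction instanceof ModelCheckpointedFunction) {
            ((ModelCheckpointedFunction) userFunction).initializeState(context);
            return true;
        }
        return false;
    }
}
```

A plain function that does not implement the interface is simply skipped, so it never sees the StateStores; that is why a function has to implement CheckpointedFunction to receive the context.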
The following custom CheckpointedFunction implementation was found via Baidu (ignore the business logic and focus only on the state initialization):
public static class CustomFunction implements MapFunction<Transaction, Long>, CheckpointedFunction {

    private ListState<Long> countPerPartition;

    private long localCount;

    /**
     * Count the total number of transactions
     */
    @Override
    public Long map(Transaction transaction) throws Exception {
        localCount++;
        return localCount;
    }

    /**
     * Custom logic executed when a snapshot is taken
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        countPerPartition.clear();
        countPerPartition.add(localCount);
    }

    /**
     * Initialize the state
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Take the StateStore out of the StateInitializationContext and initialize the ListState from it
        countPerPartition = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("perPartitionCount", Long.class));

        // initialize the "local count variable" based on the operator state
        for (Long l : countPerPartition.get()) {
            localCount += l;
        }
    }
}
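The loop in initializeState() sums every entry rather than reading a single one because operator list state obtained with getListState() is redistributed on restore: after rescaling, one subtask can receive the entries that several old subtasks wrote. The sketch below replays that round trip with a plain `List<Long>` standing in for `ListState<Long>`. `CountingFunctionModel` and its `scaleDownDemo()` helper are hypothetical, and the list merge only imitates a scale-down from two subtasks to one; it does not call any Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the CustomFunction above; a List<Long> stands in for ListState<Long>.
class CountingFunctionModel {
    private List<Long> countPerPartition;
    private long localCount;

    // Like initializeState(): bind to the (possibly restored) state and rebuild localCount from every entry.
    void initializeState(List<Long> state) {
        countPerPartition = state;
        localCount = 0;
        for (Long l : countPerPartition) {
            localCount += l;
        }
    }

    // Like map(): count one more record.
    long map(Object ignoredTransaction) {
        localCount++;
        return localCount;
    }

    // Like snapshotState(): replace the list contents with the current count.
    void snapshotState() {
        countPerPartition.clear();
        countPerPartition.add(localCount);
    }

    long localCount() {
        return localCount;
    }

    // Imitates a scale-down from two subtasks to one: the restored instance sees both entries.
    static long scaleDownDemo() {
        List<Long> stateA = new ArrayList<>();
        List<Long> stateB = new ArrayList<>();
        CountingFunctionModel a = new CountingFunctionModel();
        CountingFunctionModel b = new CountingFunctionModel();
        a.initializeState(stateA);
        b.initializeState(stateB);
        for (int i = 0; i < 3; i++) a.map(null);
        for (int i = 0; i < 4; i++) b.map(null);
        a.snapshotState();
        b.snapshotState();

        List<Long> merged = new ArrayList<>(stateA);
        merged.addAll(stateB);
        CountingFunctionModel restored = new CountingFunctionModel();
        restored.initializeState(merged);
        return restored.localCount();
    }
}
```

`CountingFunctionModel.scaleDownDemo()` returns 7 (3 + 4): had initializeState() read only the first entry, the restored count would be 3 and the other subtask's progress would be lost.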
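Likewise, when we call getRuntimeContext().getState(desc) on a keyed stream to initialize a ValueState, the method comes from the RuntimeContext interface, whose abstract implementation is AbstractRuntimeUDFContext; StreamingRuntimeContext supplies the concrete implementation logic: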
/**
 * Obtain a ValueState through the RuntimeContext
 */
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    // After validating the StateDescriptor, obtain the wrapped KeyedStateStore
    KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
    // Initialize the StateDescriptor's serializer
    stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
    // Create and fetch the ValueState through the KeyedStateStore interface
    // (default implementation: DefaultKeyedStateStore)
    return keyedStateStore.getState(stateProperties);
}

/**
 * Validate the StateDescriptor and return the wrapped KeyedStateStore (which can be used to initialize KeyedState)
 */
private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
    // Sanity-check the StateDescriptor
    Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
    // Get the KeyedStateStore from the StreamOperator
    // (AbstractStreamOperator has already stored the KeyedStateStore it created)
    KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
    Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
    // The KeyedStateStore is a proxy of the KeyedStateBackend and is provided to the RuntimeContext to create xxxState
    return keyedStateStore;
}
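The precondition check and the per-key scoping behind getState() can be modeled in a few lines. `ModelKeyedStateStore`, `ModelValueState` and `ModelRuntimeContext` are hypothetical classes; `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` (both throw a `NullPointerException` carrying the given message), and the error text is copied from the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical keyed state store: each named state is a table from key to value,
// and every access is scoped to the key currently being processed.
class ModelKeyedStateStore {
    private final Map<String, Map<Object, Object>> tables = new HashMap<>();
    private Object currentKey;

    // In Flink the runtime sets the current key before each record; here the caller does it.
    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    Object getCurrentKey() {
        return currentKey;
    }

    <T> ModelValueState<T> getState(String name) {
        Map<Object, Object> table = tables.computeIfAbsent(name, n -> new HashMap<>());
        return new ModelValueState<>(table, this);
    }
}

// Hypothetical value state: reads and writes always go through the store's current key.
class ModelValueState<T> {
    private final Map<Object, Object> table;
    private final ModelKeyedStateStore store;

    ModelValueState(Map<Object, Object> table, ModelKeyedStateStore store) {
        this.table = table;
        this.store = store;
    }

    @SuppressWarnings("unchecked")
    T value() {
        return (T) table.get(store.getCurrentKey());
    }

    void update(T newValue) {
        table.put(store.getCurrentKey(), newValue);
    }
}

// Hypothetical runtime context: fetch the operator's keyed store and fail fast if it is absent.
class ModelRuntimeContext {
    private final ModelKeyedStateStore keyedStateStore; // null on a non-keyed stream

    ModelRuntimeContext(ModelKeyedStateStore keyedStateStore) {
        this.keyedStateStore = keyedStateStore;
    }

    <T> ModelValueState<T> getState(String stateName) {
        Objects.requireNonNull(stateName, "The state properties must not be null");
        ModelKeyedStateStore store = Objects.requireNonNull(keyedStateStore,
            "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
        return store.getState(stateName);
    }
}
```

Calling getState() on a context built without a store fails immediately, mirroring the error Flink reports when keyed state is used without keyBy(). On the keyed side, one state object returns different values as the current key changes, because the store, not the state handle, decides which key is being read.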
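As we can see, AbstractStreamOperator exposes the StateStore, the proxy of the StateBackend. Remember that during Flink's internal state initialization the StateStore has already been created and is held in a member field of AbstractStreamOperator. Here that StateStore is simply taken out and used to initialize the ValueState. With that, everything falls into place.
To sum up, Flink's internal state initialization stores the state backends (exposed as StateStores) in a context and passes this context to the user-defined Function of the StreamOperator. Developers can then use the context inside the user-defined Function to obtain the various kinds of State flexibly.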