Understanding Flink Task Data Exchange: A Source-Code Walkthrough of Data Reading
- 1. OneInputStreamTask
- 2. CheckpointBarrierHandler
- 3. SingleInputGate
- 4. InputChannel
- 4.1 RemoteInputChannel
- 4.2 LocalInputChannel
- 5. SingleInputGate Fetching Data from InputChannel
- 5.1 RemoteInputChannel
- 5.2 LocalInputChannel
This article looks at the data-reading process on the reduce side, along with the reduce-side data model. For now we only analyze how the reduce task's processing thread reads data; the network exchange and data requests to upstream tasks are not covered here. Data exchange between tasks is based on credit-based Netty network communication, which will be analyzed in a follow-up article.
1. OneInputStreamTask
Let's start with the downstream task type and its execution flow. The downstream task type here is OneInputStreamTask. Task execution boils down to calling its run() method, which loops over inputProcessor.processInput() to process data.
// OneInputStreamTask
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
}

// StreamTask
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    // calls inputProcessor.processInput here
    InputStatus status = inputProcessor.processInput();
    if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
        return;
    }
    if (status == InputStatus.END_OF_INPUT) {
        controller.allActionsCompleted();
        return;
    }
    CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
    MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
    jointFuture.thenRun(suspendedDefaultAction::resume);
}
Inside inputProcessor.processInput(), the data-fetching logic is: take a buffer from the barrier handler, deserialize and process the records in that buffer one by one, and once the buffer is fully consumed, fetch the next buffer from the barrier handler, looping like this.
// StreamOneInputProcessor
@Override
public InputStatus processInput() throws Exception {
    // calls input.emitNext here
    InputStatus status = input.emitNext(output);
    if (status == InputStatus.END_OF_INPUT) {
        operatorChain.endHeadOperatorInput(1);
    }
    return status;
}

// StreamTaskNetworkInput
@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {
    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            // first deserialize a record from the buffer fetched previously
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }
            if (result.isFullRecord()) {
                // process the record here
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }
        // if the previous buffer has been fully consumed, fetch a new one from the barrier handler
        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            // return to the mailbox after receiving a checkpoint barrier to avoid processing of
            // data after the barrier before checkpoint is performed for unaligned checkpoint mode
            if (bufferOrEvent.get().isEvent() && bufferOrEvent.get().getEvent() instanceof CheckpointBarrier) {
                return InputStatus.MORE_AVAILABLE;
            }
            processBufferOrEvent(bufferOrEvent.get());
        } else {
            if (checkpointedInputGate.isFinished()) {
                checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }
}

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        // the actual record processing
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        // handle watermarks
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
2. CheckpointBarrierHandler
Now we need to dig into how the barrier handler fetches data. We already met this component when analyzing checkpoints: a barrier handler processes checkpoint barriers. Different handlers are created depending on the checkpoint mode: for EXACTLY_ONCE a BarrierBuffer is created, which performs barrier alignment; for AT_LEAST_ONCE a BarrierTracker is created, which does not align barriers. If no checkpointing is configured in the program, the default is also AT_LEAST_ONCE, so the barrier handler is a BarrierTracker instance. We will not go into checkpoint handling here; we only look at how the handler fetches data buffers.
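The mode-based selection can be pictured with a minimal sketch, assuming a simplified factory method (the method name and surrounding wiring are illustrative, not the exact Flink code):

// Illustrative only: pick the barrier handler by checkpoint mode, as described above.
CheckpointBarrierHandler createBarrierHandler(CheckpointingMode mode, InputGate inputGate) {
    switch (mode) {
        case EXACTLY_ONCE:
            // aligns barriers, buffering the data of already-blocked channels in the meantime
            return new BarrierBuffer(inputGate, bufferBlocker);
        case AT_LEAST_ONCE:
            // only tracks barriers, no alignment
            return new BarrierTracker(inputGate);
        default:
            throw new UnsupportedOperationException("Unknown checkpointing mode: " + mode);
    }
}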
// BarrierBuffer
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        Optional<BufferOrEvent> next;
        // currentBuffered holds the buffers of channels that were blocked during barrier alignment;
        // it becomes non-null after a checkpoint completes, when the buffers in bufferBlocker
        // are moved into currentBuffered
        if (currentBuffered == null) {
            // the normal case: fetch a buffer from the inputGate
            next = inputGate.getNextBufferOrEvent();
        } else {
            next = Optional.ofNullable(currentBuffered.getNext());
            ...
        }
        ...
        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // if the channel is blocked (it has already received a checkpoint barrier),
            // stash the BufferOrEvent in bufferBlocker; it is not yet moved to currentBuffered
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        } else if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        ... // handling of other checkpoint events
    }
}
// BarrierTracker
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // much simpler: fetch directly from the inputGate
        Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
        if (!next.isPresent()) {
            // buffer or input exhausted
            return null;
        }
        BufferOrEvent bufferOrEvent = next.get();
        if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        ... // other checkpoint events
    }
}
As the code above shows, both BarrierBuffer and BarrierTracker fetch buffers from the inputGate, which brings us to the most important model of reduce-side data input: the InputGate. Before analyzing inputGate.getNextBufferOrEvent(), let's look at the InputGate data structure.
3. SingleInputGate
InputGate has two implementations, SingleInputGate and UnionInputGate. The common one is SingleInputGate; a UnionInputGate joins multiple SingleInputGates together. For example, a join operator consumes two input streams, so it reads from a UnionInputGate. Here it is enough to analyze SingleInputGate.
public class SingleInputGate implements InputGate {

    /** The type of the partition the input gate is consuming. */
    private final ResultPartitionType consumedPartitionType;

    /**
     * The index of the consumed subpartition of each consumed partition. This index depends on the
     * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
     */
    private final int consumedSubpartitionIndex;

    /** The number of input channels (equivalent to the number of consumed partitions). */
    private final int numberOfInputChannels;

    /**
     * Input channels. There is one input channel for each consumed intermediate result partition.
     * We store this in a map for runtime updates of single channels.
     */
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

    /** Channels, which notified this input gate about available data. */
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();

    /**
     * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
     * from this pool.
     */
    private BufferPool bufferPool;

    /** Global network buffer pool to request and recycle exclusive buffers (only for credit-based). */
    private NetworkBufferPool networkBufferPool;

    private final boolean isCreditBased;

    /** Flag indicating whether partitions have been requested. */
    private boolean requestedPartitionsFlag;

    /** Number of network buffers to use for each remote input channel. */
    private int networkBuffersPerChannel;

    // other non-core members omitted
    ...
Key members:
consumedPartitionType: the ResultPartitionType of the data exchange; the options are BLOCKING, PIPELINED, and PIPELINED_BOUNDED. Their meanings were explained in "Task Data Exchange: Data Writing". In streaming jobs this is always PIPELINED_BOUNDED, a bounded pipelined mode: the downstream can consume while the upstream produces, with a limited set of buffers caching the data in between.
consumedSubpartitionIndex: as noted in "Task Data Exchange: Data Writing", each ResultPartition has multiple ResultSubPartitions, one per downstream task, and each ResultSubPartition is consumed by exactly one downstream task. consumedSubpartitionIndex identifies which upstream ResultSubPartition this downstream task consumes.
numberOfInputChannels: the number of InputChannels. An InputChannel is the data channel to one upstream task, so there is one InputChannel per upstream task; for example, with 10 upstream map tasks, each reduce task has 10 InputChannels. If a map task and the reduce task run on the same node, the channel is a LocalInputChannel, otherwise a RemoteInputChannel. Where the upstream side has a ResultPartition as the counterpart of the InputGate, it has a ResultSubPartition as the counterpart of the InputChannel.
inputChannels: all InputChannels of this InputGate.
inputChannelsWithData: the InputChannels that have received data from upstream. A channel whose upstream task has produced no data for a while will not be in this queue.
bufferPool: the LocalBufferPool; InputChannels take floating buffers from this pool to cache data sent by upstream tasks.
isCreditBased: whether data transfer is credit-based; true by default.
networkBuffersPerChannel: the number of buffers used for receiving each InputChannel's data, serving as the channel's exclusive buffers. By default each InputChannel uses two exclusive buffers to cache data.
So where is this SingleInputGate created?
When deploying a task, the JobMaster builds a TaskDeploymentDescriptor (from the ExecutionGraph), which contains the InputGate's description, an InputGateDeploymentDescriptor. After the JobMaster sends the TaskDeploymentDescriptor to the TaskManager, the TaskManager constructs the Task from it, and that is where the InputGates are created:
// Task constructor
public Task(
        ...
        Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
        Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
        ...) {
    ...
    counter = 0;
    for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor : inputGateDeploymentDescriptors) {
        SingleInputGate gate = SingleInputGate.create(
            taskNameWithSubtaskAndId,
            jobId,
            executionId,
            inputGateDeploymentDescriptor,
            networkEnvironment,
            this,
            metricGroup.getIOMetricGroup());
        inputGates[counter] = gate;
        inputGatesById.put(gate.getConsumedResultId(), gate);
        ++counter;
    }
    ...
}
Within SingleInputGate, the most important members are inputChannels and bufferPool. The bufferPool is created much like on the map side. In "Task Data Exchange: Data Writing" we saw that on the map side, the maximum number of buffers in the LocalBufferPool = number of downstream tasks × buffers per downstream task + extra buffers. On the reduce side, however, the maximum number of buffers in the LocalBufferPool = the number of extra buffers only, which defaults to 8; these serve as the floating buffers. If a job has a large number of tasks, this parameter should be increased.
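Both sizes correspond to Flink configuration options. A minimal flink-conf.yaml sketch, assuming the standard NettyShuffleEnvironment option keys (defaults shown; verify the key names against your Flink version):

# exclusive buffers per RemoteInputChannel (the networkBuffersPerChannel above)
taskmanager.network.memory.buffers-per-channel: 2
# floating buffers per SingleInputGate (the reduce-side LocalBufferPool size above)
taskmanager.network.memory.floating-buffers-per-gate: 8

With 10 remote upstream tasks, a gate would then hold 10 × 2 = 20 exclusive buffers plus 8 shared floating buffers.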
4. InputChannel
Next, let's look at InputChannel.
As mentioned above, each InputChannel corresponds to one upstream task. If the map task and the reduce task run on the same node, the channel is a LocalInputChannel; otherwise it is a RemoteInputChannel. In production, RemoteInputChannel is the common case, because each downstream task node has to exchange data with many upstream nodes. So let's look at RemoteInputChannel first.
4.1 RemoteInputChannel
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    /** The connection manager to use connect to the remote partition provider. */
    // manages the communication connections to other nodes
    private final ConnectionManager connectionManager;

    /**
     * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
     * is consumed by the receiving task thread.
     */
    // queue of buffers received from the upstream task node; consumed by the task processing thread
    private final ArrayDeque<Buffer> receivedBuffers = new ArrayDeque<>();

    /** Client to establish a (possibly shared) TCP connection and request the partition. */
    // the client that talks to the upstream node, i.e. the Netty client
    private volatile PartitionRequestClient partitionRequestClient;

    /** The initial number of exclusive buffers assigned to this channel. */
    // the initial credit; data transfer between Flink nodes is credit-based by default,
    // see "Flink Credit-Based Data Transfer and Backpressure" for the concept
    private int initialCredit;

    /** The available buffer queue wraps both exclusive and requested floating buffers. */
    /**
     * The queue of available free buffers. When the downstream task receives data from upstream,
     * it takes a free buffer from this queue to cache the received data, then puts the buffer
     * into receivedBuffers. bufferQueue wraps two kinds of buffers: exclusive buffers owned by
     * this RemoteInputChannel (their number equals networkBuffersPerChannel; being channel-private,
     * they return to this channel's available queue when recycled), and floating buffers shared
     * among RemoteInputChannels (their total equals the number of buffers in the LocalBufferPool).
     * When a RemoteInputChannel runs out of free buffers (e.g. when the processing thread is slow),
     * it can request floating buffers from the LocalBufferPool; floating buffers go back to the
     * LocalBufferPool when recycled. When receiving data, floating buffers are used first,
     * exclusive buffers second.
     */
    private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();

    /** The number of required buffers that equals to sender's backlog plus initial credit. */
    @GuardedBy("bufferQueue")
    /**
     * The number of buffers this RemoteInputChannel needs for receiving data, equal to the
     * sender's backlog + initialCredit, where initialCredit equals the number of exclusive
     * buffers per RemoteInputChannel. numRequiredBuffers is therefore larger than the sender's
     * backlog, which leaves a safety margin.
     */
    private int numRequiredBuffers;

    /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    @GuardedBy("bufferQueue")
    /**
     * Whether this channel is waiting for floating buffers: when a RemoteInputChannel requests
     * floating buffers but the LocalBufferPool has none left, this flag is set; once the
     * LocalBufferPool recycles a buffer, it is handed to this RemoteInputChannel.
     */
    private boolean isWaitingForFloatingBuffers;

    // other non-core members omitted
    ...
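A minimal sketch of the floating-first behavior described above (field and method names follow the description; this is an illustration, not the exact Flink class):

// Illustration of AvailableBufferQueue's take order: floating buffers first,
// exclusive buffers only when no floating buffer is left.
static final class AvailableBufferQueue {
    final ArrayDeque<Buffer> exclusiveBuffers = new ArrayDeque<>(); // channel-private
    final ArrayDeque<Buffer> floatingBuffers = new ArrayDeque<>();  // borrowed from the LocalBufferPool

    Buffer takeBuffer() {
        if (!floatingBuffers.isEmpty()) {
            return floatingBuffers.poll();
        }
        return exclusiveBuffers.poll(); // null when the channel has no free buffer at all
    }
}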
4.2 LocalInputChannel
Having analyzed RemoteInputChannel, let's now look at LocalInputChannel, which represents an upstream task running on the same node (the same JVM) as the downstream task.
As we'll see, LocalInputChannel's structure is relatively simple: there is no local buffer queue, because within the same JVM it can read the data produced by the upstream task directly.
public class LocalInputChannel extends InputChannel implements BufferAvailabilityListener {

    private final Object requestLock = new Object();

    /** The local partition manager. */
    private final ResultPartitionManager partitionManager;

    /** Task event dispatcher for backwards events. */
    private final TaskEventDispatcher taskEventDispatcher;

    /** The consumed subpartition. */
    // a view over the ResultSubpartition; through this view the channel directly reads the buffers
    // the upstream task wrote into the ResultSubPartition, with no further data exchange needed
    private volatile ResultSubpartitionView subpartitionView;

    private volatile boolean isReleased;
5. SingleInputGate Fetching Data from InputChannel
Returning to the earlier discussion: the task processing thread ultimately reads data through inputGate.getNextBufferOrEvent(). I spent this much space on SingleInputGate and InputChannel because understanding these two components makes the receiving-side architecture much clearer. Now let's look at the implementation of inputGate.getNextBufferOrEvent().
// SingleInputGate
public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
    return getNextBufferOrEvent(true);
}

private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    ...
    // send the data request to the server side, i.e. the upstream task nodes
    requestPartitions();
    InputChannel currentChannel;
    boolean moreAvailable;
    Optional<BufferAndAvailability> result = Optional.empty();
    do {
        synchronized (inputChannelsWithData) {
            while (inputChannelsWithData.size() == 0) {
                if (isReleased) {
                    throw new IllegalStateException("Released");
                }
                if (blocking) {
                    // if no InputChannel has received data, the thread blocks here
                    inputChannelsWithData.wait();
                } else {
                    return Optional.empty();
                }
            }
            // dequeue an InputChannel from inputChannelsWithData, the queue of channels with data
            currentChannel = inputChannelsWithData.remove();
            enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
            moreAvailable = !inputChannelsWithData.isEmpty();
        }
        // fetch a buffer from the InputChannel
        result = currentChannel.getNextBuffer();
    } while (!result.isPresent());

    // this channel was now removed from the non-empty channels queue
    // we re-add it in case it has more data, because in that case no "non-empty" notification
    // will come for that channel
    // if the InputChannel still has more data, re-add it to the inputChannelsWithData queue
    if (result.get().moreAvailable()) {
        queueChannel(currentChannel);
        moreAvailable = true;
    }

    final Buffer buffer = result.get().buffer();
    if (buffer.isBuffer()) {
        // wrap the buffer into a BufferOrEvent and return it
        return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
    } else {
        ... // events
    }
}
From the source we can see the logic is roughly:
1. First request the upstream ResultPartitions. This normally happens on the first data fetch: a data request is sent to the upstream task nodes and a TCP connection is established, which then stays open.
2. Dequeue an InputChannel from inputChannelsWithData (the queue of channels that have received data) and fetch a buffer from it. If no InputChannel has any data, the task processing thread blocks until one does (the wake-up side is sketched just after this list).
3. If the InputChannel from step 2 still has more data, it is re-added to the inputChannelsWithData queue so that its remaining data can be fetched later.
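The wake-up path referenced in step 2 is the other half of this handshake. A minimal sketch of it, simplified from SingleInputGate.queueChannel (an illustration of the notification described above, not the exact Flink code):

// Called when a channel reports that it has data: enqueue it (at most once) and,
// if the queue just became non-empty, wake the task thread blocked in getNextBufferOrEvent().
private void queueChannel(InputChannel channel) {
    synchronized (inputChannelsWithData) {
        if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
            return; // already queued
        }
        inputChannelsWithData.add(channel);
        enqueuedInputChannelsWithData.set(channel.getChannelIndex());
        if (inputChannelsWithData.size() == 1) {
            inputChannelsWithData.notifyAll();
        }
    }
}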
Next, let's see how an InputChannel yields a buffer, starting with RemoteInputChannel.
5.1 RemoteInputChannel
// RemoteInputChannel
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
    final Buffer next;
    final boolean moreAvailable;
    synchronized (receivedBuffers) {
        next = receivedBuffers.poll();
        moreAvailable = !receivedBuffers.isEmpty();
    }
    numBytesIn.inc(next.getSizeUnsafe());
    numBuffersIn.inc();
    return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
}
The logic is simple: just poll a buffer from the queue of received buffers.
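For context, here is the other end of receivedBuffers: the Netty I/O thread enqueues incoming buffers and notifies the gate. A simplified sketch of that path (modeled on RemoteInputChannel.onBuffer as described above; the real method also verifies sequence numbers, omitted here):

// Runs in the network I/O thread, not the task processing thread.
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) {
    boolean wasEmpty;
    synchronized (receivedBuffers) {
        wasEmpty = receivedBuffers.isEmpty();
        receivedBuffers.add(buffer);
    }
    if (wasEmpty) {
        // queue this channel into the gate's inputChannelsWithData (see the queueChannel sketch above)
        notifyChannelNonEmpty();
    }
    if (backlog >= 0) {
        onSenderBacklog(backlog); // request floating buffers to cover the sender's backlog
    }
}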
5.2 LocalInputChannel
// LocalInputChannel
Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
    checkError();
    ResultSubpartitionView subpartitionView = this.subpartitionView;
    if (subpartitionView == null) {
        ...
        subpartitionView = checkAndWaitForSubpartitionView();
    }
    // fetched through the subpartitionView, locally within the JVM rather than remotely
    BufferAndBacklog next = subpartitionView.getNextBuffer();
    ...
    return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
}
// PipelinedSubpartitionView
public BufferAndBacklog getNextBuffer() {
    return parent.pollBuffer();
}

// PipelinedSubpartition
BufferAndBacklog pollBuffer() {
    synchronized (buffers) {
        Buffer buffer = null;
        if (buffers.isEmpty()) {
            flushRequested = false;
        }
        while (!buffers.isEmpty()) {
            // peek at the head buffer of the PipelinedSubpartition's buffers queue
            BufferConsumer bufferConsumer = buffers.peek();
            buffer = bufferConsumer.build();
            checkState(bufferConsumer.isFinished() || buffers.size() == 1,
                "When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.");
            if (buffers.size() == 1) {
                // turn off flushRequested flag if we drained all of the available data
                flushRequested = false;
            }
            // if the buffer has been fully written (not just partially filled),
            // it can be removed from the buffers queue
            if (bufferConsumer.isFinished()) {
                buffers.pop().close();
                decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
            }
            if (buffer.readableBytes() > 0) {
                break;
            }
            buffer.recycleBuffer();
            buffer = null;
            if (!bufferConsumer.isFinished()) {
                break;
            }
        }
        if (buffer == null) {
            return null;
        }
        // update the PipelinedSubpartition's statistics
        updateStatistics(buffer);
        // Do not report last remaining buffer on buffers as available to read (assuming it's unfinished).
        // It will be reported for reading either on flush or when the number of buffers in the queue
        // will be 2 or more.
        return new BufferAndBacklog(buffer, isAvailableUnsafe(), getBuffersInBacklog(), nextBufferIsEventUnsafe());
    }
}
The overall logic is also straightforward: take a buffer from the ResultSubPartition produced by the map task; everything the map task produces ends up in the ResultSubPartition's buffers queue.
One subtle detail: the buffer the reduce task gets is not necessarily removed from the ResultSubPartition's buffers queue; the read may cover only part of that buffer. This is because when the map task produces data slowly and a buffer is not filled within 200 ms (the default buffer timeout), the data is flushed so the downstream task can access it (or it is pushed downstream). In that case, instead of a whole buffer, only the data written into that buffer within those 200 ms is returned; the map side keeps writing into the same buffer until it is full, and only then starts a new one.
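This flush interval is the buffer timeout, which can be tuned per job via the DataStream API (setBufferTimeout is a real Flink API; 200 ms is the default):

// Java DataStream API: trade latency against throughput.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(200); // flush partially filled output buffers every 200 ms (the default)
// env.setBufferTimeout(-1) flushes only when a buffer is full (max throughput);
// env.setBufferTimeout(0) flushes after every record (min latency).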
Returning once more to the beginning: after the task thread fetches a buffer from the inputGate, it deserializes the data and processes it. To summarize, the data-reading flow is roughly OneInputStreamTask -> StreamInputProcessor -> CheckpointBarrierHandler -> InputGate -> InputChannel.
That concludes the analysis of Task data reading. A follow-up article will cover the data transfer and interaction between the reduce side and the map side: how the RemoteInputChannel's data is actually received, and how the data in the ResultSubPartition is sent downstream.