Flink Task线程处理模型:Mailbox
MailboxProcessor以单线程执行模型为核心,通过TaskMailbox统一管理数据处理与系统事件(如Checkpoint),结合优先级调度实现关键操作的即时抢占。其生命周期控制与协作式调度保障了任务稳定性与资源利用率。与StreamTask交互时,MailboxProcessor驱动事件处理循环,将异步事件封装为高优先级邮件,同时通过反压协调
Task的线程 和 MailboxProcessor 的绑定
executingThread 是 Task 类(StreamTask 的父类)在构造时创建的物理线程。MailboxProcessor 是 StreamTask 用来处理异步事件和驱动其主要处理逻辑(processInput)的核心组件。它们之间的绑定关系如下:
-
Task作为Runnable:Task类实现了Runnable接口,其run()方法是executingThread的入口点。// ... public class Task implements Runnable, TaskSlotPayload { // ... private final Thread executingThread; // 在构造函数中创建 public Task(/*...*/) { // ... this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); // this 指向 Task 实例 } @Override public void run() { // 这是 executingThread 执行的入口 try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) { doRun(); // 调用实际的工作方法 } finally { terminationFuture.complete(executionState); } } private void doRun() { // ... // 对于 StreamTask,这里会调用到 StreamTask 的 invoke() 方法 invokable.invoke(); // invokable 就是 StreamTask 实例 // ... } // ... }
StreamTask创建了TaskMailboxImpl,传递给MailboxProcessor,因此是MailboxProcessor的执行线程。
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor)
throws Exception {
this(
environment,
timerService,
uncaughtExceptionHandler,
actionExecutor,
new TaskMailboxImpl(Thread.currentThread()));
}
this.mailboxProcessor =
new MailboxProcessor(
this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
StreamTask.invoke() 和 MailboxProcessor:
- 当
executingThread启动并执行到StreamTask.invoke()时,StreamTask会使用其内部的MailboxProcessor来驱动其核心事件循环。StreamTask.java
// ... @Override public final void invoke() throws Exception { // ... (初始化,如 restoreInternal()) ... // let the task do its work getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart(); runMailboxLoop(); // <--- 关键调用 // ... (清理工作,如 afterInvoke()) ... } public void runMailboxLoop() throws Exception { mailboxProcessor.runMailboxLoop(); // 将控制权交给 MailboxProcessor } // ... mailboxProcessor.runMailboxLoop()是一个阻塞调用(从executingThread的视角看)。这个方法会在executingThread上运行一个循环,不断地从邮箱 (Mailbox) 中取出邮件 (Mail) 并执行它们,或者在没有邮件时执行默认操作 (通常是StreamTask.processInput(),用于处理输入数据和调用算子)。
Mailbox 的线程模型:
MailboxProcessor被设计为在其“拥有者”线程(即executingThread)上执行其核心循环和邮件处理。TaskMailbox(被MailboxProcessor使用) 内部有检查,确保其关键方法(如take,put的某些变体,以及邮件的执行)是在预期的邮箱线程(即executingThread)上调用的。MailboxProcessor.java
public void runMailboxLoop() throws Exception { // ... final TaskMailbox localMailbox = mailbox; checkState( localMailbox.isMailboxThread(), // 确保当前线程是邮箱线程 "Method must be executed by declared mailbox thread!"); // ... while (isNextLoopPossible()) { processMail(localMailbox, false); // 处理邮件,在 executingThread 上执行 if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); // 执行默认动作,在 executingThread 上执行 } } }MailboxDefaultAction通常包装了StreamTask.processInput(),所以数据处理和算子调用也是在executingThread上发生的。- 其他线程(例如网络线程接收到数据后,或者定时器线程触发定时器)想要与
StreamTask交互时,它们不会直接调用StreamTask的方法,而是向其Mailbox中放入一个“邮件”(一个Runnable或Callable)。MailboxProcessor会在executingThread上从邮箱中取出这个邮件并执行它。
总结线程与 Mailbox 的绑定:
Task构造时创建executingThread,并将Task自身作为Runnable传递给该线程。executingThread启动后,执行Task.run()->Task.doRun()->StreamTask.invoke()。- 在
StreamTask.invoke()中,调用mailboxProcessor.runMailboxLoop()。 mailboxProcessor.runMailboxLoop()在executingThread上运行,它负责从邮箱中拉取任务并执行,或者执行默认的数据处理逻辑 (processInput)。- 所有提交到该
StreamTask邮箱的异步操作最终都会在executingThread上被MailboxProcessor串行化执行。
因此,executingThread 成为了 MailboxProcessor 的“工作线程”。MailboxProcessor 确保了 StreamTask 的核心逻辑(包括状态访问、算子调用等)都在这个单一的 executingThread 上顺序执行,从而简化了并发控制。
MailboxProcessor的功能
MailboxProcessor 是 Flink 中任务(Task)执行模型的核心组件,它实现了基于邮箱(Mailbox)的单线程执行模式。其主要能力包括:
管理邮箱 (TaskMailbox):
- 持有一个
TaskMailbox实例,用于存储需要串行执行的各种动作(Mail)。这些动作可以是来自外部的请求(如 Checkpoint 触发、Timer 回调)或内部控制命令。
// ... existing code ...
public class MailboxProcessor implements Closeable {
// ... existing code ...
/**
* The mailbox data-structure that manages request for special actions, like timers,
* checkpoints, ...
*/
protected final TaskMailbox mailbox;
// ... existing code ...
执行默认动作 (MailboxDefaultAction):
- 在邮箱为空时,会循环执行一个预定义的“默认动作”。在
StreamTask的上下文中,这个默认动作通常是处理输入数据(processInput)。
this.mailboxProcessor =
new MailboxProcessor(
this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
// ... existing code ...
/**
* Action that is repeatedly executed if no action request is in the mailbox. Typically record
* processing.
*/
protected final MailboxDefaultAction mailboxDefaultAction;
// ... existing code ...
单线程执行循环 (runMailboxLoop):
- 核心方法
runMailboxLoop()驱动整个执行逻辑。它会不断检查邮箱中是否有新的Mail,如果有则执行它们;如果没有,则执行默认动作。 - 这种机制保证了默认动作(如数据处理)和邮箱中的其他动作(如 Checkpoint、Timer 事件)之间是单线程顺序执行的,避免了并发冲突。
// ... existing code ...
public void runMailboxLoop() throws Exception {
suspended = !mailboxLoopRunning;
final TaskMailbox localMailbox = mailbox;
checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController mailboxController = new MailboxController(this);
while (isNextLoopPossible()) {
// The blocking `processMail` call will not return until default action is available.
processMail(localMailbox, false);
if (isNextLoopPossible()) {
mailboxDefaultAction.runDefaultAction(
mailboxController); // lock is acquired inside default action as needed
}
}
}
// ... existing code ...
提供邮箱执行器 (MailboxExecutor):
- 通过
getMainMailboxExecutor()和getMailboxExecutor(int priority)方法,向外部提供MailboxExecutor。这使得其他组件(如 TimerService、CheckpointCoordinator)可以将它们的动作提交到邮箱中,由MailboxProcessor在其单线程循环中统一调度执行。
// ... existing code ...
public MailboxExecutor getMainMailboxExecutor() {
return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
}
/**
* Returns an executor service facade to submit actions to the mailbox.
*
* @param priority the priority of the {@link MailboxExecutor}.
*/
public MailboxExecutor getMailboxExecutor(int priority) {
return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
}
// ... existing code ...
生命周期管理:
- 实现了
Closeable接口,并有prepareClose()和close()方法,对应TaskMailbox的quiesce()和close()。这确保了在任务结束时,邮箱能被正确关闭,并处理(如取消)剩余的Mail。
// ... existing code ...
/** Lifecycle method to close the mailbox for action submission. */
public void prepareClose() {
mailbox.quiesce();
}
/**
* Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all
* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the
* mailbox.
*/
@Override
public void close() {
List<Mail> droppedMails = mailbox.close();
// ... existing code ...
}
挂起与恢复:
MailboxProcessor的执行循环可以被挂起 (suspend()) 和恢复(通过再次调用runMailboxLoop()或相关控制逻辑)。默认动作也可以通过MailboxController暂时挂起。
// ... existing code ...
/** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */
public void suspend() {
sendPoisonMail(() -> suspended = true);
}
// ... existing code ...
以及 MailboxController 中的 suspendDefaultAction()。
异常处理:
reportThrowable(Throwable throwable)方法允许将其他线程中发生的异常报告给邮箱线程,并在邮箱线程中重新抛出,从而中断任务执行。
// ... existing code ...
public void reportThrowable(Throwable throwable) {
sendControlMail(
() -> {
if (throwable instanceof Exception) {
throw (Exception) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw WrappingRuntimeException.wrapIfNecessary(throwable);
}
},
"Report throwable %s",
throwable);
}
// ... existing code ...
度量指标控制 (MailboxMetricsController):
- 包含一个
MailboxMetricsController用于控制和访问邮箱相关的度量指标,如邮箱延迟、处理的邮件数量等。
// ... existing code ...
private final MailboxMetricsController mailboxMetricsControl;
// ... existing code ...
@VisibleForTesting
public MailboxMetricsController getMailboxMetricsControl() {
return this.mailboxMetricsControl;
}
// ... existing code ...
MailboxProcessor 与 StreamTask 的互动
MailboxProcessor 为 StreamTask 提供了一个强大的、基于邮箱的单线程执行引擎。StreamTask 委托 MailboxProcessor 来驱动其核心的数据处理循环,并通过 MailboxExecutor 将所有需要与任务主线程同步的异步操作(如 Timer、Checkpoint 事件)统一提交到邮箱中进行调度。这种设计确保了任务内部操作的串行化,简化了并发控制,并提高了系统的稳定性和可维护性。
StreamTask 是 Flink 流处理任务的基类,它使用 MailboxProcessor 来管理其核心执行逻辑。
-
创建和持有
MailboxProcessor:StreamTask在其构造函数中创建并持有一个MailboxProcessor实例。MailboxDefaultAction通常被设置为StreamTask::processInput,这意味着当邮箱为空时,StreamTask会执行其数据处理逻辑。StreamTaskActionExecutor也被传递给MailboxProcessor。
-
驱动执行循环:
StreamTask的invoke()方法是任务的执行入口。在其核心逻辑中,它会调用mailboxProcessor.runMailboxLoop()来启动邮箱处理循环。这个循环会一直运行,直到任务完成或被取消。- 代码见
StreamTask.invoke():StreamTask.java
// ... existing code ... public final void invoke() throws Exception { // ... initialization ... try { // ... // Run mailbox until all gates will be recovered. mailboxProcessor.runMailboxLoop(); // 启动邮箱循环 // ... } finally { // ... cleanup ... // let mailbox execution reject all new letters from this point mailboxProcessor.prepareClose(); // ... mailboxProcessor.close(); } } // ... existing code ...
-
提交异步动作:
StreamTask及其相关的组件(如TimerService、SubtaskCheckpointCoordinator)需要执行一些异步操作,例如触发 Timer、执行 Checkpoint、响应外部事件等。这些操作需要确保在任务的主线程中执行,以避免并发问题。StreamTask通过从mailboxProcessor获取的MailboxExecutor来提交这些异步操作。这些操作会被封装成Mail放入邮箱,由MailboxProcessor在其循环中按顺序执行。- 例如,
ProcessingTimeService的实现会使用MailboxExecutor来调度 Timer 的触发:StreamOperatorFactoryUtil.java
// ... existing code ... public static <OUT, OP extends StreamOperator<OUT>> OP createOperator( // ... MailboxExecutor mailboxExecutor = // Obtained via containingTask.getMailboxExecutorFactory() containingTask .getMailboxExecutorFactory() .createExecutor(configuration.getChainIndex()); // ... final ProcessingTimeService processingTimeService; if (operatorFactory instanceof ProcessingTimeServiceAware) { processingTimeService = ((ProcessingTimeServiceAware) operatorFactory) .createProcessingTimeService(mailboxExecutor); } else { processingTimeService = processingTimeServiceFactory.get(); } // ... existing code ...ProcessingTimeServiceImpl内部会使用这个mailboxExecutor来execute或schedule定时任务。
-
控制流程与状态:
StreamTask的processInput方法(作为MailboxDefaultAction)可以通过MailboxDefaultAction.Controller与MailboxProcessor交互。例如,当输入数据处理完毕或遇到反压时,它可以调用controller.suspendDefaultAction()来暂时挂起默认动作的执行,让MailboxProcessor优先处理邮箱中的其他Mail。- 代码见
StreamTask.processInput():// ... existing code ... protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { DataInputStatus status = inputProcessor.processInput(); switch (status) { // ... case END_OF_INPUT: // Suspend the mailbox processor, it would be resumed in afterInvoke and finished // after all records processed by the downstream tasks. We also suspend the default // actions to avoid repeat executing the empty default operation (namely process // records). controller.suspendDefaultAction(); // 通过 Controller 控制 MailboxProcessor mailboxProcessor.suspend(); return; } // ... } // ... existing code ...
-
生命周期同步:
StreamTask在其生命周期的不同阶段(如cancelTask,afterInvoke)会调用MailboxProcessor的相应方法(如prepareClose,close,allActionsCompleted)来同步状态和清理资源。- 例如,在任务正常结束或需要最终 Checkpoint 完成后,会调用
mailboxProcessor.allActionsCompleted():StreamTask.java
// ... existing code ... FutureUtils.waitForAll(terminationConditions) .thenRun(mailboxProcessor::allActionsCompleted); // Resumes the mailbox processor. The mailbox processor would be completed // after all records are processed by the downstream tasks. mailboxProcessor.runMailboxLoop(); // ... existing code ...
TaskMailboxImpl
虽然这个类的核心结构是“一个锁(ReentrantLock)加一个队列(Deque<Mail>)”,但它的实现中包含了一些针对 Flink Task 执行模型的特定优化和设计,使其不仅仅是一个简单的线程安全队列。
@ThreadSafe
public class TaskMailboxImpl implements TaskMailbox {
/** Lock for all concurrent ops. */
private final ReentrantLock lock = new ReentrantLock();
/** Internal queue of mails. */
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
/** Condition that is triggered when the mailbox is no longer empty. */
@GuardedBy("lock")
private final Condition notEmpty = lock.newCondition();
/** The state of the mailbox in the lifecycle of open, quiesced, and closed. */
@GuardedBy("lock")
private State state = OPEN;
/** Reference to the thread that executes the mailbox mails. */
@Nonnull private final Thread taskMailboxThread;
/**
* The current batch of mails. A new batch can be created with {@link #tryBuildBatch()} and
* consumed with {@link #tryTakeFromBatch()}.
*/
private final Deque<Mail> batch = new ArrayDeque<>();
/**
* Performance optimization where hasNewMail == !queue.isEmpty(). Will not reflect the state of
* {@link #batch}.
*/
private volatile boolean hasNewMail = false;
/**
* Performance optimization where there is new urgent mail. When there is no urgent mail in the
* batch, it should be checked every time mail is taken, including taking mail from batch queue.
*/
private volatile boolean hasNewUrgentMail = false;
public TaskMailboxImpl(@Nonnull final Thread taskMailboxThread) {
this.taskMailboxThread = taskMailboxThread;
}
@VisibleForTesting
public TaskMailboxImpl() {
this(Thread.currentThread());
}
核心结构:
-
lock: ReentrantLock: 这是实现线程安全的核心。所有对内部队列queue和状态state的并发访问都由此锁保护。 -
queue: Deque<Mail>:- 实际存储待处理
Mail对象的双端队列。Mail对象封装了需要执行的动作(通常是一个Runnable)及其优先级。 - 使用
ArrayDeque作为底层实现。
- 实际存储待处理
-
notEmpty: Condition:- 与
lock关联的条件变量。当邮箱从空变为非空时(即有新的Mail被放入),会通过notEmpty.signal()或notEmpty.signalAll()来唤醒可能正在等待获取Mail的线程(主要是邮箱处理线程)。 - 在
take()方法中,如果队列为空,线程会调用notEmpty.await()等待。
- 与
-
state: State(enum:OPEN,QUIESCED,CLOSED):- 表示邮箱的生命周期状态:
OPEN: 邮箱正常工作,可以接收和发送邮件。QUIESCED: 邮箱处于静默状态,不再接受新的邮件(put操作会失败),但仍然可以取出已有的邮件。通常在任务准备关闭时进入此状态。CLOSED: 邮箱已关闭,不能进行任何操作。所有未处理的邮件会被清空。
- 状态转换由
quiesce()和close()方法控制,并且这些操作也受lock保护。
- 表示邮箱的生命周期状态:
-
taskMailboxThread: Thread:- 一个非常重要的字段,它存储了被指定为“邮箱线程”的线程引用。
- 很多操作(如
take,tryTake,hasMail,createBatch,tryTakeFromBatch,quiesce,close)都强制要求调用者必须是这个taskMailboxThread,通过checkIsMailboxThread()进行检查。这是因为 Flink 的 Task 执行模型是单线程的,MailboxProcessor会在其专用的线程中处理邮箱中的邮件和默认动作。
-
batch: Deque<Mail>:- 这是一个性能优化的设计。
MailboxProcessor在其主循环中,会先调用createBatch()将主队列queue中的所有邮件一次性转移到这个batch队列中。然后,MailboxProcessor会优先从batch中通过tryTakeFromBatch()获取邮件进行处理。 - 目的: 减少锁的竞争。
createBatch()在持有锁的情况下将一批邮件转移出来,之后MailboxProcessor处理batch中的邮件时就不再需要频繁获取锁去访问主队列queue。这对于高吞吐量的场景非常重要。 batch的操作也仅限于taskMailboxThread。
- 这是一个性能优化的设计。
-
hasNewMail: volatile boolean:- 这是另一个性能优化。它大致反映了主队列
queue是否为空 (!queue.isEmpty())。 volatile关键字确保了不同线程对它的可见性。- 目的: 允许邮箱线程在不获取锁的情况下快速检查是否有新邮件。例如,在
hasMail()和tryTake()方法中,会先检查batch,然后检查hasNewMail,只有当hasNewMail为true时,才尝试获取锁并检查主队列queue。 - 当有新邮件通过
put()或putFirst()(从非邮箱线程调用时)添加到queue时,hasNewMail会被设置为true。当邮件从queue中被取出或通过createBatch()转移到batch时,hasNewMail会被更新。
- 这是另一个性能优化。它大致反映了主队列
特别需要注意的点:
-
单消费者(邮箱线程)设计:
- 尽管
put()和putFirst()方法允许从任何线程添加邮件(是线程安全的),但所有取邮件的操作(take,tryTake,createBatch,tryTakeFromBatch)以及生命周期管理方法(quiesce,close)都必须由taskMailboxThread调用。这是 Flink Mailbox 模型的核心设计,确保了任务逻辑的单线程执行。
- 尽管
-
批处理优化 (
batch队列):- 理解
batch队列的作用对于分析性能至关重要。它不是一个独立的邮箱,而是主队列queue的一个临时缓存,用于减少锁争用。MailboxProcessor会周期性地将queue中的内容“批发”到batch中。
- 理解
-
hasNewMail优化:hasNewMail变量提供了一种轻量级的检查机制,避免了邮箱线程在主队列可能为空时仍频繁获取锁。 -
优先级处理 (
takeOrNull方法):takeOrNull(Deque<Mail> queue, int priority)方法实现了从队列中根据优先级取出邮件的逻辑。它会遍历队列,找到第一个优先级大于或等于指定priority的邮件并返回。这意味着高优先级的邮件(如控制命令、Checkpoint barrier)可以被优先处理。
-
putFirst()的特殊行为:putFirst(@Nonnull Mail mail)方法很有意思:- 如果调用者是
taskMailboxThread,邮件会直接被添加到batch队列的头部。这是因为邮箱线程是当前批次邮件的消费者,将邮件直接放入批处理队列的头部可以使其被更快处理,而无需等待下一轮createBatch。 - 如果调用者不是
taskMailboxThread,邮件会被添加到主队列queue的头部,并通过notEmpty.signal()唤醒邮箱线程。
- 如果调用者是
-
生命周期管理 (
state,quiesce(),close()):- 邮箱的生命周期状态转换是严格控制的,并且与任务的生命周期紧密相关。
quiesce(): 使邮箱不再接受新邮件,但允许处理完已有的邮件。close(): 彻底关闭邮箱,清空所有邮件,并唤醒所有可能在等待的线程(通过notEmpty.signalAll()),通常是为了让它们感知到关闭状态并退出。
-
锁的粒度和使用:
ReentrantLock用于保护对共享数据(queue,state)的访问。Condition(notEmpty) 用于实现生产者-消费者模式中的等待和通知机制。lock.lockInterruptibly()在take()方法中使用,允许等待的邮箱线程响应中断。
-
runExclusively(Runnable runnable):- 提供了一种机制,允许以独占方式在邮箱的锁保护下执行一段代码。这对于需要原子地执行多个邮箱操作(例如,检查状态然后根据状态放入邮件)的场景非常有用,可以避免竞态条件。
总而言之,TaskMailboxImpl 虽然基于简单的锁和队列,但通过引入批处理、hasNewMail 标志、严格的线程模型以及精细的生命周期管理,为 Flink 的 MailboxProcessor 提供了一个高效且功能完备的邮件调度机制。这些设计都是为了在保证单线程执行模型的前提下,最大化吞吐量并减少不必要的同步开销。
Mail 类分析
Mail 类是 Apache Flink 流处理运行时任务邮箱机制中的一个核心组件。它代表一个可执行的任务单元,绑定到特定的操作符链中,可以被下游邮箱处理器选择执行。
主要属性
mailOptions: 邮件选项,用于配置邮件的行为,如是否可延迟执行。runnable: 要执行的操作,是一个ThrowingRunnable类型的实例,可以抛出异常。priority: 邮件的优先级。优先级并不直接决定执行顺序,而是用于避免上下游操作符之间的活锁或死锁问题。descriptionFormat和descriptionArgs: 用于调试和错误报告的邮件描述信息。actionExecutor: 用于执行runnable的执行器。
Mail 类提供了三个构造函数,允许灵活地创建邮件对象:
- 最简单的构造函数只需要
runnable、priority和描述信息。 - 可以指定
MailboxExecutor.MailOptions来配置邮件选项。 - 可以指定
StreamTaskActionExecutor来控制操作的执行方式。
核心方法
getMailOptions(): 获取邮件选项。getPriority(): 获取邮件的优先级。如果邮件是可延迟的,则返回最小优先级。tryCancel(): 尝试取消邮件的执行。toString(): 返回邮件的描述信息。run(): 执行邮件中的操作。
Mail 类在 Flink 的流处理任务中扮演着重要角色。它允许将任务分解为小的、可执行的单元,并通过邮箱机制进行调度和执行。这种设计有助于提高任务的并发性和响应性,同时避免复杂的同步问题。
在实际使用中,可以通过创建 Mail 对象来封装需要执行的操作,并将其提交到邮箱中等待执行。通过设置不同的优先级和选项,可以控制操作的执行顺序和行为。
MailboxExecutorImpl
MailboxExecutorImpl 实现了 flink.api.common.operators.MailboxExecutor 接口,它充当了向 Flink Task 的邮箱(TaskMailbox)提交执行单元(Runnable 或 Callable)的一个入口或门面。它的核心目标是允许其他组件将代码片段(封装为 Mail 对象)放入邮箱,这些代码片段最终会由 MailboxProcessor 在其专用的单线程中执行。
核心成员变量:
mailbox: TaskMailbox: 这是实际存储待执行邮件的邮箱实例。MailboxExecutorImpl将通过它来提交新的邮件。MailboxExecutorImpl.java
// ... existing code ... /** The mailbox that manages the submitted runnable objects. */ @Nonnull private final TaskMailbox mailbox; // ... existing code ...priority: int: 与此MailboxExecutorImpl实例关联的邮件的默认优先级。当通过这个执行器提交任务时,任务会带上这个优先级。// ... existing code ... private final int priority; // ... existing code ...actionExecutor: StreamTaskActionExecutor: 这是一个执行器,用于实际运行封装在Mail对象中的命令。Mail对象在被MailboxProcessor取出后,其run()方法会使用这个actionExecutor来执行具体的逻辑。// ... existing code ... private final StreamTaskActionExecutor actionExecutor; // ... existing code ...mailboxProcessor: MailboxProcessor(可能为null): 指向驱动邮箱循环的MailboxProcessor。主要用于isIdle()方法的判断。// ... existing code ... private final MailboxProcessor mailboxProcessor; // ... existing code ...
构造函数:
- 提供了两个构造函数,主要的区别在于是否传入
MailboxProcessor。
它们初始化了执行器的核心组件。// ... existing code ... public MailboxExecutorImpl( @Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) { this(mailbox, priority, actionExecutor, null); } public MailboxExecutorImpl( @Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor, MailboxProcessor mailboxProcessor) { this.mailbox = mailbox; this.priority = priority; this.actionExecutor = Preconditions.checkNotNull(actionExecutor); this.mailboxProcessor = mailboxProcessor; } // ... existing code ...
主要方法分析:
execute
// ... existing code ...
@Override
public void execute(
MailOptions mailOptions,
final ThrowingRunnable<? extends Exception> command,
final String descriptionFormat,
final Object... descriptionArgs) {
try {
mailbox.put(
new Mail(
mailOptions,
command,
priority,
actionExecutor,
descriptionFormat,
descriptionArgs));
} catch (MailboxClosedException mbex) {
throw new RejectedExecutionException(mbex);
}
}
// ... existing code ...
- 这是向邮箱提交任务的核心方法。
- 它接收一个
ThrowingRunnable作为要执行的命令,以及MailOptions(用于配置邮件行为,例如是否可延迟)、描述信息等。 - 内部会创建一个新的
Mail对象,该对象封装了传入的command、此执行器实例的priority、actionExecutor以及描述信息。 - 然后调用
mailbox.put(new Mail(...))将这个新创建的Mail对象放入TaskMailbox中。 - 如果邮箱已经关闭(
MailboxClosedException),则会抛出RejectedExecutionException,这是 Executor 服务在无法接受新任务时的标准行为。
yield
此方法设计为由邮箱线程自身调用。
- 它尝试从
mailbox中获取一个至少具有 此执行器priority的邮件 (mailbox.take(priority))。这是一个阻塞操作,如果当前没有符合条件的邮件,它会等待。 - 一旦获取到
Mail对象,它会立即在当前线程(即邮箱线程)中执行mail.run()。 - 目的: 允许当前正在邮箱线程中执行的某个可能耗时较长的操作(例如用户函数)主动暂停,让邮箱中其他待处理的邮件(特别是具有相同或更高优先级的邮件,如 Checkpoint Barrier)有机会执行。这是一种协作式多任务处理机制,对于保证邮箱系统的响应性至关重要。
@Override
public void yield() throws InterruptedException {
Mail mail = mailbox.take(priority);
try {
mail.run();
} catch (Exception ex) {
throw WrappingRuntimeException.wrapIfNecessary(ex);
}
}
-
tryYield():- 与
yield()类似,但是一个非阻塞版本。 - 它调用
mailbox.tryTake(priority)尝试获取邮件。 - 如果成功获取到邮件,则执行它并返回
true。 - 如果没有符合条件的邮件,则立即返回
false,不会阻塞。 - 需要注意的是,根据
MailboxExecutor接口的约定和MailOptions.deferrable()的设计,yield()和tryYield()通常不会执行被标记为 "deferrable"(可延迟)的邮件。这是为了在需要快速让出执行权(例如为了处理 Checkpoint)时,避免执行那些可以稍后处理的低优先级或非紧急任务。
- 与
-
shouldInterrupt():- 此方法用于指示当前正在邮箱线程中执行的操作是否应该被中断(例如,一个长时间运行的用户函数)。
- 目前的实现是简单地检查
mailbox.hasMail(),即只要邮箱中还有任何待处理的邮件,就建议中断。 - 代码中的
TODO: FLINK-35051注释表明,这是一个待优化的点。理想情况下,只有当邮箱中有时间敏感的邮件(例如与 Checkpoint 相关的邮件)时,才应该建议中断,以避免不必要的性能开销。
-
isIdle():// ... existing code ... public boolean isIdle() { return !mailboxProcessor.isDefaultActionAvailable() && !mailbox.hasMail() && mailbox.getState().isAcceptingMails(); } // ... existing code ...- 检查关联的
MailboxProcessor是否处于空闲状态。 - 判断条件为:
MailboxProcessor的默认操作(通常是processInput)当前不可用(即被挂起)。TaskMailbox中没有待处理的邮件。TaskMailbox的状态仍然是接受邮件的状态(即不是QUIESCED或CLOSED)。
- 这个方法需要
mailboxProcessor成员不为null。
- 检查关联的
总结与作用
MailboxExecutorImpl 为 Flink 的异步操作和事件驱动模型提供了一个关键的接口。它使得系统中的不同部分(例如 Timer Service、Checkpoint Coordinator,甚至是算子自身)能够安全地将需要在 Task 主执行线程(即邮箱线程)中执行的逻辑提交到邮箱队列。
- 封装提交逻辑: 它将创建
Mail对象并将其放入TaskMailbox的细节封装起来,提供了一个更简洁的Executor风格的 API。 - 支持优先级: 允许为通过特定执行器实例提交的任务指定一个默认优先级。
- 协作式调度 (
yield/tryYield): 这是 Mailbox 模型单线程执行模式下实现并发感和响应性的核心机制。它允许长时间运行的任务主动让出控制权,确保高优先级任务(如系统事件)能够及时处理。 - 中断提示 (
shouldInterrupt): 为长时间运行的用户代码提供了一个检查点,以便在需要时(例如为了执行 Checkpoint)能够优雅地中断。
通过 MailboxExecutorImpl,Flink 能够确保所有关键的 Task 级别操作(数据处理、状态访问、Checkpoint、Timer 回调等)都在同一个线程中有序执行,从而避免了复杂的并发控制问题,简化了状态管理和一致性保证。
MailboxProcessor 细节分析
MailboxProcessor 封装了基于 Mailbox 的执行模型的完整逻辑。它的核心是一个事件循环 (runMailboxLoop),该循环持续执行两个主要任务:
- 处理邮箱中的邮件 (Mail): 检查
TaskMailbox中是否有待处理的邮件(例如 Checkpoint 触发、Timer 事件、用户通过MailboxExecutor提交的自定义逻辑等),并按优先级顺序执行它们。 - 执行默认动作 (MailboxDefaultAction): 如果邮箱中没有邮件,或者邮件处理完毕后,它会执行一个“默认动作”。在
StreamTask的上下文中,这个默认动作通常是processInput(),即处理来自上游的数据。
这种设计确保了 Task 内部所有操作(数据处理、Checkpoint、Timer 等)的单线程执行,从而极大地简化了并发控制和状态管理。
主要结构组件:
-
mailbox: TaskMailbox: 这是实际存储和管理Mail对象的组件。MailboxProcessor从它那里获取邮件。 -
mailboxDefaultAction: MailboxDefaultAction: 代表在邮箱空闲时重复执行的默认操作。它通过MailboxDefaultAction.Controller与MailboxProcessor交互,例如在没有输入数据时通知MailboxProcessor暂停调用默认动作。 -
actionExecutor: StreamTaskActionExecutor: 用于实际执行Mail中封装的Runnable。Mail对象本身不直接执行逻辑,而是委托给这个执行器。 -
控制标志 (Control Flags) - 这些标志必须只能从邮箱线程访问,以避免竞态条件:
mailboxLoopRunning: boolean: 控制主事件循环是否应该继续运行。当设置为false时,循环会在当前迭代完成后终止。suspended: boolean: 控制邮箱处理器是否被临时挂起。如果为true,runMailboxLoop会退出,但之后可以被重新调用以恢复。suspendedDefaultAction: DefaultActionSuspension: 记录当前默认动作是否被挂起。如果非null,表示默认动作已挂起,MailboxProcessor不会调用它。
// ... existing code ... /** * Control flag to terminate the mailbox processor. Once it was terminated could not be * restarted again. Must only be accessed from mailbox thread. */ private boolean mailboxLoopRunning; /** * Control flag to temporary suspend the mailbox loop/processor. After suspending the mailbox * processor can be still later resumed. Must only be accessed from mailbox thread. */ private boolean suspended; /** * Remembers a currently active suspension of the default action. Serves as flag to indicate a * suspended default action (suspended if not-null) and to reuse the object as return value in * consecutive suspend attempts. Must only be accessed from mailbox thread. */ private DefaultActionSuspension suspendedDefaultAction; // ... existing code ... -
mailboxMetricsControl: MailboxMetricsController: 用于管理和暴露与邮箱相关的度量指标。
MailboxProcessor 提供了多个构造函数,允许不同程度的定制。核心的构造函数接收 MailboxDefaultAction、TaskMailbox、StreamTaskActionExecutor 和 MailboxMetricsController。
一个常见的用法是传入一个 MailboxDefaultAction,然后 MailboxProcessor 会使用默认的 TaskMailboxImpl(与当前线程绑定)和 StreamTaskActionExecutor.IMMEDIATE。
// ... existing code ...
public MailboxProcessor(
MailboxDefaultAction mailboxDefaultAction,
TaskMailbox mailbox,
StreamTaskActionExecutor actionExecutor,
MailboxMetricsController mailboxMetricsControl) {
this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
this.mailbox = Preconditions.checkNotNull(mailbox);
this.mailboxLoopRunning = true;
this.suspendedDefaultAction = null;
this.mailboxMetricsControl = mailboxMetricsControl;
}
// ... existing code ...
runMailboxLoop()
// ... existing code ...
public void runMailboxLoop() throws Exception {
suspended = !mailboxLoopRunning;
final TaskMailbox localMailbox = mailbox;
checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController mailboxController = new MailboxController(this);
while (isNextLoopPossible()) {
// The blocking `processMail` call will not return until default action is available.
processMail(localMailbox, false);
if (isNextLoopPossible()) {
mailboxDefaultAction.runDefaultAction(
mailboxController); // lock is acquired inside default action as needed
}
}
}
private boolean isNextLoopPossible() {
// 'Suspended' can be false only when 'mailboxLoopRunning' is true.
return !suspended;
}
// ... existing code ...
- 这是
MailboxProcessor的心脏。它在一个while (isNextLoopPossible())循环中运行。 - 前置检查: 确保该方法由指定的邮箱线程执行,并且邮箱处于
OPEN状态。 - 创建
MailboxController:MailboxController是MailboxDefaultAction与MailboxProcessor交互的桥梁。 - 循环体:
processMail(localMailbox, false): 调用此方法处理邮箱中的邮件。这是一个关键步骤,它会尝试非阻塞地处理一批邮件。如果默认操作被挂起,它可能会阻塞地等待邮件或默认操作变为可用。false表示不是单步执行。if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); }: 如果循环仍然可以继续(例如,没有被挂起或关闭),并且默认动作是可用的,则执行默认动作。
- 设计理念: 注释中提到,
runMailboxLoop的设计目标是保持热路径(默认动作,邮箱中没有邮件)尽可能快。因此,对控制标志(如mailboxLoopRunning,suspendedDefaultAction)的检查通常与mailbox.hasMail()为true相关联。这意味着,如果要在邮箱线程内部更改这些标志,必须确保邮箱中至少有一个邮件,以便更改能被及时感知。
processMail
// ... existing code ...
private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
// Doing this check is an optimization to only have a volatile read in the expected hot
// path, locks are only
// acquired after this point.
boolean isBatchAvailable = mailbox.createBatch();
// Take mails in a non-blockingly and execute them.
boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);
if (singleStep) {
return processed;
}
// If the default action is currently not available, we can run a blocking mailbox execution
// until the default action becomes available again.
processed |= processMailsWhenDefaultActionUnavailable();
return processed;
}
private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
long processedMails = 0;
Optional<Mail> maybeMail;
while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
if (processedMails++ == 0) {
maybePauseIdleTimer();
}
runMail(maybeMail.get());
if (singleStep) {
break;
}
}
if (processedMails > 0) {
maybeRestartIdleTimer();
return true;
} else {
return false;
}
}
// ... existing code ...
其中 processMailsNonBlocking 和 processMailsWhenDefaultActionUnavailable 内部会调用 runMail(Mail mail) 来实际执行邮件:
// ... existing code ...
private void runMail(Mail mail) throws Exception {
mailboxMetricsControl.getMailCounter().inc();
mail.run();
// ... existing code ...
- 此方法负责处理邮箱中的邮件。
mailbox.createBatch(): 首先尝试从主队列创建一批邮件到TaskMailbox的内部批处理队列。这是一个优化,减少锁竞争。processMailsNonBlocking(singleStep): 非阻塞地处理批处理队列中的邮件。如果singleStep为true,则只处理一个邮件(用于测试或调试)。processMailsWhenDefaultActionUnavailable(): 如果默认动作当前不可用(例如,由于反压或没有输入),此方法会尝试从邮箱中获取并处理邮件。它可能会阻塞地等待新邮件的到来,直到默认动作再次可用或循环终止。- 返回
true如果至少处理了一封邮件。
suspend()
// ... existing code ...
/** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */
public void suspend() {
sendPoisonMail(() -> suspended = true);
}
/** Send mail in first priority for internal needs. */
private void sendPoisonMail(RunnableWithException mail) {
mailbox.runExclusively(
() -> {
// keep state check and poison mail enqueuing atomic, such that no intermediate
// #close may cause a
// MailboxStateException in #sendPriorityMail.
if (mailbox.getState() == TaskMailbox.State.OPEN) {
sendControlMail(mail, "poison mail");
}
});
public void runExclusively(Runnable runnable) {
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}
// ... existing code ...
- 用于从外部(非邮箱线程)请求挂起邮箱循环。
- 它通过
sendPoisonMail()向邮箱头部插入一个高优先级的“毒丸”邮件。当这个邮件被处理时,它会将suspended标志设置为true,从而导致runMailboxLoop在下一次检查isNextLoopPossible()时退出。 - Poison Mail: 是一种特殊控制邮件,用于改变
MailboxProcessor的内部状态。
allActionsCompleted()
// ... existing code ...
/**
* This method must be called to end the stream task when all actions for the tasks have been
* performed.
*/
public void allActionsCompleted() {
sendPoisonMail(
() -> {
mailboxLoopRunning = false;
suspended = true;
});
}
// ... existing code ...
- 当 Task 的所有动作都已完成,需要终止邮箱循环时调用此方法。
- 与
suspend()类似,它也通过sendPoisonMail()发送一个毒丸邮件。该邮件会将mailboxLoopRunning设置为false并将suspended设置为true,从而彻底停止事件循环。
sendPoisonMail 和 sendControlMail(...):
// ... existing code ...
/** Send mail in first priority for internal needs. */
private void sendPoisonMail(RunnableWithException mail) {
mailbox.runExclusively(
() -> {
// keep state check and poison mail enqueuing atomic, such that no intermediate
// #close may cause a
// MailboxStateException in #sendPriorityMail.
if (mailbox.getState() == TaskMailbox.State.OPEN) {
sendControlMail(mail, "poison mail");
}
});
}
/**
* Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is
* to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;
*/
private void sendControlMail(
RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {
mailbox.putFirst(
new Mail(
mail,
Integer.MAX_VALUE /*not used with putFirst*/,
descriptionFormat,
descriptionArgs));
}
// ... existing code ...
sendPoisonMail: 确保在邮箱OPEN状态下,通过sendControlMail发送一个控制邮件。它使用mailbox.runExclusively来原子地检查状态和入队。sendControlMail: 将一个具有最高优先级的Mail对象(通过mailbox.putFirst())放入邮箱。这些邮件用于内部控制,如挂起、终止、报告错误等。
生命周期方法 (prepareClose(), close()):
// ... existing code ...
/** Lifecycle method to close the mailbox for action submission. */
public void prepareClose() {
mailbox.quiesce();
}
/**
* Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all
* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the
* mailbox.
*/
@Override
public void close() {
List<Mail> droppedMails = mailbox.close();
// ... existing code ...
prepareClose(): 调用mailbox.quiesce()。这会使邮箱进入静默状态,不再接受新的邮件,但允许处理已有的邮件。这是关闭过程的第一步。close(): 调用mailbox.close()。这会彻底关闭邮箱,清空所有未处理的邮件,并尝试取消仍在邮箱中的RunnableFuture实例。
与 MailboxDefaultAction 的交互 (通过 MailboxController):
// ... existing code ...
protected static final class MailboxController implements MailboxDefaultAction.Controller {
private final MailboxProcessor mailboxProcessor;
protected MailboxController(MailboxProcessor mailboxProcessor) {
this.mailboxProcessor = mailboxProcessor;
}
@Override
public void allActionsCompleted() {
mailboxProcessor.allActionsCompleted();
}
@Override
public MailboxDefaultAction.Suspension suspendDefaultAction(
PeriodTimer suspensionPeriodTimer) {
return mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);
}
// ... existing code ...
}
// ... existing code ...
private final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {
@Nullable private final PeriodTimer suspensionTimer;
public DefaultActionSuspension(@Nullable PeriodTimer suspensionTimer) {
this.suspensionTimer = suspensionTimer;
}
@Override
public void resume() {
if (mailbox.isMailboxThread()) {
resumeInternal();
} else {
try {
sendControlMail(this::resumeInternal, "resume default action");
} catch (MailboxClosedException ex) {
// Ignored
}
}
}
private void resumeInternal() {
// This method must be called from the mailbox thread.
if (mailboxProcessor.suspendedDefaultAction == this) {
mailboxProcessor.suspendedDefaultAction = null;
if (suspensionTimer != null) {
suspensionTimer.markEnd();
}
}
}
}
// ... existing code ...
MailboxController是一个内部类,实现了MailboxDefaultAction.Controller接口。MailboxDefaultAction通过这个Controller来与MailboxProcessor通信。suspendDefaultAction(): 当默认动作(如processInput)发现当前没有工作可做时(例如,没有输入数据或下游反压),它会调用controller.suspendDefaultAction()。MailboxProcessor.suspendDefaultAction(@Nullable PeriodTimer suspensionTimer):- 此方法(只能由邮箱线程调用)将
suspendedDefaultAction设置为一个新的DefaultActionSuspension实例。 DefaultActionSuspension实现了MailboxDefaultAction.Suspension接口,其resume()方法用于恢复默认动作的执行。resume()可以从任何线程调用,如果不是邮箱线程,它会发送一个控制邮件来确保恢复逻辑在邮箱线程中执行。
- 此方法(只能由邮箱线程调用)将
获取 MailboxExecutor:
// ... existing code ...
public MailboxExecutor getMainMailboxExecutor() {
return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);
}
/**
* Returns an executor service facade to submit actions to the mailbox.
*
* @param priority the priority of the {@link MailboxExecutor}.
*/
public MailboxExecutor getMailboxExecutor(int priority) {
return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
}
// ... existing code ...
getMainMailboxExecutor(): 返回一个具有最低优先级的MailboxExecutor。getMailboxExecutor(int priority): 返回一个指定优先级的MailboxExecutor。这些MailboxExecutor实例允许其他组件向此MailboxProcessor的邮箱提交任务。
总结
MailboxProcessor 是 Flink Task 单线程执行模型的核心。它通过一个事件循环来协调处理高优先级的控制/事件邮件和低优先级的默认数据处理动作。这种机制确保了:
- 单线程执行: 所有关键逻辑都在同一个线程中执行,避免了复杂的并发同步。
- 响应性: 高优先级邮件(如 Checkpoint barriers)可以抢占默认动作,保证系统事件的及时处理。
- 可控性: 提供了挂起、恢复、终止事件循环的机制。
- 可扩展性: 通过
MailboxExecutor允许外部组件向邮箱提交自定义任务。
processInput
processInput 方法是 StreamTask 执行其核心数据处理逻辑的地方。它是作为 MailboxProcessor 的默认动作 (MailboxDefaultAction) 来执行的。这意味着,当 MailboxProcessor 的邮箱中没有更高优先级的“邮件”(如 Checkpoint 触发、Timer 事件等)需要处理时,它就会循环调用这个 processInput 方法。
下面是对 processInput 方法的详细分析:
-
方法职责与设计理念:
-
处理输入事件: 其核心职责是从输入源(由
inputProcessor代表)获取一个事件(通常是一条记录或一组记录),并将其传递给后续的算子链进行处理。 -
非阻塞性: 注释中强调“Implementations should (in general) be non-blocking”。这是非常关键的一点。因为
MailboxProcessor是单线程执行其邮箱中的邮件和默认动作的,如果processInput长时间阻塞,将会导致 Checkpoint barriers、Timer 等重要事件无法及时处理,影响任务的正确性和性能。 -
与 MailboxProcessor 协作: 通过
MailboxDefaultAction.Controller controller参数,processInput可以与MailboxProcessor进行交互,例如在没有数据或遇到反压时,通知MailboxProcessor暂停调用默认动作。
-
-
处理输入 (
inputProcessor.processInput()):
- 方法首先调用
inputProcessor.processInput()。InputProcessor负责从上游读取数据、反序列化,并将数据喂给当前 Task 的第一个 Operator。 processInput()的返回值DataInputStatus描述了本次输入处理的结果。
根据 DataInputStatus 进行分支处理:
DataInputStatus status = inputProcessor.processInput();
switch (status) {
case MORE_AVAILABLE:
if (taskIsAvailable()) {
return;
}
break;
case NOTHING_AVAILABLE:
break;
case END_OF_RECOVERY:
throw new IllegalStateException("We should not receive this event here.");
case STOPPED:
endData(StopMode.NO_DRAIN);
return;
case END_OF_DATA:
endData(StopMode.DRAIN);
notifyEndOfData();
return;
case END_OF_INPUT:
// Suspend the mailbox processor, it would be resumed in afterInvoke and finished
// after all records processed by the downstream tasks. We also suspend the default
// actions to avoid repeat executing the empty default operation (namely process
// records).
controller.suspendDefaultAction();
mailboxProcessor.suspend();
return;
}
MORE_AVAILABLE: 表示 inputProcessor 中还有更多数据可以立即处理。
-
if (taskIsAvailable()) { return; }: 如果当前任务本身也是可用的(例如,下游没有反压),则直接返回。MailboxProcessor会很快再次调用processInput来处理更多数据。 -
NOTHING_AVAILABLE: 表示inputProcessor当前没有可用的数据。此时,方法不会立即返回,而是会继续检查是否存在反压等情况,可能需要暂停默认动作的执行。 -
END_OF_RECOVERY: 这是一个不期望在此处出现的状态,表示任务恢复逻辑可能存在问题,因此抛出IllegalStateException。 -
STOPPED: 表示输入流被强制停止(例如任务被取消,且不需要流干数据)。-
endData(StopMode.NO_DRAIN): 通知算子链以非排空模式结束处理。 -
return;: 结束当前processInput调用。
-
-
END_OF_DATA: 表示当前输入流的所有数据都已到达(例如,有限流Source结束)。-
endData(StopMode.DRAIN): 通知算子链以排空模式结束处理(处理完所有已缓冲的数据)。 -
notifyEndOfData(): 通知 TaskManager 当前任务的数据已结束。 -
return;: 结束当前processInput调用。
-
-
END_OF_INPUT: 表示该 Task 的所有输入都已经结束。这是一个更强的结束信号。-
controller.suspendDefaultAction(): 通知MailboxProcessor暂停调用processInput。因为已经没有新的输入了,再继续调用也没有意义。 -
mailboxProcessor.suspend(): 暂停整个MailboxProcessor的事件循环。任务此时会等待下游处理完所有数据,并完成最终的 Checkpoint 等操作。 -
return;: 结束当前processInput调用。
-
处理反压和等待逻辑 (当 NOTHING_AVAILABLE 或其他需要等待的情况): 如果 inputProcessor.processInput() 返回 NOTHING_AVAILABLE,或者虽然有数据但任务本身不可用(例如下游反压),代码会进入等待逻辑:
// 如果前面没有return
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
PeriodTimer timer;
CompletableFuture<?> resumeFuture;
if (!recordWriter.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
resumeFuture = recordWriter.getAvailableFuture();
} else if (!inputProcessor.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
resumeFuture = inputProcessor.getAvailableFuture();
} else if (changelogWriterAvailabilityProvider != null
&& !changelogWriterAvailabilityProvider.isAvailable()) {
// waiting for changelog availability is reported as busy
timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
} else {
// data availability has changed in the meantime; retry immediately
return;
}
assertNoException(
resumeFuture.thenRun(
new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
-
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();: 获取IO相关的度量指标组。 -
PeriodTimer timer; CompletableFuture<?> resumeFuture;: 声明计时器和用于恢复的 Future。 -
检查输出是否可用 (
!recordWriter.isAvailable()):-
如果
recordWriter(负责将处理结果写到下游)不可用,说明下游存在反压。 -
timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());: 启动一个计时器,用于度量由于下游反压导致的等待时间。 -
resumeFuture = recordWriter.getAvailableFuture();: 获取一个 Future,当recordWriter再次可用时,该 Future 会完成。
-
-
检查输入处理器是否可用 (
!inputProcessor.isAvailable()):-
如果
inputProcessor本身不可用(例如,等待网络缓冲区的到来)。 -
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());: 启动一个计时器,用于度量由于上游输入不可用导致的空闲时间。 -
resumeFuture = inputProcessor.getAvailableFuture();: 获取一个 Future,当inputProcessor再次可用时,该 Future 会完成。
-
-
检查 Changelog Writer 是否可用:
-
如果使用了 Changelog State Backend,并且其
changelogWriterAvailabilityProvider表示不可用。 -
timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());: 启动计时器,度量等待 Changelog Writer 的繁忙时间。 -
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();: 获取 Future,等待其可用。
-
-
数据可用性已改变 (
else { return; }):-
如果以上等待条件都不满足,说明在
inputProcessor.processInput()调用之后,数据的可用性可能已经发生了变化(例如,新的数据刚刚到达)。此时直接return,让MailboxProcessor立即重试processInput。
-
-
挂起默认动作并等待恢复:
-
controller.suspendDefaultAction(timer): 调用controller的suspendDefaultAction方法,并传入之前启动的timer。这会通知MailboxProcessor暂时停止调用processInput。MailboxProcessor会使用这个timer来记录挂起的时间(用于监控和度量)。该方法返回一个MailboxDefaultAction.Suspension对象。 -
resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)): 当resumeFuture完成时(即等待的条件解除,例如下游不再反压或上游数据到达),会执行ResumeWrapper中的逻辑。ResumeWrapper会调用Suspension对象的resume()方法,这会通知MailboxProcessor可以重新开始调用processInput了。同时,timer也会被停止。 -
assertNoException(...): 确保thenRun中的操作不会抛出未捕获的异常。
-
Checkpoint
StreamTask 通过其内部的 MailboxProcessor 和相关的 MailboxExecutor 来发送和处理与 Checkpoint 相关的邮件(即需要在 Task 主线程中执行的 Checkpoint 操作)。
以下是 StreamTask 如何发送和处理与 Checkpoint 相关邮件的关键机制分析:
-
MailboxProcessor和MailboxExecutor:- 每个
StreamTask都有一个MailboxProcessor实例 (mailboxProcessor),它负责驱动 Task 的事件循环。 StreamTask可以通过mailboxProcessor.getMailboxExecutor(priority)获取一个MailboxExecutor。这个MailboxExecutor提供了execute(...)方法,可以将一个Runnable(封装了 Checkpoint 相关逻辑)作为Mail提交到邮箱中。- 这些邮件会被
MailboxProcessor在其主循环中按优先级取出并执行。
- 每个
-
SubtaskCheckpointCoordinator:StreamTask包含一个SubtaskCheckpointCoordinator实例 (subtaskCheckpointCoordinator)。这个协调器负责处理 Task 级别的 Checkpoint 逻辑,例如触发操作符的快照、处理 Barrier 对齐、通知 Checkpoint 完成或中止等。- 很多 Checkpoint 相关的操作会首先由
SubtaskCheckpointCoordinator发起或处理,然后它可能会通过StreamTask的MailboxExecutor将具体的执行步骤提交到邮箱。
-
actionExecutor:StreamTask还有一个StreamTaskActionExecutor实例 (actionExecutor)。虽然MailboxExecutor用于将任务 放入 邮箱,但当Mail从邮箱中被取出后,其内部的Runnable通常会通过这个actionExecutor来实际执行。对于 Checkpoint 相关的操作,这确保了它们在正确的 Task 主线程上下文中运行。
发送 Checkpoint 相关邮件的典型场景和方法:
-
触发 Checkpoint (
triggerCheckpointAsync):- 当 JobManager 向 TaskManager 发送触发 Checkpoint 的 RPC 时,
Task(通常是StreamTask的父类或其本身)会调用triggerCheckpointAsync方法。 - 这个方法会将实际的 Checkpoint 执行逻辑封装成一个
Runnable,并通过mainMailboxExecutor(一个具有默认优先级的MailboxExecutor)提交到邮箱。 - 这样做是为了确保 Checkpoint 的所有阶段(例如调用操作符的
snapshotState)都在 Task 的主线程中执行,从而避免与正常的数据处理流程发生并发冲突。
StreamTask.java
// ... existing code ... @Override public CompletableFuture<Boolean> triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { checkForcedFullSnapshotSupport(checkpointOptions); CompletableFuture<Boolean> result = new CompletableFuture<>(); mainMailboxExecutor.execute( () -> { try { // Lock the mailbox to ensure that the checkpoint is not concurrent with other // actions synchronized (mailboxProcessor) { result.complete( triggerUnfinishedChannelsCheckpoint( checkpointMetaData, checkpointOptions)); } } catch (Exception ex) { // Report the failure both via the Future result but also to the mailbox result.completeExceptionally(ex); throw ex; } }, "checkpoint %s with %s", checkpointMetaData, checkpointOptions); return result; } // ... existing code ...在上面的代码片段中,
mainMailboxExecutor.execute(...)就是将 Checkpoint 触发逻辑(triggerUnfinishedChannelsCheckpoint)作为邮件发送到邮箱的关键步骤。 - 当 JobManager 向 TaskManager 发送触发 Checkpoint 的 RPC 时,
-
通知 Checkpoint 完成 (
notifyCheckpointCompleteAsync):- 当 Task 完成一个 Checkpoint 并收到 JobManager 的确认后,会调用此方法。
- 同样,通知操作符 Checkpoint 完成的逻辑也会被封装并通过
MailboxExecutor提交到邮箱。
StreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) { return notifyCheckpointOperation( () -> notifyCheckpointComplete(checkpointId), String.format("checkpoint %d completed", checkpointId)); } // ... existing code ...而
notifyCheckpointOperation内部会使用MailboxExecutor:StreamTask.java
// ... existing code ... private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) { CompletableFuture<Void> result = new CompletableFuture<>(); mailboxProcessor .getMailboxExecutor(TaskMailbox.MAX_PRIORITY) .execute( () -> { try { runnable.run(); } catch (Exception ex) { result.completeExceptionally(ex); throw ex; } result.complete(null); }, description); return result; } // ... existing code ...这里使用了
TaskMailbox.MAX_PRIORITY,表明这是一个高优先级的操作。 -
通知 Checkpoint 中止 (
notifyCheckpointAbortAsync):- 当一个 Checkpoint 因为各种原因(超时、错误、被新的 Checkpoint 取代)需要中止时,会调用此方法。
- 中止逻辑,包括清理操作符可能产生的临时状态,也会通过邮件发送到邮箱执行。
StreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointAbortAsync( long checkpointId, long latestCompletedCheckpointId) { return notifyCheckpointOperation( () -> { if (latestCompletedCheckpointId > 0) { notifyCheckpointComplete(latestCompletedCheckpointId); } if (isCurrentSyncSavepoint(checkpointId)) { throw new FlinkRuntimeException("Stop-with-savepoint failed."); } subtaskCheckpointCoordinator.notifyCheckpointAborted( checkpointId, operatorChain, this::isRunning); }, String.format("checkpoint %d aborted", checkpointId)); } // ... existing code ...同样,它也使用了
notifyCheckpointOperation方法,将中止逻辑放入邮箱。 -
处理 Barrier 对齐时的 Timer 回调:
- 当使用 Barrier 对齐策略时,如果一个 InputGate 等待某个 Channel 的 Barrier 超时,
SubtaskCheckpointCoordinator会注册一个 Timer。当这个 Timer 触发时,其回调逻辑(例如取消 Checkpoint 或强制触发 Checkpoint)也会被封装成邮件并通过MailboxExecutor提交到邮箱执行。 - 在
BarrierAlignmentUtil.createRegisterTimerCallback中可以看到相关的逻辑,它会返回一个BiConsumer<Long, Long>,这个 Consumer 内部会使用mainMailboxExecutor来执行超时处理。
- 当使用 Barrier 对齐策略时,如果一个 InputGate 等待某个 Channel 的 Barrier 超时,
-
Source Task 的特定行为:
- 例如在
SourceOperatorStreamTask中,notifyCheckpointAbortAsync和notifyCheckpointSubsumedAsync方法会直接使用mainMailboxExecutor来执行清理 Checkpoint 的逻辑。
SourceOperatorStreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointAbortAsync( long checkpointId, long latestCompletedCheckpointId) { mainMailboxExecutor.execute( () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId); return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId); } @Override public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) { mainMailboxExecutor.execute( () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId); return super.notifyCheckpointSubsumedAsync(checkpointId); // ... existing code ... - 例如在
总结:
StreamTask 依赖其 MailboxProcessor 和通过它获取的 MailboxExecutor 来确保所有与 Checkpoint 相关的关键操作(触发、通知完成/中止、Barrier 处理等)都在 Task 的主事件循环线程中串行执行。这避免了复杂的并发控制,保证了 Checkpoint 过程与正常数据处理流程的一致性和正确性。当需要执行一个 Checkpoint 相关操作时,通常会将其封装为一个 Runnable,然后通过 MailboxExecutor.execute() 方法将其作为一封邮件提交到邮箱队列中,等待 MailboxProcessor 的调度执行。
更多推荐



所有评论(0)