从源码角度解析C++20新特性如何简化线程超时取消
为什么需要超时控制
超时控制是很常见的需求,最普遍的场景是为了防止程序卡住或者长时间占用资源,程序会主动取消掉一些超过允许运行时间的或者无响应的线程,比如一些耗时很长的网络连接处理线程等。当然用户等得不耐烦了手动点击取消任务执行也勉强可以算在内。
通常超时发生或者用户点击取消之后,我们都期待线程能迅速终止执行并让整个程序保持一个完整且安全的状态。然而现实是复杂的,想实现上述功能对于线程来说是一件难事,尤其在Linux系统上。
第一个难点是如何让线程知道自己要退出。对于进程来说这不是难点,因为不管进程在做什么,我们都可以靠向其发送信号来立即中断进程的执行(前提是线程没有屏蔽这个信号),这样进程的停止请求可以被立即感知到,进程从而可以尽快完成善后工作退出执行。同样的招数对多线程程序来说就没那么好用了——信号默认是发给整个进程的,为了能让每个线程独立地接收信号,我们需要保存线程的标识符并在每个线程中设置接收和屏蔽信号的mask,这大大增加了程序的复杂性;其次信号处理函数是整个进程内所有线程共享的,我们需要额外的手段来保证并发安全,同时还得兼顾信号处理函数需要可重入、快速执行的最佳实践,这会提高程序的开发难度。
第二个难点在于如何保证线程一定会退出执行。前面说到信号可以打断进程的执行,但这只是通知,实际上进程完全可以在信号处理函数返回后无视这个通知继续运行,或者有一种更普遍的场景——程序正好卡在某个系统调用上,而程序又设置了系统调用被信号中断后自动重启,这样即使我们有效通知了进程,进程也会在收完通知之后再次进入系统调用从而无法响应停止请求。所以作为保底手段,Linux可以发送SIGKILL这个信号强制终止进程,这个信号无法捕获也无法屏蔽,是我们货真价实的“底牌”。
上述的情况在多线程中同样存在,而且我们没有“底牌”可用——因为不管给哪个线程发送SIGKILL,都会杀死整个进程而不是单独接收到信号的那个线程。另外即使有办法强制终止线程(比如早期的JVM),我们还会遇到资源释放的问题。进程退出执行之后,内核会尽可能释放进程持有的所有资源,打开的文件会被关闭,缓冲区的内容会被刷新,文件锁之类的同步机制也会正常解锁;但线程并没有这种自动清理机制,清理工作完全需要手动执行,一旦进程没有释放自己持有的资源就退出,系统就会遇到各种数据损坏和死锁等并发问题,排查和修复会极其困难。
为了克服上述难点并安全高效地实现终止超时线程的执行,我们需要一些额外的控制手段。这也一直都是开发者中的热门话题。
在介绍C++20如何简化超时控制之前,我们先来看看前人的智慧成果。
Golang实现超时控制
Golang是天生支持并发的语言,这一点可谓名副其实,尤其是在超时控制上。
我们直接看个例子,例子里有主线程和工作线程,工作线程超时时间为5秒,如果超过这个时间还有线程没完成工作,就取消所有线程的执行。Golang里没有系统级的线程,但我们可以用goroutine模拟。
在工作线程中我们用sleep代替耗时的工作,这样便于测试:
func Work(ctx context.Context, id int) error { |
for range 10 { |
select { |
case <-ctx.Done(): |
fmt.Printf("worker %d: canceled\n", id) |
return ctx.Err() |
default: |
} |
if rand.IntN(2) == 0 { |
time.Sleep(500 * time.Millisecond) |
} else { |
time.Sleep(time.Second) |
} |
} |
fmt.Printf("worker %d: done\n", id) |
return nil |
} |
超时控制是ctx参数实现的,每次循环处理前我们都会主动检查线程是否需要退出,这种协作式的“请求-检查-响应”是各种语言中取消线程执行的常见做法。
这个工作函数执行时间在5秒到10秒之间,取值的步长在0.5秒,加上go标准库默认随机数是均匀分布的,所以整体执行时间的概率是正态分布的,在7.5秒左右我们很容易看到超时和正常运行结束两种情况。所以我们把超时时间分别设为4秒、7.5秒、11秒,来进行模拟运行实验:
func main() { |
// 从命令行获取超时时间,单位毫秒 |
timeout, err := strconv.Atoi(os.Args[1]) |
if err != nil { |
panic(err) |
} |
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond) |
now := time.Now() |
defer cancel() |
g := &errgroup.Group{} |
for i := range 3 { |
g.Go(func() error { |
return Work(ctx, i) |
}) |
} |
err = g.Wait() |
fmt.Printf("run time: %s\n", time.Since(now)) |
if err != nil { |
if errors.Is(err, context.DeadlineExceeded) { |
fmt.Println("Tasks canceled") |
return |
} |
panic(err) |
} |
fmt.Println("All work done!") |
} |
代码很简单,关键在这行:ctx, cancel := context.WithTimeout(context.Background(), 7500*time.Millisecond),只要我们设定的时间到了,<-ctx.Done()就会从阻塞变为非阻塞,循环开始处的检查会发现这个变化,然后会退出线程的执行。代码中使用了errgroup,但这不是必须的,实际上有很多办法可以通知主线程,这里我选择了一种最通用的,代价是代码会稍微复杂一些。
运行代码,会看到下面这样的输出,结果有很大的随机成分,下面只是无数种可能中的一种:
$ go build -o test |
$ ./test 4000 |
worker 1: canceled |
worker 0: canceled |
worker 2: canceled |
run time: 4.00431275s |
Tasks canceled |
$ ./test 7500 |
worker 0: done |
worker 2: done |
worker 1: canceled |
run time: 7.507776458s |
Tasks canceled |
$ ./test 11000 |
worker 1: done |
worker 2: done |
worker 0: done |
run time: 8.509193125s |
All work done! |
可以看到超时控制发挥了作用,尽管内置的time计时有一些误差,但程序的总体的运行时间是小于等于超时时间的。
Golang的超时控制可以通过context简单实现,但需要工作线程主动检查主动配合,前文我们也提到了强制终止工作线程很可能会造成并发问题,因此所有的线程超时控制中都是采用的这种协作式退出机制,即使天生并发的语言也不能免俗。作为代价,我们需要谨慎编码以免工作线程无法响应退出请求,同时还需要付出一点在循环里检查是否需要退出执行的性能损失。
C++中的典型超时控制实现
c++没有方便好用的context,想要实现协作式退出得自己造轮子。
Golang好用是因为标准库和运行时调度器隐藏了实现的细节:WithTimeout实际上会创建一个定时器,到时间后调度器会执行定时器的回调函数主动关闭ctx内部的channel,这样<-ctx.Done()就会从阻塞变成非阻塞,协程就能检查到这一变化从而退出执行。
核心只在于两点,以合适的方法标记线程已被取消和异步地在超时后设置取消标记。
第一点很容易解决,使用原子变量即可。第二点的异步通知有些棘手,但我们还是有几种选择:
- 使用alarm和信号:
alarm会注册一个定时器,到时间后给进程发送SIGALRM信号,虽说多线程程序里不推荐用信号,但在这个场景下在信号处理函数里设置原子变量是合适的,另外使用alarm(0)可以取消之前注册的定时器。 - 使用多线程:我们可以另外创建一个线程,并在其中等待到超时时间过去之后设置标志,这样主线程也不会阻塞。
当然两个方案各有缺点:
alarm是整个进程共享的,且同时只能设置一个定时器,最后它只能设置秒级精度的超时时间;使用setitimer可以解决上面这些问题,但会出现不知道信号是哪个超时的定时器发送的问题。- 多线程方案问题比较少,集中在变量生命周期和任务正常完成如何取消超时控制线程这两点上。
综合来看使用多线程方案才能真正解决问题,跨平台性也更强。知道原理后我们就可以写实验代码了。
转换后的工作函数是这样的:
namespace { |
std::atomic<int> canceled_flag{0}; |
std::atomic<int> is_canceled{0}; |
} |
void Work(int id) |
{ |
std::mt19937 rng{std::random_device{}()}; |
std::uniform_int_distribution<int> dist(0, 1); |
for (int i = 0; i < 10; ++i) { |
if (canceled_flag.load(std::memory_order_acquire) == 1) { |
std::osyncstream{std::cout} << "worker: " << id << " canceled\n"; |
is_canceled.store(1, std::memory_order_release); |
return; |
} |
if (dist(rng) == 0) { |
std::this_thread::sleep_for(std::chrono::milliseconds(500)); |
} else { |
std::this_thread::sleep_for(std::chrono::seconds(1)); |
} |
} |
std::osyncstream{std::cout} << "worker: " << id << " done\n"; |
} |
代码和go版本的没有太大差异,唯一的区别是我们不靠返回值而是is_canceled标志来区分线程是否因为被取消而退出。只使用canceled_flag会导致竞态条件并导致误判,你可以想想是为什么,这算是课后练习。另外我在这还使用了memory_order,这不是必须的,但默认的cst内存序多少有些杀鸡用牛刀。
下面是主线程和超时控制线程的逻辑:
int main(int argc, const char *argv[]) |
{ |
if (argc != 2) { |
std::cerr << "wrong arg\n"; |
return 1; |
} |
auto timeout = std::stoi(argv[1]); |
std::vector<std::thread> workers; |
constexpr int worker_num = 3; |
workers.reserve(worker_num); |
for (int i = 0; i < worker_num; ++i) { |
workers.emplace_back(work, i); |
} |
std::atomic<int> timeout_cancel_flag{0}; |
std::thread{ |
// 超时控制线程 |
[&timeout_cancel_flag](auto timeout){ |
std::this_thread::sleep_for(timeout); |
if (timeout_cancel_flag.load(std::memory_order_acquire) == 1) { // 危险 |
return; |
} |
canceled_flag.store(1, std::memory_order_release); |
}, std::chrono::milliseconds(timeout) |
}.detach(); |
for (auto &worker: workers) { |
worker.join(); |
} |
timeout_cancel_flag.store(1, std::memory_order_release); |
if (is_canceled.load(std::memory_order_acquire) != 1) { |
std::osyncstream{std::cout} << "All works done!\n"; |
} else { |
std::osyncstream{std::cout} << "Tasks canceled\n"; |
} |
} |
整体上没什么难懂的地方,基本可以看做Golang版本的转译,如果线程都退出之后不管是否超时我们都要取消超时控制线程。整体上只有一点不一样:超时控制线程是detach的,因为我们不能让主线程阻塞。
然而这段代码有很致命的生命周期问题,想象一下如果worker都在timeout之前完成工作,且函数在timeout之前退出,但超时控制线程仍然需要睡眠到timeout为止,这时候它醒来访问到的timeout_cancel_flag将会是一个无效值。
问题出在两个地方,第一个是我们用了sleep,这不可中断,线程必须要等满timeout时间才能退出,这会造成线程泄漏;第二是因为sleep不可中断,导致我们的超时控制线程生命周期长于主线程和工作线程,在其中引用的主线程的局部变量很可能会失效。
解决方案当然也很多,最简单的就是用std::shared_ptr包裹我们需要跨线程访问的资源,这和在rust中使用Arc是一样的。但这种方案治标不治本,我们的超时控制线程仍然会有比其他线程长的生命周期。
第二种则是使用一种在超时等待中可以被中断的机制,c++20前我们有std::timed_mutex和条件变量可用。
改进后的代码:
auto timeout_cancel_flag = std::make_shared<std::condition_variable>(); |
std::thread{ |
[timeout_cancel_flag](auto timeout){ |
std::mutex timeout_lock; |
std::unique_lock u{timeout_lock}; |
if (timeout_cancel_flag->wait_for(u, timeout) == std::cv_status::timeout) { |
std::osyncstream{std::cout} << "cancel all threads\n"; |
canceled_flag.store(1, std::memory_order_release); |
} else { |
std::osyncstream{std::cout} << "self was canceled by main thread\n"; |
} |
}, std::chrono::milliseconds(timeout) |
}.detach(); |
for (auto &worker: workers) { |
worker.join(); |
} |
timeout_cancel_flag->notify_all(); // 取消超时控制线程 |
我们可以使用条件变量的wait_for方法,它可以让当前线程阻塞到指定的时间,或者中途被notify唤醒。这完美实现了我们既要超时等待又要中途可被打断的需求。并且条件变量本身用std::shared_ptr包裹,不会有任何生命周期问题。
然而没有了生命周期问题,我们还有时序问题,如果主线程中的notify_all()早于控制线程中的wait_for执行(概率比较小但不为0),那么这次notify超时控制线程是收不到的,wait会一直阻塞到超过timeout,这时候再设置取消标志就没有意义了。想要解决这种“唤醒丢失”问题,我们需要借助wait重载的第三个参数,让它告诉我们超时控制线程本身是否被取消:
struct TimeoutContext { |
std::atomic<int> canceled{0}; |
std::mutex lock; |
std::condition_variable cv; |
}; |
auto timeout_ctx = std::make_shared<TimeoutContext>(); |
std::thread{ |
[timeout_ctx](auto timeout){ |
// 必须使用ctx里的锁才能有效避免竞态条件 |
std::unique_lock u{timeout_ctx->lock}; |
if (!timeout_cancel_flag->wait_for(u, timeout, [&](){ return timeout_ctx->canceled.load(std::memory_order_acquire) == 1; })) { |
// wait_for 返回 false,canceled是值还是0,说明是超时导致的返回 |
std::osyncstream{std::cout} << "cancel all threads\n"; |
canceled_flag.store(1, std::memory_order_release); |
} else { |
// wait_for 返回 true,canceled被设置为1,说明主线程通知了取消 |
std::osyncstream{std::cout} << "self was canceled by main thread\n"; |
} |
}, std::chrono::milliseconds(timeout) |
}.detach(); |
for (auto &worker: workers) { |
worker.join(); |
} |
// 取消超时控制线程 |
{ |
// 获取同一把锁,修改状态时要么超时控制线程还没运行,要么已经在wait了 |
std::lock_guard lk(timeout_ctx->lock); |
timeout_ctx->canceled.store(1, std::memory_order_release); |
} |
// 解锁后才能通知 |
timeout_ctx->cv.notify_all(); |
因为有锁存在,所以不管怎么样运行顺序只有两种:
- 超时线程先运行,一直到wait方法里解锁,我们可以保证wait一定在notify之前运行
- 主线程设置超时线程取消标志的代码先运行,这时wait是晚于notify执行的,但我们设置取消标志是先于wait的,而wait在休眠前会先检查谓词条件,所以条件变量会马上退出不会进行等待。
- 会不会存在wait中条件变量解除了锁,在即将进入休眠前主线程完成了执行?答案是不会的,标准有明文要求wait和它的兄弟函数里unlock+wait加在一起是原子的(实际上分为三部分,解锁+休眠、被唤醒、重新加锁,它们各自都是原子的)且和notify之间是全序关系——要么notify在前他们在后或者反过来,不可能同时执行。简单说,如果超时控制线程正在执行unlock+wait,这说明主线程没有拿到锁,此时主线程要么还没运行到notify(这种情况不会丢失唤醒),要么已经设置了标志并释放了锁,谓词会检测到标志被设置(谓词检测在锁的保护中),条件变量不进入休眠;如果notify在之后运行,则notify会看到超时控制线程已经进入wait,会唤醒它。所以不存在中间可以被打断的场景。
更多推荐
所有评论(0)