前言

最近开的坑有点多。有点忙不过来了所以好久没写Blog了。这个C++20的协程接入一直在改造计划中,但是一直没抽出时间来正式实施。 在之前,我写过一个初版的C++20协程接入 《libcopp接入C++20 Coroutine和一些过渡期的设计》 。当时主要是考虑到 Rust也有和C++类似的历史包袱问题,所以参考了一些Rust协程改造过程中的设计。 但是后来尝试在项目中使用的时候发现还是有一些问题。首先C++20的协程并不是零开销抽象,所以强行用Rust的模式反而带来了一定开销和理解上的难度。其次原先的设计中 generator 是按类型去实现外部接入的。但是实际接入SDK的过程中我们有相当一部分类型相同但是接入流程不同的情况,再加上现在各大编译器也都已经让C++20协程的特性脱离 experimental 阶段了,有一些细节有所变化。所以干脆根据我们实际的使用场景,重新设计了下组织结构。

我们整个服务器框架的和协程相关的改造分为两部分,第一部分是 libcopp 这个库的底层支持,另一个就是现有业务流程的改造过程。我先完成了 libcopp 对C++20协程适配支持,这也是本篇分享的主要内容。

C++20 Coroutine 伪代码

为了方便查阅和理解,还是先贴一下 C++20 Coroutine 基本原理的伪代码。

template<class... TARGS>
COROUTINE_OBJECT func(TARGS&&... args) {
  try {
    P p(std::forward<TARGS>(args)...);
    // 这个P是自己定义的 using P = COROUTINE_OBJECT::promise_type;
    // 文档上说promise_constructor_arguments 是空或者函数的参数的左值传入 args... ,但是目前版本的MSVC还仅支持空参数列表

    // ISO规范规定 get_return_object 在 initial_suspend 前,但是某些版本的Clang实现中 get_return_object 在 initial_suspend 后
    COROUTINE_OBJECT r = p.get_return_object();
    co_await p.initial_suspend();

    try {
      // 原函数体 ...
      p.return_void() or p.return_value(RET) // 取决于函数体里有没有 co_return RET
    } catch(...) {
      p.unhandled_exception();    // 未捕获的异常接口
    }

  final_suspend:
    co_await p.final_suspend();     // final suspend point

    return r;
  } catch(...) {
    return COROUTINE_OBJECT::promise_type::get_return_object_on_allocation_failure(); // noexcept
  }
}

使用场景分析

我们来看一下协程的使用场景。首先从使用者的角色上,主要是有3类。

  • 业务层: 由于有栈协程比如是侵入式的,那么业务调用的时候其实上下文关系是需要层层透传的。
  • 调度层: 在框架层,我们要能总控和调度资源,控制生命周期。
  • 接入层: 一般用于各类SDK接入、event loop接入或者其他事件机制接入。用来把各类平台服务或者功能库抽象成统一的协程模型。

我们在新的协程模型中,针对这三种场景设计了三个不同的工具类。在我们之前的工程实践中,采用 libcopp 的有栈协程也是有这些角色分工的,做一个简单的映射关系就是:

功能点cotask::task<T>C++20 coroutine附注
调度层 - 任务管理cotask::task<T>task_future<TRETURN, TPRIVATE>前者的私有数据分配在栈上,通过接口获取。后者的私有数据就是 TPRIVATE
接入层 - 延时操作task::resume(void*)generator_future<TRETURN>前者通过参数 void* 指向自定义类型,需要外部额外检查数据有效性,后者内置 set_value 接口,自动管理有效性
业务层 - 组合task::yield(void*)callable_future<TRETURN>前者通过参数 void* 指向自定义类型来接收数据,后者通过 co_await 的返回
返回值类型int自定义

C++20协程其实是对称协程的设计,即不区分主调和被调。虽然对称协程在逻辑耦合上比非对称协程更低,且开销更低。但是在实际应用场景中,还是非对称更实用一些,导致我们使用的时候很自然地就会封装成非对称的调用方式。试想这种场景:


callable_future<int> bar();

callable_future<int> foo() {
  // some codes
  co_await bar(); // IO operation
  // other codes
  co_return 0;
}

如果 bar() 是一个IO操作,比如保存数据到数据库,显然我们希望 foo()bar(); 执行结束后继续,这里就需要建立主调和被掉关系了。而实际上,C++20协程是可以做到 bar(); 执行结束前或者执行结束后过一段时间再回到 co_await bar(); 执行后面的代码的。

generator_future 只需要维护 awaitable,不需要创建协程对象,用于尽可能轻量级地允许接入层接入到协程体系中来。 而 callable_future<TRETURN>task_future<TRETURN, TPRIVATE> 的相似度比较高,都会管理协程的promise,那么我们为什么要把他们拆分成两个接口呢?这是因为 task_future 还附带了一些管理行为,并且主调和被掉直接没有直接联系。比如事件 A 可能触发一系列操作 B, C 等,并且不关心它们的结果,就可以用 task_future

对于 callable_future , 我们假定它的使用场景为协程的嵌套调用,为了方便业务层使用已有的协程或者 generator_future 组装出满足自己业务需要的协程流程。这个时候,我们可以假定大多数情况下它是会被一个caller co_await ,且上下文数据不会被转移。然后可以简化内部的数据结构,降低大多数使用场景下不必要的开销。

另外还有一种使用场景是在 调度层 我们需要控制协程任务的生命周期,如果它超时或者被 kill 了。都需要层层传递通知内部正在等待的协程强行终止,这样即便内部wait for ever了,我们仍然能够强制释放资源,而不会发生泄露。这里我们需要对协程的主调、被调关系进行管理。同时一个协程还可能被多个其他协程所 co_await ,resume行为可能由用户或者接入层触发,也可能由析构或者调度层管理的时候触发,所以我们还需要管理好生命周期。

C++20协程接入的设计模型

除了上述使用场景以外,不同编译器对C++20协程的实现也有些差异。虽然都是符合标准的,但是某些接口的调用先后顺序和析构(特别是临时对象)的先后顺序是有差异的。显然我们 libcopp 一个非常重要的要求就是抹平这些差异。以此为目的,我们设计的几个组件的主要流程和调用关系如下:

模块callable_future - 业务层task_future - 调度层(框架)generator_future - 接入层(扩展)
promise_typedata:waiting,callers,status,DATAdata:waiting,callers,status,context-
promise_type:unhandled_exception不捕获start/resume 接口后 rethrow-
promise_type:生命周期托管到 future关联到 context , 析构时取消关联-
promise_type:Execution首次被 co_await 时开始执行,未被 co_await 则视为放弃执行首次调用 start/resume 接口后开始执行-
promise_type:final_suspend所有 callers 调用 resume()所有 callers 调用 resume()-
context-data:task_id,handle,unhandle_exceptiondata:caller
context:析构/set_value-handle.promise().waiting.resume()caller.resume()
-handle.promise().resume()
-handle.promise().destroy()
futuredata:handledata:contextdata:context
future:析构handle.promise().resume()
handle.promise().destroy()
awaitabledata:caller,calleedata:caller,context(raw ptr)data:caller,context(raw ptr)
awaitable:析构promise_type:final_suspend 保证有效性,无需额外处理detach()detach()
awaitable:detachcallee.promise().remove_caller(caller)context->handle.promise().remove_caller(caller)context->remove_caller(caller)
caller.waiting = nullptrcaller.waiting = nullptrcontext->caller = nullptr
故障处理:状态传递( callee.status = caller.status )故障处理(exiting):依据 context->handle.promise().status
故障处理:依据 caller.status
awaitable:await_suspendcaller.promise().waiting = calleecaller.promise().waiting = calleethis->caller = caller
callee.promise().add_caller(caller)context->handle.promise().add_caller(caller)context->add_caller(caller)
awaitable:await_resumedetach()detach()detach()
callee.promise().waiting.resume()转移数据(返回 context->handle.promise().status)转移数据(ready/依据 caller.status )
转移数据(强制进入ready/依据 caller.status)

除了这些以外,和 libcopp 内的有栈协程一样,我们提供了一些特殊的错误流程, 超时取消Killed 。所以,如果 TRETURN 不是整数类型,我们就要求用户要提供 copp::promise_error_transform<TRETURN> 的特化,接受 promise_status 类型,返回 TRETURN 类型或者可以隐式转换到 TRETURN 的类型。

这几个组件其实是有一部分公共的部分的,对于这部分我们可以抽离出来,另外再结合前面提到的生命周期管理的一些要点,我简单总结了一下如下:

  • 公共逻辑
    • promise_error_transform<T>::operator()(promise_status) const noexcept : 故障转换 cancle/timeout/kill
    • promise_type 的公共数据
      • 正在等待的对象(用于传递 cancle/timeout/kill ): waiting:coroutine_handle<std_coroutine_promise_base>
      • 状态: status:created/running/done/cancle/kill/timeout
    • awaitable 的公共数据
      • 生命周期从属于调用方的 promise_type,且在 co_await 完成前被析构,此时被等待方的handle可能已经失效
        • 析构时: caller 有效,callee 可能无效
        • await_ready/await_suspend 时: caller 有效,callee/generator_context 有效

          注意:在多线程模式下,await_suspend 由发起 co_await 的线程在挂起协程后执行,如果这时候handle被传递到其他线程且被resume了。 这时候当前的awaitable对象会被析构,且 await_suspend 未执行完,这里需要额外的同步机制或者在 await_suspend 中传出handle后不再使用成员。 See Coroutines (C++20) - cppreference.com

        • await_resume 时: caller有效,callee/generator 可能无效
          • 通过callee在 final_suspend 时强制触发caller的resume,来保证callee此时有效
          • generator的 context 在析构时,也需要强制触发resume,来保证 context 此时有效
          • 但是第一次调用调用 resume() 后仍然可能导致 callee/generator_context 被释放,此时需要 await_suspend 配置
      • 调用者: caller:promise_base_type:handle_delegate
    • 通过 callable_promise/callable_context 链式传递关系
      • out of control: resume后需要检查状态,如果被kill,需要尽可能执行到结束
  • generator_future
    • 生命周期安全(调度层发起和恢复,内部调用)
      • context 析构前要把关联的 handle 直接 resume 掉(可能转为cancle状态)
      • 调用者被上层强制 resume 时,不能影响 generator_future 的生命周期。调用关系要自动解绑。
      • Wake重入保护+单元测试(主要是在各类主动和被动的kill,析构极其嵌套调用时的内存访问安全)
      • generator_context未被续期要自动被killed且释放(以防使用者有BUG导致资源泄露)
    • 数据完成接口: context:set_value
    • 事件转换(事件回调)
      • 挂起等待回调: await_suspend_callback(shared_ptr<generator_context>) -> 允许续期 context
      • 恢复回调: await_resume_callback(const generator_context&) -> context 可能正在析构,不允许续期
  • task_future
    • 生命周期安全(业务层发起,调度层发起,框架层管理,内部调用)
      • 单向引用关系:
        • task_futurecontext 强引用
        • promise_typecontext 弱引用,最后一个 task_future 析构时要强制kill掉
        • contextpromise_type 弱引用(通过 handle_delegate )
      • context 析构(最后一个 task_future 释放)前要把关联的 handle 一直 resumedone 为止
      • return_void()/return_value() 时不能 resume_callers() , 应该在 final_suspend() 中执行。否则会导致栈上部分对象倍重复析构。(GCC)
      • unbind_from_manager() 时是最后一个task也应该是内存安全的
    • then 接口,用于简化流程操作
    • 上层主动 kill
    • 任务管理: task_manager
      • 超时管理
  • callable_future
    • 可移动,不可复制(仅内部调用)
    • 管理handle生命周期(否则awaitable和promise的释放顺序是UB)
    • 传递失败状态
  • 特殊的callable_future
    • any( callable_future/generator_future/task_future )
    • all( callable_future/generator_future/task_future )
    • some( callable_future/generator_future/task_future )

设计模式适配

为了方便迁移和处理一些设计模式相关流程,我给 libcopp 对新的协程接入也加入了一些辅助性的功能。

task_manager

首先这个 task_manager 的主要功能和之前有栈协程的一样,仅包含超时管理、和基本的容器管理。 新的 task_manager 实现改成了特化实现,对老的有栈协程和新的 task_future 仅仅是task的部分类型不同。 这样逻辑能保持一致后面迁移起来方便一些。

task_future::then

then 接口也是原来有栈协程的 cotask::task 就有的功能,都是为了简化一些有先后顺序逻的辑关系的流程。 原来的有栈协程的 then 仅仅是复用了已有协程的栈分配器等一些设置(这样使用stack_pool的task,通过 then 创建的协程也是使用相同的 stack_pool )。 在新的C++20协程的 task_futurethen 中我对它做了更多的扩展。 首先,我们的 then 接口的返回值类型既支持返回 callable_future ,也支持返回 task_future 。两者的区别是前者更轻量级一些且不带私有数据,后者相反。那么很显然,我们可以用单参数的 then 来返回 callable_future, 用两个参数的 then 来返回 task_future 。其中第二个参数用于构造私有数据。可以用第二个参数传 nullptr 表示要创建的 task_future 的私有数据类型是 void 。 首先因为 then 接口里其实是需要上一步的返回值的,而返回类型是可以自定义的,并且可以是 void 。这里使用了特化来处理 void 这种特殊情况。 于是,我们就可以这么使用 then 接口。

cotask::task_future<int, void> t = task_func_await_int_simple();
auto f = t.then(
  [](task_future_int_type::context_pointer_type, task_future_int_type::value_type value) {
    std::cout << "The first thenable got "<< value <<" and return int" << std::endl;
    return value;
  },
  3000)
  .then(
      [](cotask::task_future<int, int>::context_pointer_type ctx, task_future_int_type::value_type value) {
        std::cout << "The second thenable got "<< value <<" and return void" << std::endl;
      },
      nullptr);
  .then(
      [](cotask::task_future<void, int>::context_pointer_type ctx) {
        std::cout << "The second thenable got void and return int again" << std::endl;
        return 0;
      },
      nullptr);

还有一种情况,有时候我们可能希望在 then 的回调里运行时返回一个新的协程,相当于运行时把某些流程插在静态的 thenable 调用链中间(类似Rust里的 Future 的回调里返回另一个 Future)。 举个例子:

cotask::task_future<int, void> t = task_func_await_int_simple();
auto f = t.then(
    [](task_future_int_type::context_pointer_type &&,
        task_future_int_type::value_type value) -> copp::callable_future<int> {
      std::cout << "The thenable return callable_future<int>" << std::endl;
      co_return value;
    },
    3000)
    .then(
        [](cotask::task_future<int, int>::context_pointer_type ctx, task_future_int_type::value_type value) {
          std::cout << "The second thenable await first thenable and the returned callable_future<int>" << std::endl;
          return value + ctx->get_private_data();
        },
        nullptr);

当然返回 task_future 也是可以的。

cotask::task_future<int, void> t = task_func_await_int_simple();
auto f = t.then(
  [](task_future_int_type::context_pointer_type,
      task_future_int_type::value_type value) -> cotask::task_future<int, void> {
    std::cout << "The first thenable return task_future<int>" << std::endl;
    co_return value;
  },
  3000)
  .then(
      [](cotask::task_future<int, int>::context_pointer_type ctx, task_future_int_type::value_type value) {
        std::cout << "The second thenable await first thenable and the returned task_future<int>" << std::endl;
        return value + ctx->get_private_data();
      },
      nullptr);

以上的实现方式同样也是针对传入callable object的返回值类型做了特化实现的,目前仅仅针对我的 libcopp 内部提供的类型做了特殊处理。检测callable object的返回值类型属于 type_traits 的范畴这里不再展开。当然理论上是可以对所有协程做特化处理的,但是检测是否符合所有协程要求的 promise_type 的 traits 实现起来会比较麻烦,所以暂未提供,以后有需要可能可以加吧。

some(), any()all()

在某些场景中,我们需要使用到 some(), any()all() 的功能。比如在批量保存场景中,我们可以起一批保存任务,然后 co_await all(tasks...); 。又或者在我们服务器的分布式事务系统中,实现了 Read Your Writes 一致性模型,在这个模型中假设有 N 个副本,只要读副本数 R + 写成功副本数 W > N 时,即可认为事务成功。那么我们发起 N 个写请求是不需要等待它们全部完成的,只需要等待 W 个成功就行了,这时候我们可以用 co_await some(W, tasks[N]);

实际上 any()all() 都是特殊形式的 some() ,前者相当于只要等待1个,后者则是需要等待全部。所以这两个的实现比较简单:

template <class TREADY_CONTAINER, class TWAITING_CONTAINER>
    LIBCOPP_COPP_API_HEAD_ONLY inline callable_future<promise_status> any(TREADY_CONTAINER&&ready_futures,
                                                                          TWAITING_CONTAINER&&pending_futures)
        requires SomeContainerConvertible<
          typename some_ready_container<TREADY_CONTAINER>::value_type,
          typename some_waiting_container_traits<TWAITING_CONTAINER>::value_type
        >
{
  return some_delegate<typename some_ready_container<TREADY_CONTAINER>::value_type>::template run(
      std::forward<TREADY_CONTAINER>(ready_futures), 1, &pending_futures);
}

template <class TREADY_CONTAINER, class TWAITING_CONTAINER>
    LIBCOPP_COPP_API_HEAD_ONLY inline callable_future<promise_status> all(TREADY_CONTAINER&&ready_futures,
                                                                          TWAITING_CONTAINER&&pending_futures)
        requires SomeContainerConvertible<
          typename some_ready_container<TREADY_CONTAINER>::value_type,
          typename some_waiting_container_traits<TWAITING_CONTAINER>::value_type
#  endif
{
  return some_delegate<typename some_ready_container<TREADY_CONTAINER>::value_type>::template run(
      std::forward<TREADY_CONTAINER>(ready_futures), gsl::size(pending_futures), &pending_futures);
}

而对于 some() , 最简单的方式就是在一个 while 循环中等待我们要关注的对象。主循环如下:

promise_type some_promise{&context};
while (context.status < promise_status::kDone) {
  // Killed by caller
  auto current_status = co_yield callable_future<promise_status>::yield_status();
  if (current_status >= promise_status::kDone) {
    context.status = current_status;
    break;
  }

  co_await some_promise;
}

// destroy promise object and detach handles

这时候我们还需要侵入 callable_future, generator_futuretask_future 的内部实现(访问一下私有成员)。因为对创建这些对象的地方并不知道这些 future 是否要被 some() 等待。对创建者而言,他们还是正常调用 set_value() 或者 kill() 之类。显然我们并不希望每次有唤醒都去检查一次哪些任务完成了,哪些仍然要等待,这不但会有惊群效应,还会有很多冗余的重置 caller 和重新设置 caller 的行为。那么我这里最终的做法就是加个计数器,当要等待的对象resume的次数到了我们要等待的对象数量了再扫描。

void await_resume() {
  // caller maybe null if the callable is already ready when co_await
  auto caller = get_caller();
  if (caller) {
    if (nullptr != caller.promise) {
      caller.promise->set_flag(promise_flag::kInternalWaitting, false);
    }
    set_caller(nullptr);
  }

  if (nullptr == context_) {
    return;
  }

  ++context_->scan_bound;
  if (context_->scan_bound >= context_->ready_bound) {
    scan_ready(*context_);
    context_->scan_bound = context_->ready.size();

    if (context_->scan_bound >= context_->ready_bound && context_->status < promise_status::kDone) {
      context_->status = promise_status::kDone;
    }
  }
}

最后,我贴一个单元测试的代码来暂时一下使用示例吧:

static copp::callable_future<int> callable_func_some_callable_in_initialize_list(size_t expect_ready_count,
                                                                                 copp::promise_status expect_status) {
  size_t resume_ready_count = 0;

  copp::callable_future<int> callable1 = callable_func_some_any_all_callable_suspend(471);
  copp::callable_future<int> callable2 = callable_func_some_any_all_callable_suspend(473);
  copp::callable_future<int> callable3 = callable_func_some_any_all_callable_suspend(477);

  copp::some_ready<copp::callable_future<int>>::type readys;
  // 这里是单独变量转span,直接用容器(比如vector)包callable也是可以的
  std::reference_wrapper<copp::callable_future<int>> pending[] = {callable1, callable2, callable3};
  auto some_result = co_await copp::some(readys, 2, copp::gsl::make_span(pending));
  CASE_EXPECT_EQ(static_cast<int>(expect_status), static_cast<int>(some_result));

  int result = 1;
  for (auto &ready_callable : readys) {
    if (ready_callable->is_ready()) {
      result += ready_callable->get_internal_promise().data();
      ++resume_ready_count;
    }
  }

  CASE_EXPECT_EQ(expect_ready_count, resume_ready_count);

  co_return result;
}

杂项优化

  • 小对象优化: 对于 generator_futuretask_future 的返回类型,我都采用了小对象优化。即,对于 trivally_copyable 且size小的类型,使用 memcpy 来传递 return_value 和外部传入的数据到 co_await 的返回值,否则使用 unique_ptr 来转移数据。而对于 callable_future,我希望更加精简,所有总是转移右值。
  • Module和 enable_if : 有些编译器已经支持 Module 了,我再编译时会尝试检测编译器特性,如果支持的话就使用 Module 来限定模板类型,对编译期的压力也会降低。不支持的话就fallback到 enable_if 的 type_traits。

Benchmark

压力测试仍然是分为trivial类型返回和非trivial类型返回两部分。非trivial类型因为有构造和析构函数,是会影响编译优化的效果的。

目前看起来 ,在高并发场景下 task 的创建CPU开销和原来有栈协程带栈池的开销降低了约 25%,切换开销降低了约 65%。低并发场景下切换开销降低了约 25%。如果复用 copp::generator_future , 切换的CPU开销还能再降低40%左右。其实这个CPU开销对于业务逻辑来说占比非常低了,C++20更大的优势在于内存管理。

https://en.cppreference.com/w/cpp/language/coroutines#Heap_allocation 的描述,某些场景下C++20协程是可以把promise的分配优化掉的。那么在高性能的接口层 ( callable_future ) 我们当然也想尽可能利用好编译器的特性。在 单元测试libcopp_sample_benchmark_std_couroutine_callable_recursive 相关的项目中。可以看到 create 时间很长,但是 resume 时间为0,这估计就是触发这个优化+尾调用优化了,直接创建协程的时候就把后面的跑完了。

接入过程中的易踩坑点

首先,在 《libcopp接入C++20 Coroutine和一些过渡期的设计》 里提到的 GCC 在MinGW环境下的链接符号问题 在当前的 GCC 12 中已经修复了, GCC 11 我没有测试。

第二个坑,是临时对象的析构顺序,如果 co_await 一个 future 对象,返回 awaitable 用于协程切出和恢复,在GCC 10的某些版本中,这个 future 对象析构会在 await_suspend 的之前完成( 10.3 以前的版本)。这里如果有资源引用可能主要注意一下生命周期。具体可以看一下 https://godbolt.org/z/dbrxns66b 这里的sample。

第三个易踩坑点是关于协程内 promise_type 类型的。由于带类型的 coroutine_handle<Promise> 有个接口 coroutine_handle<Promise>::promise() 可以获取内部的 promise 类型。并且有个接口 std::coroutine_handle<Promise>::from_promise 可以从某个 promise 构造handle。我们可能很自然地会想到可以用一个 promise 基类来实现一下公共的功能,然后模块直接传递基类的 coroutine_handle<Promise> 。但是这是行不通的,我们来看看 from_promise 的实现:

MSVC的实现:

static const size_t _ALIGNED_SIZE = is_empty_v<_PromiseT> ? 0 : ((sizeof(_PromiseT) + _ALIGN_REQ - 1) & ~(_ALIGN_REQ - 1));
static coroutine_handle from_promise(_PromiseT& _Prom) noexcept {
  auto _FramePtr = reinterpret_cast<char*>(_STD addressof(_Prom)) + _ALIGNED_SIZE;
  coroutine_handle<_PromiseT> _Result;
  _Result._Ptr = reinterpret_cast<_Resumable_frame_prefix*>(_FramePtr);
  return _Result;
}

GCC实现:

static coroutine_handle from_promise(_Promise& __p) {
  coroutine_handle __self;
  __self._M_fr_ptr = __builtin_coro_promise((char*) &__p, __alignof(_Promise), true);
  return __self;
}

可以看到,handle的地址和promise的对齐大小强相关,所以即便promise有继承关系,如果对齐长度不一致会导致互相转换之后地址错误。 简单地说,假设 struct PromiseA : public PromiseB , 有 coroutine_handle<PromiseB> b; ,不允许使用, coroutine_handle<PromiseA>::from_address(b.address())coroutine_handle<PromiseA>::from_promise(b.promise())

我们实现了一个 handle_delegate 来保存基类地址来解决这个问题。

第四个坑是在 return_value()/return_void() 里不能 handle.destroy() ,否则GCC的实现中会执行部分协程栈变量的析构函数。这里不同编译器地行为也不完全一样,但是这个应该本身就是UB。

有些时候我们会采用引用计数来记录资源,那么在协程执行结束后的 return_value()/return_void() 里有可能会引起引用解绑。如果这时候是最后一个引用被移除的时候可能会触发回收的机制,调用 handle.destroy() 来释放协程,这时候会就产生问题。在 libcopptask_future 中就有类似问题。 task_future 通过对 task_context 的引用计数来管理生命周期,如果这时候释放了最后一个 task_context 就会产生问题。所以我这里的做法是把这个数据的释放放在了 final_suspend() 中。

从有栈协程迁移

我们之前的有栈协程提供了一个带超时管理的 task_manager ,为了方便迁移。对于 task_future 我也提供了一个特化实现的 task_manager 提供相同的功能。当然,超时错误的错误返回不能像之前一样使用特定错误码。而是要走错误状态转换的 copp::promise_error_transform<TRETURN> ,并接受 kCancle/kKilled/kTimeout 作为参数。

其他的迁移出钱前文提到的 some(), any()all() 之外,都和项目中具体的使用方式与新老协程在接口形式上的约束有关了。我后面全部完成完整版本后会再专门开一篇分享在实际项目中各种应用场景下的的迁移过程。

压力测试

补充一个简单的压力测试: (带jemalloc 5.3.0)

  • CPU: AMD EPYC 7K62 48-Core Processor(2.6GHz)
  • Memory: 32GB
  • GCC Version: 12.2.0
  • Clang Version: 15.0.1
Chart 0

未来优化方向

现在唤醒caller和等待协程数据释放时通过 final_suspend() 返回一个 final_awaitable 里然后在 final_awaitable::await_suspend 里执行相应逻辑。或许可以把这些操作直接放在放在 final_suspend() 中,这样能简化协程的整个执行流。

未来可以还实现一个不带atomic非线程安全的 shared_ptr ,很多线程内部流程需要引用计数,但是其实不需要atomic操作。这会导致额外的CPU Cache失效。(特别是 task_futuregenerator_future 里 awaitable和promise对象之间交互时对 context 的传递,一定是在单线程内的。)

当前版本的 generator_future 开销也还比较大,最初我对回调函数都是采用复制的方式。由于 std::function 复制还是有一定开销的,但是构建 generator_futureco_await generator_future 又处于两个阶段,需要续期这个回调接口。所以我中途换了一种实现,加个了不带atomic的侵入式智能指针的 vtable 对象。因为构建 generator_futureco_await generator_future 基本再同线程。不过令人沮丧的是性能反而下降了。这一块后面也需要看看如何优化。

callable_future 的创建开销虽然基本低于原先的协程系统。但是再原先的协程中,函数嵌套调用并不需要层层创建协程。但是在 C++20 的无栈协程中,需要层层传递,层层创建。我预估整体开销应该是比之前使用有栈协程的方案高的。前面提到的编译器优化可以优化掉堆分配,但是协程本身的开销并没有省去。这个组件的接入是用于用户层传递协程任务状态的,传递层次会比较深,这里后续也可以想办法优化下。

最后

代码开源的主仓库是 https://github.com/owent/libcopp ,欢迎有兴趣的小伙伴交流分享。