背景

设计 《libcopp对C++20协程的接入和接口设计》 的时候,由于C++20协程的promise和awaitable是链式关联的。所以当时设计promise和awaitable之间通过一个共享的context来通信交互。当时第一版实现直接使用了 std::shared_ptr 来管理共享引用,也预留了个规划是未来可以改成非线程安全的引用来减少不必要的Cache Miss开销。

大部分业务场景中,单单atomic的开销占比非常低。但是在密集计算的场景中,反复atomic操作带来的Cache Miss不仅仅影响自己,还影响后续其他的代码操作,考虑这些影响以后,总体而言开销就不能忽略不计了。 在计算密集的场景中,经验上通常是调度层处理线程安全问题,然后分发到worker的时候尽可能不要设计线程同步,这样比每个接口去保证线程安全有更好的新能。

在之前 《实现strong_rc_ptr(比shared_ptr更快的引用计数智能指针)》 的分享中,我给我们项目组的交易行模块实现了通用的不使用 atomic 的 std::shared_ptr 替代品(strong_rc_ptrweak_rc_ptr)。在附带大量业务逻辑代码的实际业务场景的压测Case中,仅仅是替换索引的指针,最高可以提升16%的性能。

同时虽然 libcopp 的有栈协程部分可以有选项关闭掉部分atomic操作。但是栈池管理的部分仍然用的 std::shared_ptr ,关闭得并不彻底。

另一方面,之前在给框架层实现RPC框架的时候,有些时候需要其他同学实现新的接入层。虽然之前实现了 rpc::custom_resumerpc::custom_wait 来方便接入。但是这个缺乏严格的类型检查,而且需要组合使用,仍然会对一些同学造成困惑。现在golang的channel模型很多同学都比较习惯了。所以也想做一个类似 go channel 模型的抽象。方便使用和理解,也能在上面实施一些静态检查。

方案和实现

针对以上问题,这次的调整主要就是改动这两块。一个是 channel 模型组件,另一个是把 strong_rc_ptrweak_rc_ptr 移植入 libcopp 并且应用到所有数据管理层。

简化编译选项

libcopp 之前用 LIBCOPP_DISABLE_ATOMIC_LOCK, LIBCOPP_LOCK_DISABLE_THIS_MTLIBCOPP_LOCK_DISABLE_MT 分别控制原子操作是否用atomic,是否 this_coroutine 和 this_task 使用atomic防止冲突和是否关闭 task_manager 的加锁。这次再加入栈管理和C++20协程共享context管理层面的选项的话太过于复杂了。所以现在改成 LIBCOPP_ENABLE_MULTI_THREAD 统一管控是否启用线程安全。

C++20协程的 channel 模型实现

channel模型主要是提供一个receiver给调用房使用,还有一个sender给接入方使用。其实和之前 copp::generator_future + copp::generator_context 的能力是一样的。 但是当时为了最佳适应性,提供了一个vtable组件来做事件通知,结构大概这样:

template <class TCONTEXT>
class LIBCOPP_COPP_API_HEAD_ONLY generator_vtable {
 public:
  using context_type = TCONTEXT;
  using context_pointer_type = LIBCOPP_COPP_NAMESPACE_ID::memory::default_strong_rc_ptr<context_type>;
  using value_type = typename context_type::value_type;
  using await_suspend_callback_type = std::function<void(context_pointer_type)>;
  using await_resume_callback_type = std::function<void(const context_type&)>;

  // ...
 private:
  LIBCOPP_UTIL_INTRUSIVE_PTR_ATOMIC_TYPE intrusive_ref_counter_;
  await_suspend_callback_type await_suspend_callback_;
  await_resume_callback_type await_resume_callback_;
}

可以看到,除了引用计数外,回调函数使用的是 std::function , 它还是有一定开销的。所以当时有个计划是实现一个使用更轻量级的裸指针和一个不包含任何事件回调通知的版本。 对于channel功能而言,其实就是不需要任何事件回调的版本。

为了复用之前实现的 copp::somecopp::anycopp::all 这类设计模式的抽象,这次的优化通过对 copp::generator_future 特化实现来进行。

enum class generator_vtable_type : uint8_t {
  kDefault = 0,
  kLightWeight = 1,
  kNone = 2,
};

// 对外接口
template <class TVALUE, class TERROR_TRANSFORM = promise_error_transform<TVALUE>,
          generator_vtable_type VTABLE_TYPE = generator_vtable_type::kDefault>
class generator_future;

template <class TPROMISE, bool RETURN_VOID, generator_vtable_type VTABLE_TYPE>
class generator_awaitable;

template <class TVALUE, class TERROR_TRANSFORM = promise_error_transform<TVALUE>>
using generator_lightweight_future = generator_future<TVALUE, TERROR_TRANSFORM, generator_vtable_type::kLightWeight>;

template <class TVALUE, class TERROR_TRANSFORM = promise_error_transform<TVALUE>>
using generator_channel_future = generator_future<TVALUE, TERROR_TRANSFORM, generator_vtable_type::kNone>;

// ============ 特化回调类型 ============
template <class TCONTEXT>
struct generator_vtable_type_trait<TCONTEXT, generator_vtable_type::kDefault> {
  using context_type = TCONTEXT;
  using context_pointer_type = LIBCOPP_COPP_NAMESPACE_ID::memory::default_strong_rc_ptr<context_type>;
  using value_type = typename context_type::value_type;
  using await_suspend_callback_type = std::function<void(context_pointer_type)>;
  using await_resume_callback_type = std::function<void(const context_type&)>;
};

template <class TCONTEXT>
struct generator_vtable_type_trait<TCONTEXT, generator_vtable_type::kLightWeight> {
  using context_type = TCONTEXT;
  using context_pointer_type = LIBCOPP_COPP_NAMESPACE_ID::memory::default_strong_rc_ptr<context_type>;
  using value_type = typename context_type::value_type;
  using await_suspend_callback_type = void (*)(context_pointer_type);
  using await_resume_callback_type = void (*)(const context_type&);
};

// ============ 特化vtable容器代理 ============
template <class TCONTEXT>
class LIBCOPP_COPP_API_HEAD_ONLY generator_vtable_delegate<TCONTEXT, generator_vtable_type::kLightWeight> {
 public:
  using context_type = TCONTEXT;
  using vtable_type = generator_vtable<context_type, generator_vtable_type::kLightWeight>;
  using context_pointer_type = LIBCOPP_COPP_NAMESPACE_ID::memory::default_strong_rc_ptr<context_type>;
  using value_type = typename context_type::value_type;
  using await_suspend_callback_type = typename vtable_type::await_suspend_callback_type;
  using await_resume_callback_type = typename vtable_type::await_resume_callback_type;

  template <class TSUSPEND, class TRESUME>
  LIBCOPP_UTIL_FORCEINLINE generator_vtable_delegate(TSUSPEND&& await_suspend_callback,
                                                     TRESUME&& await_resume_callback) noexcept
      : vtable_(new vtable_type(std::forward<TSUSPEND>(await_suspend_callback),
                                std::forward<TRESUME>(await_resume_callback))) {}

  template <class TSUSPEND>
  LIBCOPP_UTIL_FORCEINLINE generator_vtable_delegate(TSUSPEND&& await_suspend_callback) noexcept
      : vtable_(new vtable_type(std::forward<TSUSPEND>(await_suspend_callback), nullptr)) {}

  LIBCOPP_UTIL_FORCEINLINE generator_vtable_delegate() noexcept : vtable_(nullptr) {}

  LIBCOPP_UTIL_FORCEINLINE generator_vtable_delegate(const generator_vtable_delegate& other) noexcept
      : vtable_(other.vtable_) {}

  LIBCOPP_UTIL_FORCEINLINE void trigger_suspend_callback(context_type& context) {
    if (vtable_ && vtable_->get_await_suspend_callback()) {
      vtable_->get_await_suspend_callback()(context.shared_from_this());
    }
  }

  LIBCOPP_UTIL_FORCEINLINE void trigger_resume_callback(const context_type& context) {
    if (vtable_ && vtable_->get_await_resume_callback()) {
      vtable_->get_await_resume_callback()(context);
    }
  }

 private:
  copp::memory::intrusive_ptr<vtable_type> vtable_;
};

template <class TCONTEXT>
class generator_vtable_delegate<TCONTEXT, generator_vtable_type::kNone> {
 public:
  using context_type = TCONTEXT;
  using context_pointer_type = LIBCOPP_COPP_NAMESPACE_ID::memory::default_strong_rc_ptr<context_type>;
  using value_type = typename context_type::value_type;

  generator_vtable_delegate() noexcept {}
  generator_vtable_delegate(const generator_vtable_delegate&) noexcept {}

  void trigger_suspend_callback(context_type& /*context*/) noexcept {}

  void trigger_resume_callback(const context_type& /*context*/) noexcept {}
};

最后提供一组类似 Rust的 std::sync::mpsc::channel 来创建channel就行了。

template <class TVALUE, class TERROR_TRANSFORM = promise_error_transform<TVALUE>>
using generator_channel_receiver = generator_channel_future<TVALUE, TERROR_TRANSFORM>;

template <class TVALUE, class TERROR_TRANSFORM = promise_error_transform<TVALUE>>
using generator_channel_sender = typename generator_channel_receiver<TVALUE, TERROR_TRANSFORM>::context_pointer_type;

template <class TVALUE, class TERROR_TRANSFORM = promise_error_transform<TVALUE>>
inline std::pair<generator_channel_receiver<TVALUE, TERROR_TRANSFORM>,
                 generator_channel_sender<TVALUE, TERROR_TRANSFORM>>
make_channel() {
  generator_channel_receiver<TVALUE, TERROR_TRANSFORM> future;
  generator_channel_sender<TVALUE, TERROR_TRANSFORM> context = future.get_context();
  return std::make_pair(std::move(future), std::move(context));
}

stackful协程的 channel 模型实现

stackful模式下之前没有现成的实现,为了保持使用上统一,所以需要新实现一个。 原理和结构也类似,也是receiver和sender通过共享的context来交互。

classDiagram
    stackful_channel_context <|-- stackful_channel_receiver
    stackful_channel_context <|-- stackful_channel_sender
    stackful_channel_context: -future data_
    stackful_channel_context: -variant callers_
    stackful_channel_context: +bool is_ready()
    stackful_channel_context: +bool is_pending()
    stackful_channel_context: +void reset_value()
    stackful_channel_context: +const value_type *get_value() const
    stackful_channel_context: +value_type *get_value()
    stackful_channel_context: +void add_caller(handle_delegate handle)
    stackful_channel_context: +bool remove_caller(handle_delegate handle)
    stackful_channel_context: +size_t resume_callers()
    stackful_channel_context: +bool has_multiple_callers() const
    class stackful_channel_receiver{
      -context_pointer_type context_;
      +bool is_ready()
      +bool is_pending()
      +void reset_value()
      +const value_type *get_value() const
      +value_type *get_value()
      +value_type inject_await(TCONTEXT *ctx, TERROR_TRANSFORM &&error_transform)
      +const context_pointer_type &get_context() const
      +context_pointer_type &get_context()
    }
    class stackful_channel_sender{
      -context_pointer_type context_;
      +bool is_ready()
      +bool is_pending()
      +void reset_value()
      +void set_value(U &&in)
      +const context_pointer_type &get_context() const
      +context_pointer_type &get_context()
    }

和C++20协程的 copp::generator_future 实现一样。这里也会对小的trivial类型执行 inplace 构造优化,所以后面的压力测试会区分 trivial 和 非trivial 两种值类型。 这里的caller也是记录无栈协程的上下文即可,然后增加了 inject_await 接口和stackful的协程和task对象交互。除此之外还有两处差异。

一是错误码转换是在调用 inject_await 的时候,也就是在执行等待的时候传入。这是为了类型数量可以尽可能少一点。

template <class TERROR_TRANSFORM>
value_type inject_await(coroutine_context *ctx, TERROR_TRANSFORM &&error_transform) noexcept(
    std::is_nothrow_copy_constructible<value_type>::value && noexcept(error_transform(COPP_EC_ARGS_ERROR))) {
  return internal_inject_await<coroutine_context>(ctx, std::forward<TERROR_TRANSFORM>(error_transform));
}

另一处是这里要stackful协程和task分别去适配注入。

// coroutine_context
template <class TAWAITABLE, class TERROR_TRANSFORM,
            class = nostd::enable_if_t<stackful_inject_awaitable<nostd::remove_cvref_t<TAWAITABLE>>::value>>
inline container_value_type<TAWAITABLE> await_value(TAWAITABLE &&awaitable, TERROR_TRANSFORM &&error_transform) noexcept(
      std::is_nothrow_copy_constructible<container_value_type<TAWAITABLE>>::value && noexcept(error_transform(COPP_EC_ARGS_ERROR))) {
  return awaitable.inject_await(this, std::forward<TERROR_TRANSFORM>(error_transform));
}

// task inject
LIBCOPP_COPP_NAMESPACE_BEGIN
template <class TCO_MACRO>
struct stackful_channel_resume_handle<LIBCOPP_COTASK_NAMESPACE_ID::task<TCO_MACRO>> {
  inline static int resume(void *invoke_task, stackful_channel_context_base *priv_data) {
    if (nullptr != invoke_task) {
      return reinterpret_cast<LIBCOPP_COTASK_NAMESPACE_ID::task<TCO_MACRO> *>(invoke_task)
          ->resume(reinterpret_cast<void *>(priv_data));
    }

    return 0;
  }
};
LIBCOPP_COPP_NAMESPACE_END

这是因为 cotask::task<T> 在执行resume后要做一些状态机切换的操作和触发完成事件,如果不单独注入无法及时触发完成事件和状态机切换。

最后在提供构造receiver和sender的接口就好了。

template <class TVALUE>
inline std::pair<stackful_channel_receiver<TVALUE>, stackful_channel_sender<TVALUE>> make_stackful_channel() {
  stackful_channel_receiver<TVALUE> receiver;
  stackful_channel_sender<TVALUE> sender{receiver.get_context()};
  return std::make_pair(std::move(receiver), std::move(sender));
}

strong_rc_ptrweak_rc_ptr

智能指针的实现和 《实现strong_rc_ptr(比shared_ptr更快的引用计数智能指针)》 里一样,没啥特殊的。仅仅是多加了 default_rc_ptr_trait 来处理默认用带还是不带atomic的版本。

#if LIBCOPP_MACRO_ENABLE_MULTI_THREAD
using default_rc_ptr_trait = compat_strong_ptr_function_trait<compat_strong_ptr_mode::kStl>;
#else
using default_rc_ptr_trait = compat_strong_ptr_function_trait<compat_strong_ptr_mode::kStrongRc>;
#endif

template <class T>
using default_strong_rc_ptr = typename default_rc_ptr_trait::template shared_ptr<T>;

template <class T>
using default_weak_rc_ptr = typename default_rc_ptr_trait::template weak_ptr<T>;

template <class T>
using default_enable_shared_from_this = typename default_rc_ptr_trait::template enable_shared_from_this<T>;

template <class T, class... ArgsT>
LIBCOPP_UTIL_FORCEINLINE default_strong_rc_ptr<T> default_make_strong(ArgsT&&... args) {
  return default_rc_ptr_trait::template make_shared<T>(std::forward<ArgsT>(args)...);
}

template <class T, class Alloc, class... TArgs>
LIBCOPP_UTIL_FORCEINLINE default_strong_rc_ptr<T> default_allocate_strong(const Alloc& alloc, TArgs&&... args) {
  return default_rc_ptr_trait::template allocate_shared<T>(alloc, std::forward<TArgs>(args)...);
}

template <class T, class F>
LIBCOPP_UTIL_FORCEINLINE default_strong_rc_ptr<T> default_static_pointer_cast(F&& f) {
  return default_rc_ptr_trait::template static_pointer_cast<T>(std::forward<F>(f));
}

template <class T, class F>
LIBCOPP_UTIL_FORCEINLINE default_strong_rc_ptr<T> default_const_pointer_cast(F&& f) {
  return default_rc_ptr_trait::template const_pointer_cast<T>(std::forward<F>(f));
}

#if defined(LIBCOPP_MACRO_ENABLE_RTTI) && LIBCOPP_MACRO_ENABLE_RTTI
template <class T, class F>
LIBCOPP_UTIL_FORCEINLINE default_strong_rc_ptr<T> default_dynamic_pointer_cast(F&& f) {
  return default_rc_ptr_trait::template dynamic_pointer_cast<T>(std::forward<F>(f));
}
#endif

具体实现见: https://github.com/owent/libcopp/tree/v2/include/libcopp/utils/memory

压力测试对比和分析

下面测试分几组:

  • 低并发:基准 :同时运行协程数低,编译优化开 -O2
  • 低并发:仅命中率 :同时运行协程数低,编译优化开 -O2 ,关闭Atomic,去除task_manager锁
  • 低并发:Release :同时运行协程数低,编译优化开 -O3 ,关闭Atomic,去除task_manager锁
  • 高并发:基准 :同时运行协程数高,模拟随机访问,编译优化开 -O2
  • 高并发:仅命中率 :同时运行协程数高,模拟随机访问,编译优化开 -O2 ,关闭Atomic,去除task_manager锁
  • 高并发:Release :同时运行协程数高,模拟随机访问,编译优化开 -O3 ,关闭Atomic,去除task_manager锁
对比项低并发:基准低并发:仅命中率低并发:Release高并发:基准高并发:仅命中率高并发:Release
Stackful协程上下文(cotask::coroutine_context*)创建(动态栈池)---66ns41ns(-38%)38ns(-43%)
Stackful协程上下文(cotask::coroutine_context*)切换47ns32ns(-32%)25ns(-47%)83ns54ns(-35%)42ns(-49%)
Stackful协程任务(cotask::task)创建(动态栈池)---102ns61ns(-40%)64ns(-37%)
Stackful协程任务(cotask::task)切换65ns42ns(-35%)34ns(-48%)95ns78ns(-18%)68ns(-28%)
C++20协程任务(cotask::task_future)创建(Trivial值类型)---98ns78ns(-21%)77ns(-22%)
C++20协程任务(cotask::task_future)创建(非Trivial值类型)---109ns91ns(-16%)93ns(-15%)
C++20协程任务(cotask::generator_future)切换(Trivial值类型)46ns32ns(-31%)28ns(-40%)49ns28ns(-43%)31ns(-37%)
C++20协程任务(cotask::generator_future)切换(非Trivial值类型)52ns38ns(-27%)39ns(-25%)56ns41ns(-27%)44ns(-22%)
C++20协程Callable(cotask::callable_future)创建(Trivial值类型)---57ns53ns(-7%)51ns(-3%)
C++20协程Callable(cotask::callable_future)创建(非Trivial值类型)---55ns55ns(-0%)52ns(-11%)
C++20协程Callable(cotask::generator_future)切换(Trivial值类型)34ns23ns(-32%)20ns(-41%)35ns27ns(-22%)21ns(-40%)
C++20协程Callable(cotask::generator_future)切换(非Trivial值类型)47ns41ns(-13%)34ns(-28%)49ns43ns(-12%)38ns(-22%)
C++20协程轻量级Generator(cotask::generator_future,Trivial值类型)47ns29ns(-38%)23ns(-51%)52ns36ns(-31%)33ns(-36%)
C++20协程轻量级Generator(cotask::generator_future,非Trivial值类型)54ns44ns(-22%)37ns(-31%)60ns49ns(-17%)46ns(-23%)
C++20协程Channel(cotask::generator_future,Trivial值类型)34ns23ns(-32%)20ns(-41%)38ns26ns(-32%)26ns(-32%)
C++20协程Channel(cotask::generator_future,非Trivial值类型)47ns39ns(-17%)33ns(-30%)48ns42ns(-13%)39ns(-19%)

为什么上面的基准使用动态栈池的版本?这是因为我们实际项目里使用的是根据负载动态伸缩的栈池。 实际开销包含涉及模式带来的管理开销,比如一些简单地DAG能力,wait_all/wait_some/wait_anythen 接口,任务管理器注入等等。 所以这个组合和测试参考参考数据更贴近业务实际使用场景,而不是单纯为了“跑分”。

  • C++20协程ChannelC++20协程轻量级Generator 的Case都使用 C++20协程Callable 进行,数据都是触发协程切换的开销。
  • 低并发下创建开销误差大,无意义所以忽略数据。
  • 上面的切换开销都是一来+一回两次。在实际RPC使用场景中,一次RPC调用涉及IO写的时候切出和拿到回包后切入。这样更符合实际使用场景。
    • 有些对称式协程一般只测一次切换,要对比的话可以*2后对比。
  • C++20协程的 cotask::generator_future 组件本身创建开销大约是40ns左右。实际使用的时候可以池化复用,所以这里没有列举出 cotask::generator_future 的创建开销。
  • C++20协程涉及对分配,这个压测使用的是默认的内存分配器,没有使用 jemalloc/mimalloc/tcmalloc 等,有兴趣需要测试的同学可以自己加载PRELOAD跑。

来个图更直观一点。首先是 低并发场景

Chart 0

然后是 高并发场景

Chart 1

总体来看,无论是高并发场景还是低并发场景,关闭atomic操作都能带来大约20%-30%的性能提升,提升量相当可观。 而进一步开启 -O3 编译后,大部分场景可以进一步提升,特别是对非Trivial类型比较明显。少量场景可能是过度inline反而导致Code Cache命中率下降从而开销轻微增加。

最新测试也可以查看 https://github.com/owent/libcopp/actions 里的CI输出。

最后

当前版本的优化已经发布到 https://github.com/owent/libcopp/releases/tag/v2.3.1 。未来可能可以针对小对象和对缓存命中率进一步优化。 也欢迎有兴趣的小伙伴们一起交流。