背景

这篇分享拖更了好久了。问题起源于去年我们项目组接入 opentelemetry-cpp 的时候,在进程优雅退出的时候偶现超时,虽然可以直接kill进程没啥影响但是退出不“优雅”的话总归会破坏发布流程,增加人工介入的成本。这里记录一下问题可能其他的组件有类似的用法也会有相似的问题。

相关代码和使用场景

首先介绍下代码逻辑的背景,在代码结构层面。首先是有一个后台线程处理Batch Process和导出任务。然后caller线程发起 ForceFlush 或者 Shutdown 的时候要等待已经发出的事件执行完毕。 他们之间通过 std::condition_variable 来通知事件完成。

大致代码如下,首先是 bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept 接口:

bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
  // ...

  // Now wait for the worker thread to signal back from the Export method
  std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);

  synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
  auto break_condition = [this]() {
    if (synchronization_data_->is_shutdown.load() == true)
    {
      return true;
    }

    // Wake up the worker thread once.
    if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
    {
      synchronization_data_->is_force_wakeup_background_worker.store(true,
                                                                     std::memory_order_release);
      synchronization_data_->cv.notify_one();
    }

    return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
  };

  // Fix timeout to meet requirement of wait_for
  timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
      timeout, std::chrono::microseconds::zero());
  bool result;
  if (timeout <= std::chrono::microseconds::zero())
  {
    bool wait_result = false;
    while (!wait_result)
    {
      // When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
      // between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
      // for ever
      wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_,
                                                                   break_condition);
    }
    result = true;
  }
  else
  {
    result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition);
  }

  // ...
  return result;
}

然后 void BatchSpanProcessor::DoBackgroundWork() 和相关代码:

void BatchSpanProcessor::DoBackgroundWork()
{
  auto timeout = schedule_delay_millis_;

  while (true)
  {
    // Wait for `timeout` milliseconds
    std::unique_lock<std::mutex> lk(synchronization_data_->cv_m);
    synchronization_data_->cv.wait_for(lk, timeout, [this] {
      if (synchronization_data_->is_force_wakeup_background_worker.load(std::memory_order_acquire))
      {
        return true;
      }

      return !buffer_.empty();
    });
    synchronization_data_->is_force_wakeup_background_worker.store(false,
                                                                   std::memory_order_release);

    if (synchronization_data_->is_shutdown.load() == true)
    {
      DrainQueue();
      return;
    }

    auto start = std::chrono::steady_clock::now();
    Export();
    auto end      = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

    // Subtract the duration of this export call from the next `timeout`.
    timeout = schedule_delay_millis_ - duration;
  }
}

void BatchSpanProcessor::Export()
{
  do
  {
    // 省略无关代码 ...
    exporter_->Export(nostd::span<std::unique_ptr<Recordable>>(spans_arr.data(), spans_arr.size()));
    NotifyCompletion(notify_force_flush, synchronization_data_);
  } while (true);
}

void BatchSpanProcessor::NotifyCompletion(
    bool notify_force_flush,
    const std::shared_ptr<SynchronizationData> &synchronization_data)
{
  if (!synchronization_data)
  {
    return;
  }

  if (notify_force_flush)
  {
    synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
    synchronization_data->force_flush_cv.notify_one();
  }
}

简单描述一下就是在退出阶段,首先要设置shutdown flag,然后唤醒后台线程导出数据。之后用一个 std::condition_variable 来等待后台线程通知导出完成。然后自己再退出,时序大概如下:

sequenceDiagram
    Caller->>Caller: 采集当前待上报边界
    Background->>Background: 启动
    Caller->>Background: 通知唤醒后台线程(condition_variable-1)
    Caller->>Caller: 进入等待上报完成通知(condition_variable-2)
    loop 循环执行到退出
    Background->>Background: 导出一批数据
    Background->>Caller: 通知已经导出的数据边界(condition_variable-2)
    Background->>Background: 没有数据则进入等待(condition_variable-1)
    end
    loop 直到当前上报边界已经全部上报完成或超时
      Caller->>Caller: 等待上报完成唤醒(condition_variable-2)
      Caller->>Caller: 检查上报边界或超时
    end
    Caller->>Caller: 退出
    Background->>Background: 后台线程退出

问题分析

上面的流程和代码乍一看似乎没有什么问题,但是在实际项目中偶尔会出现在 BatchSpanProcessor::ForceFlush 里的 synchronization_data_->force_flush_cv.wait_for 陷入了长耗时的等待。有的版本甚至会陷入无尽得等待。

因为在退出流程里,上层要保证最后的数据执行过刷出。并且后台线程不再引用资源之后才能销毁内存对象,所以会ForceFlush 一次,且 schedule_delay_millis_ 一般情况下考虑网络抖动会设置得比较长。某些流程里也有可能会在reload的时候切换到另一个新线程上调用 ForceFlush(max()) 的,这时候如果发生退出也会发生某些Provider一直退不掉的问题。

在后台任务刷出数据完成后,设置完 is_force_flush_notified 并且 force_flush_cv.notify_one() 后下一此循环会发现 is_shutdown 已经是 true 了,就会退出后台线程,之后就不再有 force_flush_cv.notify_one() 了。

那么在调用 ForceFlush() 的线程里 force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, break_condition);break_condition 包含 is_force_flush_notified.load(std::memory_order_acquire); 为什么返回 false 且后面还没收到 force_flush_cv.notify_one() 的通知呢?

我们先来看一看 std::condition_variable::wait_for 的实现。(几个主流STL库实现大同小异,这里贴Linux下GCC的实现作为案例)

template<typename _Rep, typename _Period, typename _Predicate>
  bool
  wait_for(unique_lock<mutex>& __lock,
      const chrono::duration<_Rep, _Period>& __rtime,
      _Predicate __p)
  {
using __dur = typename steady_clock::duration;
return wait_until(__lock,
    steady_clock::now() +
    chrono::__detail::ceil<__dur>(__rtime),
    std::move(__p));
  }

template<typename _Clock, typename _Duration, typename _Predicate>
  bool
  wait_until(unique_lock<mutex>& __lock,
  const chrono::time_point<_Clock, _Duration>& __atime,
  _Predicate __p)
  {
while (!__p())
if (wait_until(__lock, __atime) == cv_status::timeout)
  return __p();
return true;
  }

  // 其他的 wait_until 嵌套都是设计模式相关,不在展示,最后是调用下面的代码,pthread的接口
  void
  wait_until(mutex& __m, clockid_t __clock, timespec& __abs_time)
  {
    pthread_cond_clockwait(&_M_cond, __m.native_handle(), __clock,
          &__abs_time);
  }

可以看到,这里关键点在这个代码:

while (!__p())
if (wait_until(__lock, __atime) == cv_status::timeout)
  return __p();
return true;

可能会存在一个极小的临界区,在调用 __p() 的时候 is_force_flush_notified.store(true, std::memory_order_release)force_flush_cv.notify_one() 还没执行到,然后准备进入 wait_until(__lock, __atime), 但是在最终调用 pthread_cond_clockwaitis_force_flush_notified.store(true, std::memory_order_release)force_flush_cv.notify_one() 执行掉了。然后再进入wait就没收到 notify ,并且由于退出阶段,后台线程已经退出不会再收到新的notify,所以必须等待到超时。如果这时候超时时间很长就会陷入一个非常长时间的等待。

解决方案

找到问题之后解决方法就比较简单了。主要有两种,一种是再加一个 mutex 锁,把 is_force_flush_notifiedforce_flush_cvstd::condition_variable::wait_for 也放进这个锁的临界区里保护,这样会多一个锁,多一些额外开销。另一种方案则是缩短等待时间后重试,缺点是发生这个情况的时候可能要多等个这个等待时间后才能完成。

因为这个毕竟是在退出阶段才出现且很小概率出现,所以我选了后一种不带来额外开销的方案。PR去年已经合入,小伙伴门可以放心使用。

后续问题

后面其实还有线程安全的问题也和这个相关,虽然我们项目里没用到,但是社区收到了issue。因为上面的 is_force_flush_notified 是个atomic的bool值,在多线程调用 ForceFlush() 的时候有概率某个线程的 is_force_flush_notified 会被其他线程吞掉。相关详情见: [SDK] BatchSpanProcessor::ForceFlush appears to be hanging forever[SDK] BatchLogRecordProcessor::ForceFlush hangs for 10 seconds 。 现在的解决法方法是不再使用bool值判定多次调用的先后关系,而是使用exported序号来进行。PR见: [SDK] Fix forceflush may wait for ever

欢迎有兴趣的小伙伴互相交流研究。