背景
这篇分享拖更了好久了。问题起源于去年我们项目组接入 opentelemetry-cpp 的时候,在进程优雅退出的时候偶现超时,虽然可以直接kill进程没啥影响但是退出不“优雅”的话总归会破坏发布流程,增加人工介入的成本。这里记录一下问题可能其他的组件有类似的用法也会有相似的问题。
相关代码和使用场景
首先介绍下代码逻辑的背景,在代码结构层面。首先是有一个后台线程处理Batch Process和导出任务。然后caller线程发起 ForceFlush
或者 Shutdown
的时候要等待已经发出的事件执行完毕。
他们之间通过 std::condition_variable
来通知事件完成。
大致代码如下,首先是 bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
接口:
bool BatchSpanProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
// ...
// Now wait for the worker thread to signal back from the Export method
std::unique_lock<std::mutex> lk_cv(synchronization_data_->force_flush_cv_m);
synchronization_data_->is_force_flush_pending.store(true, std::memory_order_release);
auto break_condition = [this]() {
if (synchronization_data_->is_shutdown.load() == true)
{
return true;
}
// Wake up the worker thread once.
if (synchronization_data_->is_force_flush_pending.load(std::memory_order_acquire))
{
synchronization_data_->is_force_wakeup_background_worker.store(true,
std::memory_order_release);
synchronization_data_->cv.notify_one();
}
return synchronization_data_->is_force_flush_notified.load(std::memory_order_acquire);
};
// Fix timeout to meet requirement of wait_for
timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout(
timeout, std::chrono::microseconds::zero());
bool result;
if (timeout <= std::chrono::microseconds::zero())
{
bool wait_result = false;
while (!wait_result)
{
// When is_force_flush_notified.store(true) and force_flush_cv.notify_all() is called
// between is_force_flush_pending.load() and force_flush_cv.wait(). We must not wait
// for ever
wait_result = synchronization_data_->force_flush_cv.wait_for(lk_cv, schedule_delay_millis_,
break_condition);
}
result = true;
}
else
{
result = synchronization_data_->force_flush_cv.wait_for(lk_cv, timeout, break_condition);
}
// ...
return result;
}
然后 void BatchSpanProcessor::DoBackgroundWork()
和相关代码:
void BatchSpanProcessor::DoBackgroundWork()
{
auto timeout = schedule_delay_millis_;
while (true)
{
// Wait for `timeout` milliseconds
std::unique_lock<std::mutex> lk(synchronization_data_->cv_m);
synchronization_data_->cv.wait_for(lk, timeout, [this] {
if (synchronization_data_->is_force_wakeup_background_worker.load(std::memory_order_acquire))
{
return true;
}
return !buffer_.empty();
});
synchronization_data_->is_force_wakeup_background_worker.store(false,
std::memory_order_release);
if (synchronization_data_->is_shutdown.load() == true)
{
DrainQueue();
return;
}
auto start = std::chrono::steady_clock::now();
Export();
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
// Subtract the duration of this export call from the next `timeout`.
timeout = schedule_delay_millis_ - duration;
}
}
void BatchSpanProcessor::Export()
{
do
{
// 省略无关代码 ...
exporter_->Export(nostd::span<std::unique_ptr<Recordable>>(spans_arr.data(), spans_arr.size()));
NotifyCompletion(notify_force_flush, synchronization_data_);
} while (true);
}
void BatchSpanProcessor::NotifyCompletion(
bool notify_force_flush,
const std::shared_ptr<SynchronizationData> &synchronization_data)
{
if (!synchronization_data)
{
return;
}
if (notify_force_flush)
{
synchronization_data->is_force_flush_notified.store(true, std::memory_order_release);
synchronization_data->force_flush_cv.notify_one();
}
}
简单描述一下就是在退出阶段,首先要设置shutdown flag,然后唤醒后台线程导出数据。之后用一个 std::condition_variable
来等待后台线程通知导出完成。然后自己再退出,时序大概如下:
sequenceDiagram
Caller->>Caller: 采集当前待上报边界
Background->>Background: 启动
Caller->>Background: 通知唤醒后台线程(condition_variable-1)
Caller->>Caller: 进入等待上报完成通知(condition_variable-2)
loop 循环执行到退出
Background->>Background: 导出一批数据
Background->>Caller: 通知已经导出的数据边界(condition_variable-2)
Background->>Background: 没有数据则进入等待(condition_variable-1)
end
loop 直到当前上报边界已经全部上报完成或超时
Caller->>Caller: 等待上报完成唤醒(condition_variable-2)
Caller->>Caller: 检查上报边界或超时
end
Caller->>Caller: 退出
Background->>Background: 后台线程退出
问题分析
上面的流程和代码乍一看似乎没有什么问题,但是在实际项目中偶尔会出现在 BatchSpanProcessor::ForceFlush
里的 synchronization_data_->force_flush_cv.wait_for
陷入了长耗时的等待。有的版本甚至会陷入无尽得等待。
因为在退出流程里,上层要保证最后的数据执行过刷出。并且后台线程不再引用资源之后才能销毁内存对象,所以会ForceFlush
一次,且 schedule_delay_millis_
一般情况下考虑网络抖动会设置得比较长。某些流程里也有可能会在reload的时候切换到另一个新线程上调用 ForceFlush(max())
的,这时候如果发生退出也会发生某些Provider一直退不掉的问题。
在后台任务刷出数据完成后,设置完 is_force_flush_notified
并且 force_flush_cv.notify_one()
后下一此循环会发现 is_shutdown
已经是 true
了,就会退出后台线程,之后就不再有 force_flush_cv.notify_one()
了。
那么在调用 ForceFlush()
的线程里 force_flush_cv.wait_for(lk_cv, schedule_delay_millis_, break_condition);
的 break_condition
包含 is_force_flush_notified.load(std::memory_order_acquire);
为什么返回 false
且后面还没收到 force_flush_cv.notify_one()
的通知呢?
我们先来看一看 std::condition_variable::wait_for
的实现。(几个主流STL库实现大同小异,这里贴Linux下GCC的实现作为案例)
template<typename _Rep, typename _Period, typename _Predicate>
bool
wait_for(unique_lock<mutex>& __lock,
const chrono::duration<_Rep, _Period>& __rtime,
_Predicate __p)
{
using __dur = typename steady_clock::duration;
return wait_until(__lock,
steady_clock::now() +
chrono::__detail::ceil<__dur>(__rtime),
std::move(__p));
}
template<typename _Clock, typename _Duration, typename _Predicate>
bool
wait_until(unique_lock<mutex>& __lock,
const chrono::time_point<_Clock, _Duration>& __atime,
_Predicate __p)
{
while (!__p())
if (wait_until(__lock, __atime) == cv_status::timeout)
return __p();
return true;
}
// 其他的 wait_until 嵌套都是设计模式相关,不在展示,最后是调用下面的代码,pthread的接口
void
wait_until(mutex& __m, clockid_t __clock, timespec& __abs_time)
{
pthread_cond_clockwait(&_M_cond, __m.native_handle(), __clock,
&__abs_time);
}
可以看到,这里关键点在这个代码:
while (!__p())
if (wait_until(__lock, __atime) == cv_status::timeout)
return __p();
return true;
可能会存在一个极小的临界区,在调用 __p()
的时候 is_force_flush_notified.store(true, std::memory_order_release)
和 force_flush_cv.notify_one()
还没执行到,然后准备进入 wait_until(__lock, __atime)
, 但是在最终调用 pthread_cond_clockwait
前 is_force_flush_notified.store(true, std::memory_order_release)
和 force_flush_cv.notify_one()
执行掉了。然后再进入wait就没收到 notify ,并且由于退出阶段,后台线程已经退出不会再收到新的notify,所以必须等待到超时。如果这时候超时时间很长就会陷入一个非常长时间的等待。
解决方案
找到问题之后解决方法就比较简单了。主要有两种,一种是再加一个 mutex 锁,把 is_force_flush_notified
, force_flush_cv
和 std::condition_variable::wait_for
也放进这个锁的临界区里保护,这样会多一个锁,多一些额外开销。另一种方案则是缩短等待时间后重试,缺点是发生这个情况的时候可能要多等个这个等待时间后才能完成。
因为这个毕竟是在退出阶段才出现且很小概率出现,所以我选了后一种不带来额外开销的方案。PR去年已经合入,小伙伴门可以放心使用。
后续问题
后面其实还有线程安全的问题也和这个相关,虽然我们项目里没用到,但是社区收到了issue。因为上面的 is_force_flush_notified
是个atomic的bool值,在多线程调用 ForceFlush()
的时候有概率某个线程的 is_force_flush_notified
会被其他线程吞掉。相关详情见: [SDK] BatchSpanProcessor::ForceFlush appears to be hanging forever 和 [SDK] BatchLogRecordProcessor::ForceFlush hangs for 10 seconds 。
现在的解决法方法是不再使用bool值判定多次调用的先后关系,而是使用exported序号来进行。PR见: [SDK] Fix forceflush may wait for ever。
欢迎有兴趣的小伙伴互相交流研究。