前言

protobuf 从3.0版本开始对C++增加了Arena接口,可以用于使用连续的内存块分配内部对象,并且可以更容易精确地控制对象地生命周期,最终达到减少内存碎片地目的。最近我给我们项目的部分接口流程进行相关地改造,在大多数使用 protobuf 的地方都增加了对Arena的支持,但是在接入过程中也碰到了一些问题和坑。

Arena的基本原理

Arena的原理十分简单,就是预先分配一个内存块。创建Message和内部对象的时候全部在分配好的内存块上 placement new 出来,所有的Message对象也会内部记录所属的Arena以便创建字对象和某些情况下需要检查Arena时使用。如果创建的对象不支持Arena的,在 placement new 完成后要在Arena上设置一个析构回调,以便在释放的时候调用析构流程。如果Arena内部的内存块剩余内存不足则会自动创建下一个(可能是更大的)内存块。

Arena可以在创建Arena的时候通过指定自定义的ArenaOptions来设置一些系数,包括最大内存块大小(如果超出了会直接用)、初始内存块大小、分配/回收内存块的实现、事件接口等。

每次Arena内存块剩余内存不足时,会尝试分配 最后一个内存块size*2ArenaOptions设置里的最大内存块 中的最小值(即: min(2 * last_block.size, ArenaOptions.max_block_size) )。如果要分配的内存大小本身就是大于 ArenaOptions设置里的最大内存块 的,则会直接分配需要的内存块的大小+Header的大小(当前版本Header的大小是三个指针长度对齐到8,64位系统下就是24字节)。

长期存在对象的生命周期

Arena有一个特点是它维护的 所有对象都是在Arena析构的时候统一释放 的。这中间它内部维护的内存块只会不断地append,并不会删除。所以这也决定了由Arena维护的对象要么只能是临时对象,要么是不可变的。否则它的内存会无限增长下去。比如,我们是有状态服务器,如果我们把一个用户的数据块长期缓存在内存里,然后Arena和用户对象的生命绑定。那么中间很多操作会不断地变更内部的对象结构,这就会导致用户下线前Arena无限增长。

所以,我们主要对Arena的集成最终集中在各个Task的入口处,然后一个Task里的子Task和RPC请求中需要创建的局部变量数据都复用这个Arena。当一个Task及其子Task全部结束以后,Arena就释放了。而除非少量的一些对全服数据操作的Task以外,大多数Task生命周期也就几秒钟,内存的回收时间就相对可控。

初始化分配的大小和最大分配的大小

ArenaOptions设置 里,默认的初始分配大小是 256B ,最大分配大小是 8KB 。前面也提到,我们的集成主要在各个Task的入口处,在Task里光是链路跟踪和RPC header相关的数据就占了100-200字节,而实际使用中一个Task的请求包、应答包就2倍曹处256B了。所以我们把初始值提升到了512B。同时我们项目中战斗记录的包都偏大,然后一些玩家数据拉取的包体也比较大,所以最大值也提高到了64KB。当然这些值后面有待观察,我们后面出了更详细的统计之后可能也再会调整。

直接迁移 set_allocated_XXX/release_XXX 可能导致内存泄漏

protobuf 里,经常会碰上一些类似消息转发或者复用某些Message的操作,如果这些Message比较大,Copy的话显然是比较浪费的。所以有些地方会使用 set_allocated_XXXrelease_XXX 接口来复用某些Message。 比如在我们的项目里,保存数据到DB的时候经常会有这种操作:

// 参数传入 user_basic_profile;
table_message_type container;
container.set_id(user_id);
container.set_allocated_basic_profile(&user_basic_profile); // 直接复用已有的数据结构,用于后续打包
// ... 其他类似赋值代码
int result = pack_and_send(TcaplusService::TCAPLUS_API_UPDATE_REQ, container); // 打包和RPC
container.release_basic_profile(&user_basic_profile);       // 释放生命周期管理
// ... 其他类似释放代码

但是加入了Arena之后就不一样了。我们 不能 简单地把代码改成这样 :

// 参数传入 arena, user_basic_profile;
table_message_type* container = ::google::protobuf::Arena::CreateMessage<table_message_type>(arena);
container->set_id(user_id);
container->set_allocated_basic_profile(&user_basic_profile); // 直接复用已有的数据结构,用于后续打包
// ... 其他类似赋值代码
int result = pack_and_send(TcaplusService::TCAPLUS_API_UPDATE_REQ, container); // 打包和RPC
container->release_basic_profile(&user_basic_profile);       // 释放生命周期管理
// ... 其他类似释放代码
// ...

为什么呢?我们来看看生成的 set_allocated_basic_profilerelease_basic_profile

// ============ set_allocated_basic_profile ============
void table_message_type::set_allocated_basic_profile(user_basic_profile_t* basic_profile) {
  ::PROTOBUF_NAMESPACE_ID::Arena* message_arena = GetArena();
  if (message_arena == nullptr) {
    delete reinterpret_cast< ::PROTOBUF_NAMESPACE_ID::MessageLite*>(basic_profile_);
  }
  if (basic_profile) {
    ::PROTOBUF_NAMESPACE_ID::Arena* submessage_arena =
      reinterpret_cast<::PROTOBUF_NAMESPACE_ID::MessageLite*>(basic_profile)->GetArena();
    if (message_arena != submessage_arena) {
      basic_profile = ::PROTOBUF_NAMESPACE_ID::internal::GetOwnedMessage(
          message_arena, basic_profile, submessage_arena);
    }

  } else {

  }
  basic_profile_ = basic_profile;
  // @@protoc_insertion_point(field_set_allocated:tdr2pb.TABLE_USER_DEF.basic_profile)
}

// ------------ set_allocated_basic_profile 相关接口实现: GetOwnedMessage  ------------
template <typename T>
T* GetOwnedMessage(Arena* message_arena, T* submessage,
                   Arena* submessage_arena) {
  // The casts must be reinterpret_cast<> because T might be a forward-declared
  // type that the compiler doesn't know is related to MessageLite.
  return reinterpret_cast<T*>(GetOwnedMessageInternal(
      message_arena, reinterpret_cast<MessageLite*>(submessage),
      submessage_arena));
}

// ------------ set_allocated_basic_profile 相关接口实现: GetOwnedMessage  ------------
MessageLite* GetOwnedMessageInternal(Arena* message_arena,
                                     MessageLite* submessage,
                                     Arena* submessage_arena) {
  GOOGLE_DCHECK(submessage->GetArena() == submessage_arena);
  GOOGLE_DCHECK(message_arena != submessage_arena);
  if (message_arena != NULL && submessage_arena == NULL) {
    message_arena->Own(submessage);                    // 堆上的message直接转移进arena
    return submessage;
  } else {
    MessageLite* ret = submessage->New(message_arena); // 如果message_arena非空,则是复制了一个对象并放在message_arena上,否则堆上复制了一个对象。并不影响原message的生命周期
    ret->CheckTypeAndMergeFrom(*submessage);
    return ret;
  }
}

// ============ release_basic_profile ============
inline user_basic_profile_t* table_message_type::release_basic_profile() {
  user_basic_profile_t* temp = basic_profile_;
  basic_profile_ = nullptr;
  if (GetArena() != nullptr) {
    temp = ::PROTOBUF_NAMESPACE_ID::internal::DuplicateIfNonNull(temp);
  }
  return temp;
}

重要的注解都在上面标注了。回到我们之前的例子,对于底层接口而言,我们不能假设传入的 user_basic_profile 是在哪里分配的。实际上对于上面例子里的数据库保存操作,大多数情况下 user_basic_profile 来自于user对象的内存缓存。前面也提及了,这部分数据是在堆上的,那么对于这种情况,前面改Arena的例子里实际的流程就会变成这样:

// 参数传入 arena, user_basic_profile;
table_message_type* container = ::google::protobuf::Arena::CreateMessage<table_message_type>(arena);
container->set_id(user_id);
container->set_allocated_basic_profile(&user_basic_profile); // 只要user_basic_profile不在arena上,那么这里就复制了一份数据
// ... 其他类似赋值代码
int result = pack_and_send(TcaplusService::TCAPLUS_API_UPDATE_REQ, container); // 打包和RPC
container->release_basic_profile(&user_basic_profile);       // 这里则是复制了一份user_basic_profile,因为返回值被忽略了,这里就内存泄露了。
// ... 其他类似释放代码
// ...

开启Arena之后,实际上增加了两个函数 unsafe_arena_set_allocated_XXXunsafe_arena_release_XXX 。我们能不能直接用这个代替掉 set_allocated_XXXrelease_XXX 呢?我们继续来看它生成的代码:

// ============ unsafe_arena_set_allocated_basic_profile ============
inline void TABLE_USER_DEF::unsafe_arena_set_allocated_basic_profile(
    ::mvp::table_user_basic_profile* basic_profile) {
  if (GetArena() == nullptr) {
    delete reinterpret_cast<::PROTOBUF_NAMESPACE_ID::MessageLite*>(basic_profile_); // 注意这里父Message如果Arena是空直接调用了delete子成员,这里没有判断子成员是否是在某个Arena里的。
  }
  basic_profile_ = basic_profile;
  if (basic_profile) {

  } else {

  }
  // @@protoc_insertion_point(field_unsafe_arena_set_allocated:tdr2pb.TABLE_USER_DEF.basic_profile)
}

// ============ unsafe_arena_release_basic_profile ============
inline user_basic_profile_t* TABLE_USER_DEF::unsafe_arena_release_basic_profile() {
  // @@protoc_insertion_point(field_release:tdr2pb.TABLE_USER_DEF.basic_profile)
  
  user_basic_profile_t* temp = basic_profile_;
  basic_profile_ = nullptr;
  return temp;
}

使用的时候 如果父级Message如果是Arena分配的,只要使用者能保证调用 unsafe_arena_set_allocated_XXX 时成员为空,那么这里是可以直接代替的。但是在实际调用流程复杂了以后仍然怕这部分不小心误用,一旦误用带来的后果也很严重并且很难排查。所以我们项目中是 仅仅代码生成器会使用这个接口,人工调用是禁止的 。最终的变更形式如下:

// 参数传入 arena, user_basic_profile;
table_message_type* container = ::google::protobuf::Arena::CreateMessage<table_message_type>(arena); // 这里能保证刚创建出来的一定为空
container->set_id(user_id);
if (container->GetArena() == user_basic_profile.GetArena()) {
    if (nullptr == user_basic_profile.GetArena()) {
        container->set_allocated_basic_profile(&user_basic_profile);
    } else {
        container->unsafe_arena_set_allocated_basic_profile(&user_basic_profile);
    }
} else {
    protobuf_copy_message(*container->mutable_basic_profile(), user_basic_profile); // 退化到复制message,下面会贴protobuf_copy_message的实现
}
// ... 其他类似赋值代码
int result = pack_and_send(TcaplusService::TCAPLUS_API_UPDATE_REQ, container); // 打包和RPC
if (container->GetArena() == user_basic_profile.GetArena()) {
    if (nullptr == user_basic_profile.GetArena()) {
        container->release_basic_profile();
    } else {
        container->unsafe_arena_release_basic_profile();
    }
}
// ... 其他类似释放代码
// ...

Swap退化成Copy

最后一个问题是和 Swap 接口有关。有些接口流程里,我们会用Swap来减少不必要的复制。常见的地方比如在dispatcher层(有些框架叫 executor)来解包后处理一些前置信息,完了之后透传部分数据到业务层。这时候就经常用 Swap 接口来减少不必要的复制。但是开启了Arena之后,生成的代码就变成了如下形式:

// .pb.h
inline void Swap(ConstSettingsType* other) {
  if (other == this) return;
  if (GetArena() == other->GetArena()) {
    InternalSwap(other);
  } else {
    ::PROTOBUF_NAMESPACE_ID::internal::GenericSwap(this, other);
  }
}
// 增加了一个 UnsafeArenaSwap 函数
void UnsafeArenaSwap(ConstSettingsType* other) {
  if (other == this) return;
  GOOGLE_DCHECK(GetArena() == other->GetArena());
  InternalSwap(other);
}

// .pb.cpp
void ConstSettingsType::InternalSwap(ConstSettingsType* other) {
  using std::swap;
  _internal_metadata_.Swap<::PROTOBUF_NAMESPACE_ID::UnknownFieldSet>(&other->_internal_metadata_);                        // 内部数据swap
  rpc_version_.Swap(&other->rpc_version_, &::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), GetArena()); // 字段的swap
}

// reflection_ops.cc , MessageLite的版本和这个差不多,就不列举了
void GenericSwap(Message* m1, Message* m2) {
  Arena* m2_arena = m2->GetArena();
  GOOGLE_DCHECK(m1->GetArena() != m2_arena);

  // Copy semantics in this case. We try to improve efficiency by placing the
  // temporary on |m2|'s arena so that messages are copied twice rather than
  // three times.
  Message* tmp = m2->New(m2_arena);
  std::unique_ptr<Message> tmp_deleter(m2_arena == nullptr ? tmp : nullptr);
  tmp->CheckTypeAndMergeFrom(*m1);
  m1->Clear();
  m1->CheckTypeAndMergeFrom(*m2);
  m2->GetReflection()->Swap(tmp, m2);
}

可以看到,在所属的Arena不同的时候 Swap 函数实际执行了两次Copy和一次创建Message,这种时候还不如直接Copy 。 基于此,我们原来为了编译期检查一下Copy的protobuf message的类型提供了 protobuf_copy_message 函数来代替直接 CopyFrom , 现在又额外提供了 protobuf_move_message 来处理这种转移数据的 Swap 调用。

// protobuf_copy_message
template <class TMsg>
inline void protobuf_copy_message(TMsg &dst, const TMsg &src) {
    dst.CopyFrom(src);
}

template <class TField>
inline void protobuf_copy_message(::google::protobuf::RepeatedField<TField> &dst, const ::google::protobuf::RepeatedField<TField> &src) {
    dst.Reserve(src.size());
    dst.CopyFrom(src);
}

template <class TField>
inline void protobuf_copy_message(::google::protobuf::RepeatedPtrField<TField> &dst, const ::google::protobuf::RepeatedPtrField<TField> &src) {
    dst.Reserve(src.size());
    dst.CopyFrom(src);
}

// protobuf_move_message
template <class TMsg>
inline void protobuf_move_message(TMsg &dst, TMsg &&src) {
    if (dst.GetArena() == src.GetArena()) {
        dst.Swap(&src);
    } else {
        protobuf_copy_message(dst, src);
    }

    src.Clear();
}

template <class TField>
inline void protobuf_move_message(::google::protobuf::RepeatedField<TField> &dst, ::google::protobuf::RepeatedField<TField> &&src) {
    if (dst.GetArena() == src.GetArena()) {
        dst.Swap(&src);
    } else {
        protobuf_copy_message(dst, src);
    }

    src.Clear();
}

template <class TField>
inline void protobuf_move_message(::google::protobuf::RepeatedPtrField<TField> &dst, ::google::protobuf::RepeatedPtrField<TField> &&src) {
    if (dst.GetArena() == src.GetArena()) {
        dst.Swap(&src);
    } else {
        protobuf_copy_message(dst, src);
    }

    src.Clear();
}

写在最后

目前的碰到的问题基本就这么多了,近期的 protobuf 大版本更新对Arena还有一些改进,其中包含对 std::string 类型的特殊处理和在Arena上分配Map时的一处 use-after-destroy bug ,避开使用就好了。

以上代码使用 protobuf 3.13.0 版本。在 protobuf 3.14.0 版本之前,要开启C++ Arena接口要在proto的文件级选项里加上 option cc_enable_arenas = true;