前言

前段时间我尝试给 atframeworklibatapp 整合进UnrealEngine做Dedicated Server和逻辑server通信的时候碰到了一些问题。主要在于这些客户端引擎一般来说默认都是关闭exception的甚至会关闭RTTI。而 libatapp 所依赖的通信组件 libatbus 里内部协议是msgpack , 而 msgpack 的官方 C++ 的header only的实现是必须开异常的功能的。所以我近期打算抽空增强一波 libatbus 的功能,增加一些跨版本向前向后兼容功能,和一些简单的验证功能(仅仅是为了防止误操作导致的问题)。具体的变更等我弄完了再发一篇。

回到协议上打解包上,本来我是想用 flatbuffers ,原因也很简单。这个通信层的协议不会太复杂,flatbuffers 对memory copy非常友好,也是head only,并且仅仅需要3个头文件,这样使用 libatbus 的时候就不需要额外管理外部的打解包层版本必须和内部的一致了。但是后来我跑测试的时候看到的结果 flatbuffers 打出的包体太大了,并不理想。所以就简单地测试了一下 msgpackflatbuffersprotobuf 效果。

测试数据

我只采用了一个简单的测试数据,贴近最大比率的实际使用场景的数据内容。也是 libatbus 的数据传输协议。数据结构大致如下:

{
    "head": {
        "cmd": 1, // 这个字段msgpack独占
        "version": 2,
        "type": 123,
        "ret": 0,
        "sequence": 9876543210,
        "src_bus_id": 0x12345678
    },
    "bdoy": {
        "data_transform_req": {
            "from": 0x123456789,
            "to": 0x987654321,
            "router": [0x123456789],
            "content": "hello world!\0",
            "flag": 1
        }
    }
}

msgpack

首先是原来使用的 msgpack , 原来使用 msgpack 很重要的原因是因为可以 header only ,并且性能足够高。而且当时的 flatbuffers 还很不成熟,对解包后的数据追加内容很不友好。其他的协议打解包库也没有特别好用的,如果不是 header only 我还不如去用 protobuf

不过之前使用 msgpack 的小对象分配比较暴力,依赖于 malloc 实现的优化。协议结构定义如下:

enum ATBUS_PROTOCOL_CMD {
    ATBUS_CMD_INVALID = 0,

    //  数据协议
    ATBUS_CMD_DATA_TRANSFORM_REQ = 1,
    ATBUS_CMD_DATA_TRANSFORM_RSP,

    // ...

    ATBUS_CMD_MAX
};
struct bin_data_block {
    const void *ptr;
    size_t size;
};

struct forward_data {
    ATBUS_MACRO_BUSID_TYPE from;                // ID: 0
    ATBUS_MACRO_BUSID_TYPE to;                  // ID: 1
    std::vector<ATBUS_MACRO_BUSID_TYPE> router; // ID: 2
    bin_data_block content;                     // ID: 3
    int flags;                                  // ID: 4

    forward_data() : from(0), to(0), flags(0) {
        content.size = 0;
        content.ptr  = NULL;
    }
    MSGPACK_DEFINE(from, to, router, content, flags);
};

class msg_body {
public:
    forward_data *forward;
    // ...

    msg_body() : forward(NULL) {}
    ~msg_body() {
        if (NULL != forward) {
            delete forward;
        }
        // ... 
    }
};

struct msg_head {
    ATBUS_PROTOCOL_CMD cmd;            // ID: 0
    int32_t version;                   // ID  1
    int32_t type;                      // ID: 2
    int32_t ret;                       // ID: 3
    uint64_t sequence;                 // ID: 4
    uint64_t src_bus_id;               // ID: 5


    msg_head() : cmd(ATBUS_CMD_INVALID), version(2), type(0), ret(0), sequence(0) {}

    MSGPACK_DEFINE(cmd, type, ret, sequence, src_bus_id, version);
};

struct msg {
    msg_head head; // map.key = 1
    msg_body body; // map.key = 2
};

// 下面对自定义encode/decode的封装就不贴了

然后代码如下:

atbus::protocol::msg m_src;
std::string packed_buffer;
char test_buffer[] = "hello world!";

{
    m_src.head.cmd        = ATBUS_CMD_DATA_TRANSFORM_REQ;
    m_src.head.version    = 2;
    m_src.head.type       = 123;
    m_src.head.ret        = 0;
    m_src.head.sequence   = 9876543210;
    m_src.head.src_bus_id = 0x12345678;

    m_src.body.forward = new ::atbus::protocol::forward_data();
    m_src.body.forward->from = 0x12345678;
    m_src.body.forward->to = 0x987654321;
    m_src.body.forward->content.ptr = test_buffer;
    m_src.body.forward->content.size = sizeof(test_buffer);
    m_src.body.forward->router.push_back(0x12345678);
    m_src.body.forward->flags = 1;

    std::stringstream ss;
    msgpack::pack(ss, m_src);
    packed_buffer = ss.str();
    std::stringstream so;
    util::string::serialization(packed_buffer.data(), packed_buffer.size(), so);
    std::cout<< "msgpack encoded(size=" << packed_buffer.size() << "): " << so.str() << std::endl;
}

输出:

msgpack encoded(size=59): \602\001\626\001{\000\717\000\000\000\002L\660\026\752\716\0224Vx\002\002\625\716\0224Vx\717\000\000\000\011\607eC!\621\716\0224Vx\704\015hello world!\000\001

打包后大小是59字节,除去数据段的13字节后,相当于附加信息的数据是46字节。 msgpack 会对整数类型有一个字节记录长度,后面才是实际整数,有一定的压缩效果。这里看起来这个长度还是比较理想的。

flatbuffers

我第一版写好的是接入了 flatbuffers , 协议定义如下:

table forward_data {
    from    : uint64   (id: 0);
    to      : uint64   (id: 1);
    router  : [uint64] (id: 2);
    content : [ubyte]  (id: 3);
    flags   : uint32   (id: 4);
}

union msg_body {
    // invalid_body        : string              (id: 0);
    custom_command_req  : custom_command_data = 1,
    custom_command_rsp  : custom_command_data = 2,
    data_transform_req  : forward_data        = 3,
    data_transform_rsp  : forward_data        = 4,
    // ...
}

table msg_head {
    version     : int32     (id: 0);
    type        : int32     (id: 1);
    ret         : int32     (id: 2);
    sequence    : uint64    (id: 3);
    src_bus_id  : uint64    (id: 4);
}

table msg {
    head: msg_head  (id: 0);
    body: msg_body  (id: 2); // id: 1 is implicitly added for body case by flatc
}

打包代码如下:

std::vector<unsigned char> packed_buffer;
char test_buffer[] = "hello world!";

{
    ::flatbuffers::FlatBufferBuilder fbb;

    uint64_t self_id = 0x12345678;
    uint32_t flags   = 0;
    flags |= atbus::protocol::ATBUS_FORWARD_DATA_FLAG_TYPE_REQUIRE_RSP;

    fbb.Finish(::atbus::protocol::Createmsg(fbb,
                                    ::atbus::protocol::Createmsg_head(fbb, ::atbus::protocol::ATBUS_PROTOCOL_CONST_ATBUS_PROTOCOL_VERSION,
                                                                    123, 0, 9876543210, self_id),
                                    ::atbus::protocol::msg_body_data_transform_req,
                                    ::atbus::protocol::Createforward_data(fbb, 0x123456789, 0x987654321, fbb.CreateVector(&self_id, 1),
                                                                        fbb.CreateVector(reinterpret_cast<const uint8_t *>(test_buffer), sizeof(test_buffer)),
                                                                        flags)
                                        .Union())

    );
    packed_buffer.assign(reinterpret_cast<const unsigned char *>(fbb.GetBufferPointer()),
        reinterpret_cast<const unsigned char *>(fbb.GetBufferPointer()) + fbb.GetSize());
    std::stringstream so;
    util::string::serialization(packed_buffer.data(), packed_buffer.size(), so);
    std::cout<< "flatbuffers encoded(size=" << packed_buffer.size() << "): " << so.str() << std::endl;
}

现在 flatbuffers 的打包和读取还是比较容易的,就是追加数据处理器来麻烦一些,但是也还行。本来 在 libatbus 内部我是想能够尽量减少数据copy的,但是 flatbuffers 一旦打包Finish了以后,数据指向的就是静态缓冲区,不允许再改变数据块大小。所以操作起来并不那么容易,所以最后还是fallback到了和 msgpack 一样来copy一次。

输出如下:

flatbuffers encoded(size=160): \020\000\000\000\000\000\012\000\022\000\010\000\007\000\014\000\012\000\000\000\000\000\000\003l\000\000\000\024\000\000\000\000\000\016\000 \000\020\000\030\000\004\000\010\000\014\000\016\000\000\0000\000\000\000\030\000\000\000\002\000\000\000\611gE#\001\000\000\000!Ce\607\011\000\000\000\015\000\000\000hello world!\000\000\000\000\001\000\000\000xV4\022\000\000\000\000\000\000\000\000\000\000\016\000\034\000\004\000\010\000\000\000\014\000\024\000\016\000\000\000\002\000\000\000{\000\000\000\752\026\660L\002\000\000\000xV4\022\000\000\000\000

这里看到,打包后的数据有夸张的 160 字节。flatbuffers 官方也是说假设你不删除字段,只会把字段设为 desperated ,而且字段ID必须从0开始,然后递增。这个是和 flatbuffers 的打包数据组织结构相关的。

flatbuffers 打包后操作的都是指向数据块内部的地址。它为了支持一个数据块可以打入多个message和数据层面支持动态的field数量,是额外多些了一个vtable进去数据块,这个vtable有点像虚表,然后指向了各个字段。这个vtable就是不允许删字段的原因,它是通过看你当前版本的 vtable 里的field的offset大于了数据块里记录的vtable的长度,就认为这个field不存在。另外 flatbuffers 的实现是对数据内容直接访问的,他的mutable接口也是要求数据长度不能发生变化。这也意味着它不能压缩数据块。

flatbuffers 官方的说法是没有unpack开销,实际上是在每次去读取的时候先找数据块里的vtable,然后找到数据真正指向的位置,如果是数字还要转字节序然后返回。其实这个开销相当于把unpack操作分散到了你读数据的地方。而且每次读都得走一遍这个流程。我觉得其实是没有 msgpackprotobuf 那种在统一的地方只解包一次的开销低的。毕竟在实际使用的过程中,大部分字段都不会只读一次。

protobuf

最后是 protobufprotobuf 也是这三个里唯一需要预编译的组件,特别是在交叉编译的时候会特别麻烦,在 protobuf 3.6.1 之前的交叉编译还得改一点它的cmake脚本,否则里面有些组件不能关掉,并且在编译libprotoc的过程中要先编译js_mbed来运行,但是交叉编译大多都是编译其他架构的target不能本地运行的。在 protobuf 3.6.1 之后才提供关闭这写组件的选项,好像是 3.7 之后才不依赖先编译js_mbed。这样对交叉编译才足够友好。

还是先贴相关的协议描述:

syntax = "proto3";

package atbus.protocol;

option optimize_for = SPEED;
option cc_enable_arenas = true;

enum ATBUS_PROTOCOL_CONST {
    option allow_alias             = true;
    ATBUS_PROTOCOL_CONST_UNKNOWN   = 0;
    ATBUS_PROTOCOL_VERSION         = 2;
    ATBUS_PROTOCOL_MINIMAL_VERSION = 2; // minimal protocol version supported
}

enum ATBUS_FORWARD_DATA_FLAG_TYPE {
    FORWARD_DATA_FLAG_NONE = 0;
    // all flags must be power of 2
    FORWARD_DATA_FLAG_REQUIRE_RSP = 1;
}

message forward_data {
    uint64   from           = 1;
    uint64   to             = 2;
    repeated uint64 router  = 3;
    bytes           content = 4;
    uint32          flags   = 5;
}

message msg_head {
    int32  version    = 1;
    int32  type       = 2;
    int32  ret        = 3;
    uint64 sequence   = 4;
    uint64 src_bus_id = 5;
}

message msg {
    msg_head head = 1;
    oneof    msg_body {
        // ... 
        forward_data        data_transform_req = 13;
        // ...
    }
}

然后打包代码:

::google::protobuf::Arena arena;
atbus::protocol::msg *    m_src = ::google::protobuf::Arena::CreateMessage<atbus::protocol::msg>(&arena);
std::string               packed_buffer;
char                      test_buffer[] = "hello world!";

{
    m_src->mutable_head()->set_version(2);
    m_src->mutable_head()->set_type(123);
    m_src->mutable_head()->set_ret(0);
    m_src->mutable_head()->set_sequence(9876543210);
    m_src->mutable_head()->set_src_bus_id(0x12345678);

    m_src->mutable_data_transform_req()->set_from(0x12345678);
    m_src->mutable_data_transform_req()->set_to(0x987654321);
    m_src->mutable_data_transform_req()->set_content(test_buffer, sizeof(test_buffer));
    m_src->mutable_data_transform_req()->add_router(0x12345678);
    m_src->mutable_data_transform_req()->set_flags(atbus::protocol::FORWARD_DATA_FLAG_REQUIRE_RSP);

    m_src->SerializeToString(&packed_buffer);
    std::stringstream so;
    util::string::serialization(packed_buffer.data(), packed_buffer.size(), so);
    std::cout << "arena(allocated= " << arena.SpaceAllocated() << ", used= " << arena.SpaceUsed() << "), protobuf encoded(size=" << packed_buffer.size()
              << "): " << so.str() << std::endl;
}

输出:

arena(allocated= 768, used= 376), protobuf encoded(size=57): \012\020\010\002\020{ \752\655\700\745$(\770\654\721\621\001j%\010\770\654\721\621\001\020\641\606\625\673\630\001\032\005\770\654\721\621\001"\015hello world!\000(\001

protobuf 会对整数类型使用varint来压缩空数据来减少长度。效果也比较好,打包后数据块只占用了57字节。 protobuf 3.0 开始,为了缓解protobuf的碎片对象问题,提供了arena功能。这个arena其实就是一个不断增长的内存块链表,可以用于保存protobuf的对象和对象数据,没有回收功能。 非常简单但也很适合 libatbus 这种动态结构少而且结构简单的协议。

在上面的sample代码里我也是开了arena,可以看到内存块分配了768字节,使用了376字节。

写在最后

flatbuffers 或是 caps’n proto 的一个涉及理念是打解包本身不做压缩,让上一层走专有的压缩模块。这样不止对整数,对字符串也可以有效压缩。但是目前我找了一圈,没找到性能足够理想的压缩算法。现在即便是很快的压缩算法如 z-std 、 brotli 、 lz4 、 snappy 等都是单核百兆级别的压缩速度。而且会追加额外的字典块。 这就很不适合我们这种传输组件了,因为本身的header就不大,字典本身可能占比很大,并且 libatbus 本身是单核 GB/秒 级别的性能,肯定是不能接受压缩数据导致新能下降一个数量级的。

前面也提到了,这个协议块内存是贴近实际使用场景的。所以它的包体大小就比较有参考意义。flatbuffers 的解包后内存占用其实就是buffer块的占用,160字节。msgpack 的解包后内存占用我没有统计,但是内部结构其实和 flatbuffers 是差不多的,比 flatbuffers 少了vtable 但是多了几个指针,估计内存占用也差不多吧。 protobuf 的内存占用比较多一些,达到了 376。protobuf 的arena默认配置是初始预分配块256字节,然后每次*2,一直到8K为止。sample里是分配了两个块一共768字节。实际上我觉得针对于这个消息直接初始预分配块512字节就好了。libatbus 的消息其实也比较容易分析估算预分配块的大小。总的来说这是内存占用,也不是很大。

我们再来关注打包后的数据大小。 flatbuffers 是160字节,减去数据内容的13字节可以理解为数据meta外占用是147字节。msgpack 是59字节,减去数据内容的13字节后是46字节。protobuf 是57字节,减去数据内容的13字节后是44字节。这个差距就比较大了。在我们的实际应用场景总,往往还有一层业务层面的包头,也差不多40-50字节。但是业务逻辑里大部分的数据包都是小包,200字节甚至100字节以下的占很大一部分比重。这里的话 flatbuffers 的147字节额外消耗几乎翻倍了这个带宽损耗。这也是我不太能接受的地方。而相对而言 protobufmsgpack 就可接受多了。我们也可以参照对比一下ipv4+tcp的header长度,不算扩展部分是40字节。

简单的压力测试没有太大的意义,缓存命中率高会导致和实际场景中相差很大,而且之前很多人也测试过了这三种序列化库的性能并没有数量级的差距。所以我就没有额外再做压力测试了。

最后我还是决定麻烦一点使用回 protobuf ,然后提供选项来指定外部的 protobuf ,这样就可以和外部使用的一样版本的 protobuf。唯一的缺陷就是增加了使用上的麻烦。但是 protobuf 的工具确实比其他两个成熟太多了,API也很方便使用。不依赖RTTI并且异常功能可以很轻松关闭。移动平台上还可以用lite库来减小运行时包大小,也算是比较完美了。