慢慢一点一点看看Boost,这段时间就Asio库吧。 据说这货和libevent的效率差不多,但是Boost的平台兼容性,你懂得。还有它帮忙干掉了很多线程安全和线程分发的事情。
Boost.Asio 依赖项:
- Boost.System (所以它必须链接boost_system)
- [可选] 如果使用read_until() or async_read_until() 函数,则依赖Boost.Regex(boost_regex)
- [可选] SSL功能依赖OpenSSL
先来个简单的,系统信号量 Signal控制:
使用ASIO操作信号量有一个注意事项,不允许再使用其他库或工具管理信号量(如signal() 或 sigaction()函数)
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
void signal_handler(
boost::asio::signal_set& stSigs,
const boost::system::error_code& error,
int signal_number)
{
if (error)
{
std::cout<< error.message()<< std::endl;
}
std::cout<< "Catch a signal: "<< signal_number<< std::endl;
stSigs.async_wait(boost::bind(signal_handler, boost::ref(stSigs), _1, _2));
}
void signal_handler2(
boost::asio::signal_set& stSigs,
const boost::system::error_code& error,
int signal_number)
{
if (error)
{
std::cout<< error.message()<< std::endl;
}
std::cout<< "Catch a signal - handle 2: "<< signal_number<< std::endl;
stSigs.async_wait(boost::bind(signal_handler2, boost::ref(stSigs), _1, _2));
}
int main(int argc, char* argv[]) {
boost::asio::io_service stMainService;
boost::asio::signal_set stSigs(stMainService);
stSigs.add(SIGINT);
stSigs.add(SIGTERM);
stSigs.async_wait(boost::bind(signal_handler, boost::ref(stSigs), _1, _2));
stSigs.async_wait(boost::bind(signal_handler2, boost::ref(stSigs), _1, _2));
stMainService.run();
return 0;
}
So easy吧,可以一次注册多个handler。 需要注意的是每次触发signal的handler后,handler就被取消了,需要重新注册一次。否则下一次就不会跳到这个handler了
第二个尝试,网络IO:
按照文档描述,除非使用宏来禁止功能,网络IO在不同的环境下采用了不同的实现方式:
- Windows: IOCP
- Linux: epoll
- Mac OS X, FreeBSD, NetBSD, OpenBSD: kqueue
- Solaris: /dev/poll
Boost.Asio的接口是仿IOCP的异步IO的形式的(参见:http://www.cnblogs.com/hello-leo/archive/2011/04/12/leo.html),所以,其他环境下都是模拟的。
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>
#include <boost/asio.hpp>
#include <boost/random.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/atomic.hpp>
void signal_handler(
boost::asio::io_service& stMainService,
const boost::system::error_code& error,
int signal_number)
{
if (error)
{
std::cout<< error.message()<< std::endl;
}
std::cout<< "Catch a signal: "<< signal_number<< std::endl;
stMainService.stop();
}
/** 服务器 socket连接表 **/
std::map<boost::asio::ip::tcp::socket::native_handle_type,
std::pair<
boost::shared_ptr<boost::asio::ip::tcp::socket>,
boost::shared_ptr<boost::asio::streambuf>
>
> g_stServerSockMap;
boost::atomic_int32_t g_iMaxClientNumber, g_iCurClientNumber;
/**
* 服务器异步发送数据回调函数
* @param [in] ptrBuffStr 发送的数据buff(传过来仅是为了给智能指针计数+1,防止释放数据的)
* @param [in] error 错误信息
* @param [in] bytes_transferred 发送的数据大小
*/
void server_thread_send_handler(
boost::shared_ptr<std::string> ptrBuffStr,
const boost::system::error_code& error, // Result of operation.
std::size_t bytes_transferred
)
{
if (error)
{
std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Send Failed: "<< error.message()<< std::endl;
}
}
/**
* 服务器异步接收数据回调函数
* @param [in] ptrCurSock 收取数据的Socket
* @param [in] ptrSockStreamBuff 收取数据的Buff对象
* @param [in] error 错误信息
*/
void server_thread_recv_handler(
boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock,
boost::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff,
const boost::system::error_code& error // Result of operation.
)
{
// 5.2.1 先接收buff长度
if (ptrSockStreamBuff->size() == sizeof(size_t))
{
const size_t* pLen = boost::asio::buffer_cast<const size_t*>(ptrSockStreamBuff->data());
boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(*pLen), boost::bind(
server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
));
// 移出socket buff
ptrSockStreamBuff->consume(sizeof(size_t));
return;
}
// 5.2.2 再接收buff数据
if (ptrSockStreamBuff->size() > 0)
{
// 5.2.3 处理数据
boost::asio::streambuf::const_buffers_type bufs = ptrSockStreamBuff->data();
std::string strOut(boost::asio::buffers_begin(bufs), boost::asio::buffers_begin(bufs) + ptrSockStreamBuff->size());
strOut += (char)(0);
std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->native_handle()<< " Recv Data: "<< strOut<< std::endl;
// // 5.2.4 移出socket buff
ptrSockStreamBuff->consume(ptrSockStreamBuff->size());
// Sleep 500 毫秒。模拟正忙
boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
// 5.2.5 回包
// 组装数据
std::stringstream stServerSendBuf;
stServerSendBuf << "Server Thread "<< boost::this_thread::get_id()<< " Say Got{ "<< strOut<< " }";
boost::shared_ptr<std::string> ptrBuffStr = boost::shared_ptr<std::string>(new std::string(stServerSendBuf.str()));
// 先发Buff长度
size_t uBufLen = ptrBuffStr->size();
boost::asio::async_write(*ptrCurSock, boost::asio::buffer(&uBufLen, sizeof(uBufLen)), boost::asio::transfer_exactly(sizeof(uBufLen)),
boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
);
// 再发数据
boost::asio::async_write(*ptrCurSock, boost::asio::buffer(*ptrBuffStr), boost::asio::transfer_exactly(ptrBuffStr->size()),
boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
);
// 5.2.6 重新监听
boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
));
}
// 5.2.7 关闭退出
if (error == boost::asio::error::eof)
{
std::cout<< "xxxx Server Thread "<< boost::this_thread::get_id()<< " Sock Release: "<< ptrCurSock->native_handle()<< std::endl;
g_stServerSockMap.erase(ptrCurSock->native_handle());
g_iCurClientNumber.fetch_sub(1);
return;
}
if (error)
{
std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Recv Failed: "<< error.message()<< std::endl;
return;
}
}
/**
* 服务器异步接受Socket回调函数
* @param [in] stMainService 服务对象
* @param [in] stAccepter 接受Socket
* @param [in] ptrCurSock 建立连接的Socket
* @param [in] error 错误信息
*/
void server_thread_accept_handler(
boost::asio::io_service& stMainService,
boost::asio::ip::tcp::acceptor& stAccepter,
boost::shared_ptr<boost::asio::ip::tcp::socket>& ptrCurSock,
const boost::system::error_code& error // Result of operation.
)
{
if (error)
{
std::cout<< "++++ Server Accept Failed: "<< error.message()<< std::endl;
return;
}
std::cout<< "++++ Server Thread "<< boost::this_thread::get_id()<< " Accept a socket: "<< ptrCurSock->native_handle()<< std::endl;
std::cout<< "\tServer Thread Local End Point:"<< std::endl<<
"\t\tServer Thread Address: "<< ptrCurSock->local_endpoint().address().to_string()<< std::endl<<
"\t\tServer Thread Port: "<< ptrCurSock->local_endpoint().port()<< std::endl<<
"\t\tServer Thread Protocol: "<< ptrCurSock->local_endpoint().protocol().protocol()<< std::endl;
std::cout<< "\tServer Thread Remote End Point:"<< std::endl<<
"\t\tServer Thread Address: "<< ptrCurSock->remote_endpoint().address().to_string()<< std::endl<<
"\t\tServer Thread Port: "<< ptrCurSock->remote_endpoint().port()<< std::endl<<
"\t\tServer Thread Protocol: "<< ptrCurSock->remote_endpoint().protocol().protocol()<< std::endl;
g_iCurClientNumber.fetch_add(1);
g_iMaxClientNumber = std::max(g_iMaxClientNumber.load(), g_iCurClientNumber.load());
// Step 5.1 接受链入的Socket
g_stServerSockMap[ptrCurSock->native_handle()] = std::make_pair(
ptrCurSock,
boost::shared_ptr<boost::asio::streambuf>(
new boost::asio::streambuf()
)
);
// Step 5.2 设置Socket接收数据回调
boost::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff = g_stServerSockMap[ptrCurSock->native_handle()].second;
boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
));
// Step 5.3 创建Socket, 接受新Socket
ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
new boost::asio::ip::tcp::socket(stMainService)
);
stAccepter.async_accept(*ptrCurSock, boost::bind(
server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
));
}
/**
* 服务器工作线程
* @param [in] stMainService 服务对象
*/
void server_thread(boost::asio::io_service& stMainService)
{
std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;
stMainService.run(); // 这样,就加到线程池中去了
}
/**
* 服务器主线程
* @param [in] server_num 服务线程数
*/
void server_main_thread(int server_num)
{
// Step 1. 创建服务对象
static boost::asio::io_service stMainService;
// Step 2. 创建地址生成器及生成地址
boost::asio::ip::tcp::resolver stResolver(stMainService);
// 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("127.0.0.1", "8731"));
// Step 3. 创建接收器
boost::asio::ip::tcp::acceptor stAccepter(stMainService, stEndpoint);
// Step 4. 创建Socket
boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
new boost::asio::ip::tcp::socket(stMainService)
);
stAccepter.async_accept(*ptrCurSock, boost::bind(
server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
));
// Step 5. 创建线程并加入到服务线程池
std::vector<boost::thread*> stls;
for (int i = 0; i < server_num; ++ i)
{
// 打印输出错开
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
stls.push_back(
new boost::thread(server_thread, boost::ref(stMainService))
);
}
// Ctrl + C退出服务
boost::asio::signal_set stSigs(stMainService);
stSigs.add(SIGINT);
stSigs.add(SIGTERM);
stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));
// Step 6. 阻塞主线程,等待服务线程退出
for (size_t i = 0; i < stls.size(); ++ i)
{
stls[i]->join();
delete stls[i];
}
}
/**
* 客户端网络线程函数
*/
void client_thread()
{
std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;
// Step 1. 创建服务对象
static boost::asio::io_service stMainService;
// Step 2. 创建地址生成器及生成地址
boost::asio::ip::tcp::resolver stResolver(stMainService);
// 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("127.0.0.1", "8731"));
// Step 3. 创建Socket
boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
new boost::asio::ip::tcp::socket(stMainService)
);
// Step 4. 连接服务器
ptrCurSock->connect(stEndpoint);
std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Connect Success => "<< ptrCurSock->local_endpoint().port()<< std::endl;
//** 睡眠一段时间再发数据
static boost::random::mt19937 rng;
static boost::random::uniform_int_distribution<> sleep_rdm(0,5000);
int iSleepFor = 1000 + sleep_rdm(rng);
boost::this_thread::sleep_for(boost::chrono::milliseconds(iSleepFor));
// ** 构造数据
boost::asio::streambuf stSendedBuff;
std::ostream stSendedBuffOS(&stSendedBuff);
stSendedBuffOS<< "---- Client Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->local_endpoint().port()<< " Say Hello";
// Step 5 客户端同步发送数据
size_t ullBufSize = stSendedBuff.size();
boost::asio::write(*ptrCurSock, boost::asio::buffer(&ullBufSize, sizeof(ullBufSize)), boost::asio::transfer_exactly(sizeof(ullBufSize)));
boost::asio::write(*ptrCurSock, stSendedBuff, boost::asio::transfer_at_least(stSendedBuff.size() - sizeof(ullBufSize)));
// sent data is removed from input sequence
stSendedBuff.consume(stSendedBuff.size());
std::cout<< "**** Client Thread "<< boost::this_thread::get_id()<< " Sended"<< std::endl;
// Step 6 客户端同步接收数据
char strRecvStr[2048];
ptrCurSock->read_some(boost::asio::buffer(strRecvStr, sizeof(size_t)));
ptrCurSock->read_some(boost::asio::buffer(strRecvStr + sizeof(size_t), *((size_t*)(strRecvStr))));
std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Recv Data: "<< strRecvStr + sizeof(size_t)<< std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds(10000 - iSleepFor));
// Step 7. 等待服务关闭
stMainService.run();
// Step 8. Socket 关闭退出
ptrCurSock->close();
}
void print_asio_info();
int main(int argc, char* argv[]) {
print_asio_info();
g_iMaxClientNumber.store(0);
g_iCurClientNumber.store(0);
boost::thread st(server_main_thread, 3);
std::vector<boost::thread*> ctl;
const int max_client_num = 128;
for (int i = 0; i < max_client_num; ++ i)
{
// 打印输出错开
boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
ctl.push_back(new boost::thread(client_thread));
}
st.join();
for (size_t i = 0; i < ctl.size(); ++ i)
{
ctl[i]->join();
delete ctl[i];
}
std::cout<< "All done, max clients: "<< g_iMaxClientNumber.load()<< std::endl;
print_asio_info();
return 0;
}
void print_asio_info()
{
#if defined(BOOST_ASIO_HAS_IOCP)
std::cout<< "Boost.Asio using iocp"<< std::endl;
#if defined(BOOST_ASIO_HAS_SERIAL_PORT)
std::cout<< "\tSerial Port: enabled"<< std::endl;
#else
std::cout<< "\tSerial Port: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE)
std::cout<< "\tStream handle: enabled"<< std::endl;
#else
std::cout<< "\tStream handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_RANDOM_ACCESS_HANDLE)
std::cout<< "\tWindows Random Access Handle: enabled"<< std::endl;
#else
std::cout<< "\tWindows Random Access Handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_OBJECT_HANDLE)
std::cout<< "\tObject Handle: enabled"<< std::endl;
#else
std::cout<< "\tObject Handle: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
std::cout<< "\tWindows Overlapped Ptr: enabled"<< std::endl;
#else
std::cout<< "\tWindows Overlapped Ptr: disabled"<< std::endl;
#endif
#elif defined(BOOST_ASIO_HAS_EPOLL)
std::cout<< "Boost.Asio using epoll."<< std::endl;
#if defined(BOOST_ASIO_HAS_EVENTFD)
std::cout<< "\teventfd: enabled"<< std::endl;
#else
std::cout<< "\teventfd: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_TIMERFD)
std::cout<< "\timerfd: enabled"<< std::endl;
#else
std::cout<< "\timerfd: disabled"<< std::endl;
#endif
#elif defined(BOOST_ASIO_HAS_KQUEUE)
std::cout<< "Boost.Asio using kqueue"<< std::endl;
#elif defined(BOOST_ASIO_HAS_DEV_POLL)
std::cout<< "Boost.Asio using solaris: /dev/poll"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
std::cout<< "Posix Stream Descriptor: enabled"<< std::endl;
#else
std::cout<< "Posix Stream Descriptor: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
std::cout<< "Unix domain socket: enabled"<< std::endl;
#else
std::cout<< "Unix domain socket: disabled"<< std::endl;
#endif
#if defined(BOOST_ASIO_HAS_SIGACTION)
std::cout<< "sigaction: enabled"<< std::endl;
#else
std::cout<< "sigaction: disabled"<< std::endl;
#endif
}
这个搞起来相当费劲,经常需要跳转到Boost的源码中,查看一些回调函数的定义式。 write和write_some函数在completion_condition返回0时才发送,否则将数据加入到发送窗口,并且没有发生数据拷贝,也就是说,如果是异步操作,开发者必须保证发送时数据有效。(这类函数默认的completion_condition是仿函数transfer_all,返回default_max_transfer_size: 65536) 同样,read和read_some也是这样。 Send和receive函数才是立即执行的(不推荐使用)。
另外,streambuf流用于管理发送或接收缓冲,但是在发送或接收完后,要执行consume函数移出或commit移入缓冲区,否则数据不会被销毁。
UDP和TCP的类似,我就不再多写一个demo了。 以上sample的client和server的读数据采用了两种不同的方式
有一点比较爽,在多线程条件下 io_service的run函数是线程安全的,也就是说,多个线程调用同一个run的时候,就自动被加入工作线程池,在消息到来的时候io_service会找到一个可用的线程进行处理。
注:以上代码Visual Studio中需要包含Boost的include目录和lib目录;GCC或Clang中需要加编译选项-I[BOOST_PREFIX目录]/include –L[BOOST_PREFIX目录]/lib -lboost_random -lboost_thread -lboost_system -lstdc++ -lrt -pthread -D_POSIX_MT_, 如果BOOST不在系统动态库查找目录中且同时存在Boost的动态和静态库则加上 -static 选项
另外,这个demo代码包含一些功能检测的代码。
小试牛的第三刀,定时器Timer:
还是喜欢上代码,so…
#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>
#include <boost/chrono.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
void signal_handler(
boost::asio::io_service& stMainService,
const boost::system::error_code& error,
int signal_number)
{
if (error)
{
std::cout<< error.message()<< std::endl;
}
std::cout<< "Catch a signal: "<< signal_number<< std::endl;
stMainService.stop();
}
void deadline_timer_handler(
const boost::system::error_code& error // Result of operation.
)
{
std::cout<< "On Deadline Timer Actived"<< std::endl;
}
void waitable_timer_handler(
const boost::system::error_code& error // Result of operation.
)
{
std::cout<< "On Waitable Timer Actived"<< std::endl;
}
void steady_timer_handler(
boost::asio::basic_waitable_timer<boost::chrono::steady_clock>& stSteadyTimer,
const boost::system::error_code& error // Result of operation.
)
{
std::cout<< "On Steady Timer Actived"<< std::endl;
if (error == boost::asio::error::operation_aborted)
{
std::cout<< "On Steady Timer Abouted"<< std::endl;
}
else
{
stSteadyTimer.expires_at(stSteadyTimer.expires_at() + boost::chrono::seconds(1));
stSteadyTimer.async_wait(boost::bind(steady_timer_handler, boost::ref(stSteadyTimer), _1));
}
}
int main(int argc, char* argv[]) {
// Step 1. 创建服务对象
boost::asio::io_service stMainService;
// =========== deadline timer ===========
// Step 2(1) 创建Timer对象
boost::asio::deadline_timer stDeadlineTimer(stMainService);
stDeadlineTimer.expires_from_now(boost::posix_time::seconds(2));
// Step 3(1) 设置Timer回调
stDeadlineTimer.async_wait(deadline_timer_handler);
// =========== high_resolution_clock timer ===========
// Step 2(2) 创建Timer对象
boost::asio::basic_waitable_timer<boost::chrono::high_resolution_clock> stWaitableTimer(stMainService);
stWaitableTimer.expires_from_now(boost::chrono::seconds(1));
// Step 3(2) 设置Timer回调
stWaitableTimer.async_wait(waitable_timer_handler);
// =========== steady timer ===========
// Step 2(3) 创建Timer对象
boost::asio::basic_waitable_timer<boost::chrono::steady_clock> stSteadyTimer(stMainService);
stSteadyTimer.expires_from_now(boost::chrono::seconds(1));
// Step 3(3) 设置Timer回调
stSteadyTimer.async_wait(boost::bind(steady_timer_handler, boost::ref(stSteadyTimer), _1));
// =========== system_timer ===========
// =========== 对应 chrono::system_clock 懒得演示了,都差不多,只是时间单位和载体不同而已 ===========
// 绑定 Ctrl + C退出服务
boost::asio::signal_set stSigs(stMainService);
stSigs.add(SIGINT);
stSigs.add(SIGTERM);
stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));
stMainService.run();
return 0;
}
话说Boost.Asio每次异步wait回调之后还要重新wait一下挺麻烦的
额外功能:
设备文件支持
boost::asio::serial_port 可以打开一个Unix设备文件,并作为输入输出流,然后可以用自由函数boost::asio::read(),boost::asio::async_read(),boost::asio::write(),boost::asio::async_write(),boost::asio::read_until() 和 boost::asio::async_read_until()操作这些文件
在Windows上,需要系统支持I/O completion port时才能使用,可以通过BOOST_ASIO_HAS_SERIAL_PORTS 这个宏来检测是否可用这个功能(如果定义了则可用)。
Unix系统功能
第一项是Unix Domain Sockets
这种socket可以通过
local::datagram_protocol::socket socket1(my_io_service);
local::datagram_protocol::socket socket2(my_io_service);
local::connect_pair(socket1, socket2);
//或者
//服务端
local::stream_protocol::endpoint ep("/tmp/foobar");
local::stream_protocol::acceptor acceptor(my_io_service, ep);
local::stream_protocol::socket socket(my_io_service);
acceptor.accept(socket);
//客户端
local::stream_protocol::endpoint ep("/tmp/foobar");
local::stream_protocol::socket socket(my_io_service);
socket.connect(ep);
来使用,其他的部分和普通的Socket一样
第二项是指向流的文件描述符
posix::stream_descriptor in(my_io_service, ::dup(STDIN_FILENO)); posix::stream_descriptor out(my_io_service, ::dup(STDOUT_FILENO));
创建出的descriptor可以用asio的自由函数的读写函数操作
第三项是fork支持通过notify_fork函数来重建内部描述符
SSL支持
这部分依赖OpenSSL,简单的说,就是在socket外面包了一层,然后操作带ssl的socket。
大概就是
typedef ssl::stream<tcp::socket> ssl_socket;
然后用ssl_socket代替tcp::socket
简要性能测试
- 测试机器: CPU Intel(R) Xeon(R) CPU X3440 @ 2.53GHz × 8 , 内存 8096MB
- 测试环境: Linux 2.6.32.43, GCC 4.8.0, 编译优化配置 -O2 -g -ggdb
- 程序配置: 16 工作进程,1主进程,主线程参与逻辑
- 测试结果: 5000-8000连接,每秒收到约320K个报文,7MB流量,每秒发送约320K个报文,12MB流量, CPU 负载: 180%(5000连接) – 195% (8000连接)
结论: 不知道为什么,压力再也上不去了, 我是把输出重定向到文件的,所以开始以为是磁盘IO爆了,写出日志次数太多,但是gperftools显示磁盘IO占用并不高,性能分布还是比较平均的。但是基本上就在16万个报文了(每个包有一次发送长度的包[4字节]和一次数据的send[不定长])
测试代码地址: https://gist.github.com/owent/5660983 profile性能分析报告: http://api.owent.net/resource/doc/link/test.asio.pdf
主要就是这个样子,一些更底层的东西比如srand可以以后再看( * ^ _ ^ * )