先来看一下启动监听socket的地方:
Master.cpp about line316:
///-Launch the world listener socket
port_twsport = sWorld.getConfig (CONFIG_PORT_WORLD);
std::stringbind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");
if(sWorldSocketMgr->StartNetwork (wsport,bind_ip.c_str ()) == -1)
{
sLog.outError ("Failed to start network");
World::StopNow(ERROR_EXIT_CODE);
// go down and shutdown the server
}
sWorldSocketMgr->Wait ();
Mangos用WorldSocketMgr这个类来管理socket的运作。StartNetwork中主要调用了StartReactiveIO来启动监听socket,看一下StartReactiveIO这个方法:
WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char*address)
{
m_UseNoDelay= sConfig.GetBoolDefault ("Network.TcpNodelay", true);
intnum_threads = sConfig.GetIntDefault ("Network.Threads", 1);//用于处理socket的线程个数
if(num_threads <= 0)
{
sLog.outError ("Network.Threads is wrong in your configfile");
return -1;
}
m_NetThreadsCount =static_cast<size_t> (num_threads +1);
m_NetThreads = new ReactorRunnable[m_NetThreadsCount]; //ReactorRunnable继承ACE_Task_Base,这是ACE线程使用方法
sLog.outBasic ("Max allowed socket connections %d",ACE::max_handles());
// -1means use default
m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff",-1);
m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff",65536);
if (m_SockOutUBuff <= 0 )
{
sLog.outError ("Network.OutUBuff is wrong in your configfile");
return -1;
}
WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;// WorldSocket::Acceptor是ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR>类型,ACE的Acceptor-ConnectorFramework,WorldSocket是ACE_Svc_Handler,ACE_SOCK_ACCEPTORmacro帮助编译相应平台的ACE_Acceptor类
m_Acceptor =acc;
ACE_INET_Addr listen_addr (port, address);
if(acc->open (listen_addr, m_NetThreads[0].GetReactor(), ACE_NONBLOCK) == -1) //指定监听地址,端口和第一个线程的反应器,并启动acceptor
{
sLog.outError ("Failed to open acceptor ,check if the port isfree");
return -1;
}
for(size_t i = 0; i < m_NetThreadsCount; ++i)
m_NetThreads[i].Start ();//启动所有线程
return0;
}
每个线程都有一个反应器ACE_Reactor*m_Reactor,它的具体实现是ACE_TP_Reactor(用于多线程的reactor),
线程运行部分就是调用反应器来实现的:
virtual int svc ()
{
DEBUG_LOG ("Network Thread Starting");
WorldDatabase.ThreadStart (); //?
ACE_ASSERT (m_Reactor);
SocketSet::iterator i, t;
while (!m_Reactor->reactor_event_loop_done ())
{
// dont be too smart to move this outside the loop
// the run_reactor_event_loop will modify interval
ACE_Time_Value interval (0, 10000);
if (m_Reactor->run_reactor_event_loop (interval) ==-1) //运行10000微妙,run_reactor_event_loop本身会处理输入输出事件,只要svc_handler在reactor上注册了write,read事件。
break;
AddNewSockets (); //将m_NewSockets的socket添加到m_Sockets中
for (i = m_Sockets.begin (); i != m_Sockets.end ();)
{
if ((*i)->Update () == -1) //m_Sockets是std::set<WorldSocket*>类型,这里循环每个WorldSocket,并调用其Update()方法,他只调用handle_output方法.
{
t = i;
++i;
(*t)->CloseSocket ();
(*t)->RemoveReference ();
--m_Connections;
m_Sockets.erase (t);
}
else
++i;
}
}
WorldDatabase.ThreadEnd (); //线程将死
DEBUG_LOG ("Network Thread Exitting");
return 0;
}
客户端连接产生的socket是怎样被分配到指定线程及相应的反应器的呢?
前面已经提到过,acceptor被指定用第一个线程和相应的反应器,当有客户端连接时,acceptor将创建一个svc_handler,这里是WorldSocket,然后调用WorldSocket的open方法。下面是图示
在open方法中,调用了
// Hookfor the manager.
if(sWorldSocketMgr->OnSocketOpen (this) == -1)
return -1;
OnSocketOpen方法:
int
WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
{
// set someoptions here
if(m_SockOutKBuff >= 0)
{
if (sock->peer ().set_option (SOL_SOCKET,
SO_SNDBUF,
(void*) & m_SockOutKBuff,
sizeof (int)) == -1 && errno !=ENOTSUP)
{
sLog.outError ("WorldSocketMgr::OnSocketOpen set_o ptionSO_SNDBUF");
return -1;
}
}
staticconst int ndoption = 1;
// SetTCP_NODELAY.
if(m_UseNoDelay)
{
if (sock->peer ().set_option (ACE_IPPROTO_TCP,
TCP_NODELAY,
(void*)&ndoption,
sizeof (int)) == -1)
{
sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_optionTCP_NODELAY errno = %s", ACE_OS::strerror (errno));
return -1;
}
}
sock->m_OutBufferSize =static_cast<size_t>(m_SockOutUBuff);
// weskip the Acceptor Thread
size_t min =1;
ACE_ASSERT (m_NetThreadsCount >= 1);
for(size_t i = 1; i < m_NetThreadsCount; ++i)
if (m_NetThreads[i].Connections () <m_NetThreads[min].Connections ())
min = i;
returnm_NetThreads[min].AddSocket (sock); //均衡将worldSocket分配给某个线程
}
AddSocket将worldSocket添加到m_NewSockets中。m_NewSockets在线程运行体中被添加到m_Sockets中,并调用其update()方法.
int WorldSocket::Update (void)
{
if(closing_)
return -1;
if(m_OutActive || (m_OutBuffer->length () == 0&&msg_queue()->is_empty())) //当m_OutBuffer有数据时才调用handle_output,这里是启动输出的地方,如果输出不能在handle_output中一次性处理完,则会调用schedule_wakeup_output让reactor继续处理输出数据,当handle_output中输出处理完毕之后则调用cancel_wakeup_output,reactor不能再处理输出数据,只用等到再次Update被调用,并且m_OutBuffer有数据是才再次启动输出。
return 0;
intret;
do
ret = handle_output (get_handle ());
while( ret> 0 );// ret>0 指write event还存在,就是还有数据要发送
returnret;
}
Update主要调用handle_output方法
int WorldSocket::handle_output (ACE_HANDLE)
{
ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1); //m_OutBuffer是竞争资源需要保护
if(closing_)
return -1;
constsize_t send_len = m_OutBuffer->length ();
if(send_len == 0)
return handle_output_queue (Guard);
#ifdef MSG_NOSIGNAL
ssize_t n =peer ().send (m_OutBuffer->rd_ptr (), send_len,MSG_NOSIGNAL);
#else
ssize_t n =peer ().send (m_OutBuffer->rd_ptr (),send_len);
#endif // MSG_NOSIGNAL
if (n ==0)
return -1;
else if (n== -1)
{
if (errno == EWOULDBLOCK || errno ==EAGAIN)//在多线程环境中可能其他线程已经处理了发送数据,EWOULDBLOCK表示暂时不能发送数据.
return schedule_wakeup_output (Guard);
return -1;
}
else if (n< send_len) //now n > 0//发送了一部分数据,得继续发送
{
m_OutBuffer->rd_ptr(static_cast<size_t> (n));
// move the data to the base of the buffer
m_OutBuffer->crunch ();
return schedule_wakeup_output (Guard);//启用后run_reactor_event_loop也能响应输出事件
}
else //now n== send_len //发送完毕则发送queue里的数据
{
m_OutBuffer->reset ();
return handle_output_queue (Guard); //handle_output_queue中如果没有输出数据了则调用cancel_wakeup_output,之后run_reactor_event_loop不能响应输出事件.
}
ACE_NOTREACHED (return 0);
}
注意事项:
使用ace的svc_handler时注意,注册read事件之后,reactor会根据实际网络输入来处理输入事件,而write事件要用户自己触发。