Mangos之Socket处理方式 java socket异常处理

先来看一下启动监听socket的地方:

Master.cpp about line316:

///-Launch the world listener socket
port_twsport = sWorld.getConfig (CONFIG_PORT_WORLD);
std::stringbind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");

if(sWorldSocketMgr->StartNetwork (wsport,bind_ip.c_str ()) == -1)
{
sLog.outError ("Failed to start network");
World::StopNow(ERROR_EXIT_CODE);
// go down and shutdown the server
}

sWorldSocketMgr->Wait ();

Mangos用WorldSocketMgr这个类来管理socket的运作。StartNetwork中主要调用了StartReactiveIO来启动监听socket,看一下StartReactiveIO这个方法:

WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char*address)
{
m_UseNoDelay= sConfig.GetBoolDefault ("Network.TcpNodelay", true);

intnum_threads = sConfig.GetIntDefault ("Network.Threads", 1);//用于处理socket的线程个数

if(num_threads <= 0)
{
sLog.outError ("Network.Threads is wrong in your configfile");
return -1;
}

m_NetThreadsCount =static_cast<size_t> (num_threads +1);

m_NetThreads = new ReactorRunnable[m_NetThreadsCount]; //ReactorRunnable继承ACE_Task_Base,这是ACE线程使用方法

sLog.outBasic ("Max allowed socket connections %d",ACE::max_handles());

// -1means use default
m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff",-1);

m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff",65536);

if (m_SockOutUBuff <= 0 )
{
sLog.outError ("Network.OutUBuff is wrong in your configfile");
return -1;
}

WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;// WorldSocket::Acceptor是ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR>类型,ACE的Acceptor-ConnectorFramework,WorldSocket是ACE_Svc_Handler,ACE_SOCK_ACCEPTORmacro帮助编译相应平台的ACE_Acceptor类


m_Acceptor =acc;

ACE_INET_Addr listen_addr (port, address);

if(acc->open (listen_addr, m_NetThreads[0].GetReactor(), ACE_NONBLOCK) == -1) //指定监听地址,端口和第一个线程的反应器,并启动acceptor
{
sLog.outError ("Failed to open acceptor ,check if the port isfree");
return -1;
}

for(size_t i = 0; i < m_NetThreadsCount; ++i)
m_NetThreads[i].Start ();//启动所有线程

return0;
}

每个线程都有一个反应器ACE_Reactor*m_Reactor,它的具体实现是ACE_TP_Reactor(用于多线程的reactor),

线程运行部分就是调用反应器来实现的:

virtual int svc ()
{
DEBUG_LOG ("Network Thread Starting");

WorldDatabase.ThreadStart (); //?

ACE_ASSERT (m_Reactor);

SocketSet::iterator i, t;

while (!m_Reactor->reactor_event_loop_done ())
{
// dont be too smart to move this outside the loop
// the run_reactor_event_loop will modify interval
ACE_Time_Value interval (0, 10000);

if (m_Reactor->run_reactor_event_loop (interval) ==-1) //运行10000微妙,run_reactor_event_loop本身会处理输入输出事件,只要svc_handler在reactor上注册了write,read事件。
break;

AddNewSockets (); //将m_NewSockets的socket添加到m_Sockets中

for (i = m_Sockets.begin (); i != m_Sockets.end ();)
{
if ((*i)->Update () == -1) //m_Sockets是std::set<WorldSocket*>类型,这里循环每个WorldSocket,并调用其Update()方法,他只调用handle_output方法.

{
t = i;
++i;
(*t)->CloseSocket ();
(*t)->RemoveReference ();
--m_Connections;
m_Sockets.erase (t);
}
else
++i;
}
}

WorldDatabase.ThreadEnd (); //线程将死

DEBUG_LOG ("Network Thread Exitting");

return 0;
}

客户端连接产生的socket是怎样被分配到指定线程及相应的反应器的呢?

前面已经提到过,acceptor被指定用第一个线程和相应的反应器,当有客户端连接时,acceptor将创建一个svc_handler,这里是WorldSocket,然后调用WorldSocket的open方法。下面是图示



在open方法中,调用了

// Hookfor the manager.
if(sWorldSocketMgr->OnSocketOpen (this) == -1)
return -1;

OnSocketOpen方法:

int
WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
{
// set someoptions here
if(m_SockOutKBuff >= 0)
{
if (sock->peer ().set_option (SOL_SOCKET,
SO_SNDBUF,
(void*) & m_SockOutKBuff,
sizeof (int)) == -1 && errno !=ENOTSUP)
{
sLog.outError ("WorldSocketMgr::OnSocketOpen set_o ptionSO_SNDBUF");
return -1;
}
}

staticconst int ndoption = 1;

// SetTCP_NODELAY.
if(m_UseNoDelay)
{
if (sock->peer ().set_option (ACE_IPPROTO_TCP,
TCP_NODELAY,
(void*)&ndoption,
sizeof (int)) == -1)
{
sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_optionTCP_NODELAY errno = %s", ACE_OS::strerror (errno));
return -1;
}
}

sock->m_OutBufferSize =static_cast<size_t>(m_SockOutUBuff);

// weskip the Acceptor Thread
size_t min =1;

ACE_ASSERT (m_NetThreadsCount >= 1);

for(size_t i = 1; i < m_NetThreadsCount; ++i)
if (m_NetThreads[i].Connections () <m_NetThreads[min].Connections ())
min = i;

returnm_NetThreads[min].AddSocket (sock); //均衡将worldSocket分配给某个线程
}

AddSocket将worldSocket添加到m_NewSockets中。m_NewSockets在线程运行体中被添加到m_Sockets中,并调用其update()方法.

int WorldSocket::Update (void)
{
if(closing_)
return -1;

if(m_OutActive || (m_OutBuffer->length () == 0&&msg_queue()->is_empty())) //当m_OutBuffer有数据时才调用handle_output,这里是启动输出的地方,如果输出不能在handle_output中一次性处理完,则会调用schedule_wakeup_output让reactor继续处理输出数据,当handle_output中输出处理完毕之后则调用cancel_wakeup_output,reactor不能再处理输出数据,只用等到再次Update被调用,并且m_OutBuffer有数据是才再次启动输出。

return 0;

intret;
do
ret = handle_output (get_handle ());
while( ret> 0 );// ret>0 指write event还存在,就是还有数据要发送

returnret;
}

Update主要调用handle_output方法

int WorldSocket::handle_output (ACE_HANDLE)
{
ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1); //m_OutBuffer是竞争资源需要保护

if(closing_)
return -1;

constsize_t send_len = m_OutBuffer->length ();

if(send_len == 0)
return handle_output_queue (Guard);

#ifdef MSG_NOSIGNAL
ssize_t n =peer ().send (m_OutBuffer->rd_ptr (), send_len,MSG_NOSIGNAL);
#else
ssize_t n =peer ().send (m_OutBuffer->rd_ptr (),send_len);
#endif // MSG_NOSIGNAL

if (n ==0)
return -1;
else if (n== -1)
Mangos之Socket处理方式 java socket异常处理
{
if (errno == EWOULDBLOCK || errno ==EAGAIN)//在多线程环境中可能其他线程已经处理了发送数据,EWOULDBLOCK表示暂时不能发送数据.
return schedule_wakeup_output (Guard);

return -1;
}
else if (n< send_len) //now n > 0//发送了一部分数据,得继续发送
{
m_OutBuffer->rd_ptr(static_cast<size_t> (n));

// move the data to the base of the buffer
m_OutBuffer->crunch ();

return schedule_wakeup_output (Guard);//启用后run_reactor_event_loop也能响应输出事件
}
else //now n== send_len //发送完毕则发送queue里的数据
{
m_OutBuffer->reset ();

return handle_output_queue (Guard); //handle_output_queue中如果没有输出数据了则调用cancel_wakeup_output,之后run_reactor_event_loop不能响应输出事件.
}

ACE_NOTREACHED (return 0);
}

注意事项:

使用ace的svc_handler时注意,注册read事件之后,reactor会根据实际网络输入来处理输入事件,而write事件要用户自己触发。

  

爱华网本文地址 » http://www.aihuau.com/a/25101014/233691.html

更多阅读

如何解决电脑快捷方式图标显示异常 桌面快捷方式图标异常

如何解决电脑快捷方式图标显示异常——简介 在使用电脑过程中,我们有时候会遇到某些软件快捷方式图标变成未知文件图标,显示不正常的情况。这种情况在XP和Win7下都可能出现,排除软件被卸载导致快捷方式图标失效,这个问题往往是图标缓存

外贸付款方式之TT付款方式 tt付款方式英文

付款方式:电汇 T/T是电汇(Telegraphic Transfer,T/T),业务上分为前T/T(预付货款)和后T/T(装船后或收货后付款)。在交易中它较之信用证风险要高一些,但是向银行缴纳的费用要比信用证是缴纳的费用低很多。其他付款方式:L/C是信用证,这个是在国际

异常处理规范 规范合理治疗白带异常

异常处理规范1.目 标本文旨在介绍NCV5环境下,采用中间件抽象框架提供的开发模型,对异常定义、使用和捕捉等的处理规范。2.异常的分类从异常(错误)的紧要程度和通常反应的处理逻辑来讲,程序员所要关心的异常主要有四类:&#216;JVM错误:

声明:《Mangos之Socket处理方式 java socket异常处理》为网友心是晴朗的分享!如侵犯到您的合法权益请联系我们删除