//////////////////////////////////////////////////////////////////////////
/*
 * Builds an idle server object.
 *
 * All bookkeeping members are put into their "not started" state and the
 * kernel objects that exist for the whole life of the object (stop event,
 * completion port, critical section) are created here.
 */
CIocpServer::CIocpServer(void)
{
    /* Handles / pointers that are only filled in once the server is started. */
    m_sListenSocket    = INVALID_SOCKET;
    m_lpfnAcceptEx     = NULL;
    m_lpfnDisconnectEx = NULL;
    m_lpNotifyProc     = NULL;

    /* Plain state. */
    m_bStatus   = false;
    m_dwFlags   = 0;
    m_uThreadId = 0;
    m_uThreads  = _GetProcessOfNumber();
    m_dwSockLen = sizeof(sockaddr_in) + 16;   /* address slot size later handed to AcceptEx */

    ::memset(&m_lpContext, 0, sizeof(m_lpContext));
    ::memset(&m_uSocketPool, 0, sizeof(m_uSocketPool));

    /* Manual-reset, initially non-signalled event polled by the worker loop. */
    m_hWorkEvent = CreateEvent(NULL, TRUE, FALSE, NULL);

    /* Fresh completion port, not yet associated with any handle. */
    m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    /* Lock used around the client map and the socket pool. */
    InitializeCriticalSection(&cIocpSection);
}
/*
 * Tears the server object down.
 *
 * If the server is still running it is shut down first, so that the worker
 * threads are asked to exit before the event and critical section they use
 * are destroyed. Afterwards Winsock is released and the lifetime-scoped
 * kernel objects created by the constructor are freed.
 */
CIocpServer::~CIocpServer(void)
{
    if (m_bStatus)
    {
        /* Shutdown() clears m_bStatus, wakes the workers and closes the port. */
        Shutdown();
    }

    WSACleanup();
    CloseHandle(m_hWorkEvent);
    DeleteCriticalSection(&cIocpSection);
}
/*
 * Completion-port worker thread.
 *
 * lpContext is the owning CIocpServer. The thread dequeues completion
 * packets until either the stop event is signalled or an exit packet
 * (bytes == -1, key == NULL, overlapped == NULL) is received, and dispatches
 * on the operation type stored in the per-I/O block:
 *   - aept_flag : an AcceptEx finished; set the client up and re-post an accept
 *   - recv_flag : data arrived; optionally inflate it, hand it to the callback,
 *                 then post the next receive on the same per-I/O block
 *   - send_flag : a send finished; release its per-I/O block
 * A failed or zero-byte send/recv is treated as "peer gone".
 *
 * Always returns 0.
 */
unsigned __stdcall _WorkerThread(void *lpContext)
{
    CIocpServer *pSvr = (CIocpServer*)lpContext;
    HANDLE hCompletionPort = pSvr->m_hCompletionPort;

    /* Take the id from the interlocked result itself; re-reading the member
       afterwards could observe another thread's increment. */
    const unsigned long ulThreadId =
        (unsigned long)InterlockedIncrement((PLONG)&pSvr->m_uThreadId);
    TRACE("_workerthread start <%d>\n", ulThreadId);

    while (WaitForSingleObject(pSvr->m_hWorkEvent, 0) != WAIT_OBJECT_0)
    {
        /* Reset every iteration: when no packet is dequeued the API leaves
           these outputs untouched. */
        LP_PER_IO_OPENATION_DATA lpPerIoData = NULL;
        LP_PER_SOCKET_CONTEXT lpPerStContext = NULL;
        unsigned long ulBytesTransferred = 0;

        const BOOL bRet = GetQueuedCompletionStatus(hCompletionPort,
                                                    &ulBytesTransferred,
                                                    (PULONG_PTR)&lpPerStContext,
                                                    (LPOVERLAPPED*)&lpPerIoData,
                                                    INFINITE);

        if (ulBytesTransferred == (unsigned long)-1 && lpPerStContext == NULL && lpPerIoData == NULL)
        {
            /* thread exit packet posted by Shutdown() */
            TRACE("_workerthread exit %d\n", ulThreadId);
            return 0;
        }

        if (lpPerIoData == NULL)
        {
            /* No I/O block came back. On failure the port itself is unusable
               (e.g. it was closed), so stop instead of dereferencing NULL. */
            if (!bRet)
                break;
            continue;
        }

        const bool bStreamIo = (lpPerIoData->IoTYPE == send_flag || lpPerIoData->IoTYPE == recv_flag);
        if (bStreamIo && (!bRet || ulBytesTransferred == 0))
        {
            /* Peer closed the connection or the operation failed. Whoever
               manages to remove the socket from the online map owns the
               teardown of the per-socket context. */
            /* NOTE(review): lpPerStContext may already have been released by an
               earlier completion for the same socket; fixing that needs
               per-context reference counting. */
            EnterCriticalSection(&pSvr->cIocpSection);
            const BOOL bRemove = pSvr->mapStContext.RemoveKey(lpPerStContext->sClientSocket);
            LeaveCriticalSection(&pSvr->cIocpSection);

            if (bRemove)
            {
                /* Tell the callback that the client left the online list. */
                if (pSvr->m_lpNotifyProc != NULL)
                    pSvr->m_lpNotifyProc(lpPerStContext, NULL, 0, pSvr->m_lpContext[1], 0);
                TRACE("client %d exit", lpPerStContext->sClientSocket);

                /* Linger on with a zero timeout so the close does not leave
                   the connection in TIME_WAIT. */
                linger lingerstruct;
                lingerstruct.l_onoff = 1;
                lingerstruct.l_linger = 0;
                if (setsockopt(lpPerStContext->sClientSocket, SOL_SOCKET, SO_LINGER,
                               (char*)&lingerstruct, sizeof(lingerstruct)) == SOCKET_ERROR)
                {
                    TRACE("SO_LINGER error %d\n", WSAGetLastError());
                }

                closesocket(lpPerStContext->sClientSocket);
                GlobalFree(lpPerStContext);
            }

            /* The per-I/O block belongs to this one completed operation, so it
               is released whether or not this thread won the map removal. */
            GlobalFree(lpPerIoData);
            continue;
        }

        switch (lpPerIoData->IoTYPE)
        {
        case aept_flag:
            {
                if (!bRet || !pSvr->m_bStatus)
                {
                    /* Accept failed or the server is stopping: drop the
                       pre-created socket and its per-I/O block. */
                    closesocket(lpPerIoData->sAcceptSocket);
                    GlobalFree(lpPerIoData);

                    /* While still running, keep the number of outstanding
                       accepts constant. */
                    if (pSvr->m_bStatus)
                        pSvr->_PostAcceptEx(lpPerStContext);
                    break;
                }

                pSvr->_RunAcceptEx(lpPerIoData);
                pSvr->_PostAcceptEx(lpPerStContext);
            }
            break;

        case recv_flag:
            {
                if (lpPerStContext->bMainSocket == true)
                {
                    /* Main sockets carry zlib-compressed payloads. */
                    char szRecvData[buf_size * 2] = "";
                    u_long ulRecvLength = sizeof(szRecvData);
                    if (uncompress((Bytef*)szRecvData, &ulRecvLength,
                                   (const Bytef*)&lpPerIoData->BitBuf, ulBytesTransferred) == Z_OK)
                    {
                        if (pSvr->m_lpNotifyProc != NULL)
                            pSvr->m_lpNotifyProc(lpPerStContext, szRecvData, ulRecvLength, pSvr->m_lpContext[1], 0);
                    }
                    /* If decompression fails the callback is skipped and only
                       the next WSARecv is posted; a failure report could be
                       added here. */
                }
                else
                {
                    /* Non-main sockets deliver the raw bytes without decompression. */
                    if (pSvr->m_lpNotifyProc != NULL)
                        pSvr->m_lpNotifyProc(lpPerStContext, lpPerIoData->BitBuf, ulBytesTransferred, pSvr->m_lpContext[1], 0);
                }

                /* post next recv, reusing the same per-I/O block */
                pSvr->_PostRecvData(lpPerStContext, lpPerIoData);
            }
            break;

        case send_flag:
            /* The block was allocated by SendMsg() for this send only. */
            GlobalFree(lpPerIoData);
            break;

        default:
            break;
        }
    }
    return 0;
}
/*
 * Queues an overlapped send of `data` on `sclient`.
 *
 * sclient       target socket
 * data          payload to send (copied; the caller keeps ownership)
 * ulDataLength  number of bytes in `data`
 * bflag         true: zlib-compress the payload before sending;
 *               false: send the bytes as they are
 *
 * Returns true when the send completed or is pending, false on invalid
 * arguments, when the (compressed) payload does not fit into one per-I/O
 * buffer of buf_size bytes, on allocation failure or on WSASend failure.
 * The per-I/O block of a successfully queued send is released by the worker
 * thread when the completion arrives.
 */
bool CIocpServer::SendMsg(SOCKET sclient, char* data, u_long ulDataLength, bool bflag /* = true */)
{
    if (sclient == INVALID_SOCKET || data == NULL || ulDataLength == 0)
        return false;

    /* Allocate the per-I/O block; GPTR hands back zero-initialised memory, so
       the buffer and the OVERLAPPED need no extra clearing. */
    LP_PER_IO_OPENATION_DATA lpPerIoData = (LP_PER_IO_OPENATION_DATA)
        GlobalAlloc(GPTR, sizeof(PER_IO_OPENATION_DATA));
    if (lpPerIoData == NULL)
        return false;

    u_long ulSendLength = ulDataLength;
    if (bflag == true)
    {
        /* Compress straight into the send buffer. compress() reports an error
           when the output would not fit into buf_size bytes, which also guards
           against overrunning the buffer. */
        u_long compreLen = buf_size;
        if (compress((Bytef*)lpPerIoData->BitBuf, &compreLen, (const Bytef*)data, ulDataLength) != Z_OK)
        {
            GlobalFree(lpPerIoData);
            return false;
        }
        /* The wire length is the compressed size, not the original size. */
        ulSendLength = compreLen;
    }
    else
    {
        if (ulDataLength > buf_size)
        {
            GlobalFree(lpPerIoData);
            return false;
        }
        memcpy(lpPerIoData->BitBuf, data, ulDataLength); /* copy send buffer */
    }

    lpPerIoData->WsaBuf.buf = lpPerIoData->BitBuf;
    lpPerIoData->WsaBuf.len = ulSendLength;
    lpPerIoData->IoTYPE = send_flag;

    DWORD dwOfBytesSent = 0;
    const int nRet = WSASend(sclient,
                             &(lpPerIoData->WsaBuf),
                             1,
                             &dwOfBytesSent,
                             m_dwFlags,
                             &(lpPerIoData->Overlapped),
                             NULL);
    if (nRet == SOCKET_ERROR)
    {
        const int nError = WSAGetLastError();
        if (nError != WSA_IO_PENDING)
        {
            /* Nothing was queued, so no completion will ever free the block. */
            TRACE("WSASend error at %d\n", nError);
            GlobalFree(lpPerIoData);
            return false;
        }
    }
    return true;
}
/*
 * Starts the server.
 *
 * pNotifyProc  callback invoked by the worker threads for received data and
 *              for client disconnects
 * lpContext    opaque value passed back to the callback (the original note
 *              says it is the caller's `this` pointer)
 * uPort        TCP port to listen on; 0 selects port 80
 *
 * Returns true when the server is running (including when it already was),
 * false when initialisation failed.
 */
bool CIocpServer::Start(NOTIFYPROC *pNotifyProc, void *lpContext, u_short uPort)
{
    if (m_bStatus == true)
        return true;

    TRACE("IOCPServer Start...");

    if (uPort == 0)
        uPort = 80;

    /* Set before initialisation: worker threads check this flag when an
       accept completes, and accepts are posted during initialisation. */
    m_bStatus = true;
    m_lpContext[1] = lpContext;
    m_lpNotifyProc = pNotifyProc;

    if (!_InitIOCPServer(uPort))
    {
        /* Do not claim to be running when the listener could not be set up. */
        m_bStatus = false;
        return false;
    }
    return true;
}
bool CIocpServer::Shutdown()
{
if
(m_bStatus ==
false
)
return
0;
TRACE(
"IOCPServer Shutdown..."
);
m_bStatus =
false
;
//
CancelIo listen socket
CancelIo((HANDLE)m_sListenSocket);
LP_PER_SOCKET_CONTEXT lpPerStContext;
POSITION pi = mapStContext.GetStartPosition();
while
(pi)
{
SOCKET sClient;
mapStContext.GetNextAssoc(pi,sClient,lpPerStContext);
shutdown
(lpPerStContext->sClientSocket,0x02);
CancelIo((HANDLE)lpPerStContext->sClientSocket);
mapStContext.RemoveKey(sClient);
lpPerStContext = NULL;
}
mapStContext.RemoveAll();
for
(u_int i = 0; i < m_uThreads; i++)
{
PostQueuedCompletionStatus(m_hCompletionPort,-1,NULL,NULL);
Sleep(10);
}
//
free
for
listen per-socket-data
SetEvent(m_hWorkEvent);
//
set
event status
GlobalFree(m_lpContext[0]);
closesocket(m_sListenSocket);
CloseHandle(m_hCompletionPort);
return
true
;
}
/*
 * Forcibly disconnects one client.
 *
 * When the server is running and `sclient` is present in the online map,
 * the socket is shut down in both directions, closed and removed from the
 * map (all under the map lock). Always returns true.
 */
bool CIocpServer::DisConnect(SOCKET sclient)
{
    if (!m_bStatus)
        return true;

    EnterCriticalSection(&cIocpSection);

    LP_PER_SOCKET_CONTEXT lpEntry;
    const BOOL bKnown = mapStContext.Lookup(sclient, lpEntry);
    if (bKnown && lpEntry->sClientSocket == sclient)
    {
        const SOCKET sVictim = lpEntry->sClientSocket;
        shutdown(sVictim, 0x02);      /* 0x02: both directions */
        closesocket(sVictim);
        mapStContext.RemoveKey(sclient);
    }

    LeaveCriticalSection(&cIocpSection);
    return true;
}
/*
 * Posts one overlapped AcceptEx on the listen socket.
 *
 * lpPerStContext  per-socket context of the listen socket; its client fields
 *                 are reset before the request is posted
 *
 * A new per-I/O block and a new accept socket are created for the request.
 * Returns true when the accept completed or is pending. On failure both the
 * block and the socket are released and false is returned.
 */
bool CIocpServer::_PostAcceptEx(LP_PER_SOCKET_CONTEXT lpPerStContext)
{
    if (lpPerStContext == NULL)
        return false;

    lpPerStContext->sClientSocket = INVALID_SOCKET;
    lpPerStContext->ulIpAddress = 0;
    lpPerStContext->bMainSocket = false;
    memset(lpPerStContext->ulDialog, 0, sizeof(lpPerStContext->ulDialog));
    memset(lpPerStContext->szIpString, 0, sizeof(lpPerStContext->szIpString));

    /* New per-I/O block for this accept on the listen socket. */
    LP_PER_IO_OPENATION_DATA lpPerIoData = (LP_PER_IO_OPENATION_DATA)
        GlobalAlloc(GPTR, sizeof(PER_IO_OPENATION_DATA));
    if (lpPerIoData == NULL)
        return false;

    /* Reusing a socket from the pool (_RetvalSocketPool) is disabled; a fresh
       socket is created for every accept. */
    lpPerIoData->sAcceptSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (lpPerIoData->sAcceptSocket == INVALID_SOCKET)
    {
        GlobalFree(lpPerIoData);
        return false;
    }

    lpPerIoData->IoTYPE = aept_flag;   /* AcceptEx type */

    DWORD dwBytesOfAcceptEx = 0;
    const BOOL bRet = m_lpfnAcceptEx(m_sListenSocket,          /* post AcceptEx on the listen socket */
                                     lpPerIoData->sAcceptSocket,
                                     lpPerIoData->BitBuf,
                                     0,                         /* do not wait for first data */
                                     m_dwSockLen,               /* sizeof(sockaddr_in) + 16 */
                                     m_dwSockLen,
                                     &dwBytesOfAcceptEx,
                                     &(lpPerIoData->Overlapped));
    if (bRet == FALSE)
    {
        const int nError = WSAGetLastError();
        if (nError != WSA_IO_PENDING)
        {
            /* Nothing is pending, so nobody else will release these. */
            TRACE("m_lpfnAcceptEx post error at %d\n", nError);
            closesocket(lpPerIoData->sAcceptSocket);
            GlobalFree(lpPerIoData);
            return false;
        }
    }
    return true;
}
/*
 * Finishes an accepted connection.
 *
 * lpPerIoData  per-I/O block of the completed AcceptEx; its sAcceptSocket is
 *              the new client socket
 *
 * Allocates the client's per-socket context, updates the accept context,
 * records the peer address, enables keep-alive, binds the socket to the
 * completion port, registers it in the online map and posts the first
 * receive (reusing lpPerIoData).
 *
 * Returns true on success. On any failure the client socket is closed, the
 * context and the per-I/O block are released and false is returned, so the
 * caller must not touch lpPerIoData afterwards.
 */
bool CIocpServer::_RunAcceptEx(LP_PER_IO_OPENATION_DATA lpPerIoData /* = NULL */)
{
    if (lpPerIoData == NULL)
        return false;

    const SOCKET sAccepted = lpPerIoData->sAcceptSocket;

    /* Per-socket context for the new client. */
    LP_PER_SOCKET_CONTEXT lpPerStContext = (LP_PER_SOCKET_CONTEXT)
        GlobalAlloc(GPTR, sizeof(PER_SOCKET_CONTEXT));

    bool bReady = false;
    do
    {
        if (lpPerStContext == NULL)
            break;

        lpPerStContext->bMainSocket = true;
        lpPerStContext->sClientSocket = sAccepted;

        /* Make the accepted socket inherit the listen socket's properties. */
        if (setsockopt(sAccepted, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                       (char*)&m_sListenSocket, sizeof(SOCKET)) == SOCKET_ERROR)
        {
            TRACE("error\n");
            break;
        }

        struct sockaddr_in acceptaddr;
        int addrlen = sizeof(acceptaddr);
        memset(&acceptaddr, 0, sizeof(acceptaddr));
        if (getpeername(sAccepted, (sockaddr*)&acceptaddr, &addrlen) == SOCKET_ERROR)
        {
            TRACE("getpeername\n");
            break;
        }

        /* Remember the client address both in binary and dotted form. */
        lpPerStContext->ulIpAddress = acceptaddr.sin_addr.S_un.S_addr;
        strcpy_s(lpPerStContext->szIpString, inet_ntoa(acceptaddr.sin_addr));

        BOOL bKeepAlive = 1;
        if (setsockopt(sAccepted, SOL_SOCKET, SO_KEEPALIVE,
                       (const char*)&bKeepAlive, sizeof(bKeepAlive)) == SOCKET_ERROR)
        {
            TRACE("error\n");
            break;
        }

        /* Keep-alive timing (best effort, result not checked). */
        tcp_keepalive t_keepalive;
        t_keepalive.onoff = true;
        t_keepalive.keepalivetime = (1000 * 60) * 3;   /* 3 minutes of idle time */
        t_keepalive.keepaliveinterval = 1000 * 10;     /* 10 seconds between probes */
        DWORD dwBytesRetval = 0;
        WSAIoctl(sAccepted,
                 SIO_KEEPALIVE_VALS,
                 &t_keepalive,
                 sizeof(t_keepalive),
                 &bKeepAlive,
                 sizeof(BOOL),
                 &dwBytesRetval,   /* must not be NULL, otherwise the call fails with 10014 */
                 NULL,
                 NULL);

        /* Bind to the completion port with the context as completion key.
           The key is pointer sized, so it must not be narrowed to DWORD. */
        if (_BindCreateIoCompletionPort(sAccepted, m_hCompletionPort,
                                        (ULONG_PTR)lpPerStContext, 0) == NULL)
        {
            TRACE("error\n");
            break;
        }

        bReady = true;
    } while (false);

    if (!bReady)
    {
        if (lpPerStContext != NULL)
            GlobalFree(lpPerStContext);
        closesocket(sAccepted);
        GlobalFree(lpPerIoData);
        return false;
    }

    /* Save the socket to the online map. */
    EnterCriticalSection(&cIocpSection);
    mapStContext.SetAt(sAccepted, lpPerStContext);
    LeaveCriticalSection(&cIocpSection);

    if (!_PostRecvData(lpPerStContext, lpPerIoData))
    {
        /* _PostRecvData already released lpPerIoData. No I/O is pending on the
           socket, so undo the registration. The socket is only closed here
           when this thread removed the map entry; otherwise DisConnect() got
           there first and closed it. */
        EnterCriticalSection(&cIocpSection);
        const BOOL bRemoved = mapStContext.RemoveKey(sAccepted);
        LeaveCriticalSection(&cIocpSection);
        if (bRemoved)
            closesocket(sAccepted);
        GlobalFree(lpPerStContext);
        return false;
    }
    return true;
}
/*
 * Posts an overlapped receive on a client socket.
 *
 * lpPerStContext  context of the client socket to read from
 * lpPerIoData     per-I/O block to (re)use for the receive; its buffer and
 *                 OVERLAPPED are cleared first
 *
 * Returns true when the receive completed or is pending. On failure the
 * per-I/O block is released and false is returned.
 */
bool CIocpServer::_PostRecvData(LP_PER_SOCKET_CONTEXT lpPerStContext /* = NULL */, LP_PER_IO_OPENATION_DATA lpPerIoData /* = NULL */)
{
    if (lpPerStContext == NULL || lpPerIoData == NULL)
    {
        /* Keep the "block is released on failure" contract. */
        if (lpPerIoData != NULL)
            GlobalFree(lpPerIoData);
        return false;
    }

    memset(lpPerIoData->BitBuf, 0, buf_size);
    memset(&(lpPerIoData->Overlapped), 0, sizeof(OVERLAPPED));
    lpPerIoData->WsaBuf.buf = lpPerIoData->BitBuf;
    lpPerIoData->WsaBuf.len = buf_size;
    lpPerIoData->IoTYPE = recv_flag;

    /* WSARecv both reads and writes the flags argument. A per-call local keeps
       worker threads from racing on the shared m_dwFlags member, which is
       also used as the flags value for WSASend. */
    DWORD dwRecvFlags = 0;
    DWORD dwBytesRecvd = 0;
    const int nRet = WSARecv(lpPerStContext->sClientSocket,
                             &(lpPerIoData->WsaBuf),
                             1,
                             &dwBytesRecvd,
                             &dwRecvFlags,
                             &(lpPerIoData->Overlapped),
                             NULL);
    if (nRet == SOCKET_ERROR)
    {
        const int nError = WSAGetLastError();
        if (nError != WSA_IO_PENDING)
        {
            /* Nothing is pending, so no completion will free the block. */
            TRACE("WSARecv error at %d\n", nError);
            GlobalFree(lpPerIoData);
            return false;
        }
    }
    return true;
}
/*
 * Takes a socket out of, or puts a socket into, the reusable socket pool.
 *
 * bflag == true  : fetch. The first non-empty slot is moved into
 *                  lpPerIoData->sAcceptSocket. When the pool is empty
 *                  sAcceptSocket is set to 0 and false is returned.
 * bflag == false : store. lpPerStContext->sClientSocket is moved into the
 *                  first empty slot and replaced by INVALID_SOCKET. Returns
 *                  false when the pool is full.
 *
 * Returns false as well when the pointer required by the selected mode is
 * NULL. The pool is accessed under the server's critical section. A slot
 * value of 0 means "empty".
 */
bool CIocpServer::_RetvalSocketPool(LP_PER_SOCKET_CONTEXT lpPerStContext /* = NULL */, LP_PER_IO_OPENATION_DATA lpPerIoData /* = NULL */, bool bflag)
{
    /* Each mode dereferences exactly one of the two pointers; check that one. */
    if (bflag ? (lpPerIoData == NULL) : (lpPerStContext == NULL))
        return false;

    bool bDone = false;
    EnterCriticalSection(&cIocpSection);
    if (bflag)
    {
        for (int nItem = 0; nItem < SHRT_MAX; nItem++)
        {
            if (m_uSocketPool[nItem] == 0)
                continue;

            if (m_uSocketPool[nItem] == INVALID_SOCKET)
            {
                /* Nothing to close for an invalid handle; just free the slot. */
                m_uSocketPool[nItem] = 0;
                continue;
            }

            lpPerIoData->sAcceptSocket = m_uSocketPool[nItem];
            m_uSocketPool[nItem] = 0;
            bDone = true;
            break;
        }

        if (!bDone)
        {
            /* No pooled socket available. */
            lpPerIoData->sAcceptSocket = 0;
        }
    }
    else
    {
        for (int nItem = 0; nItem < SHRT_MAX; nItem++)
        {
            if (m_uSocketPool[nItem] == 0)
            {
                m_uSocketPool[nItem] = lpPerStContext->sClientSocket;
                lpPerStContext->sClientSocket = INVALID_SOCKET;
                bDone = true;
                break;
            }
        }
    }
    LeaveCriticalSection(&cIocpSection);
    return bDone;
}
bool CIocpServer::_DestroySocketPool()
{
linger stlinger;
stlinger.l_onoff = 1;
stlinger.l_linger = 0;
for
(int nItem = 0; nItem < SHRT_MAX; nItem++)
{
if
(m_uSocketPool[nItem] != 0)
{
setsockopt(m_uSocketPool[nItem],SOL_SOCKET,SO_LINGER,(char*)&stlinger,sizeof(linger));
closesocket(m_uSocketPool[nItem]);
}
}
return
true
;
}
/*
 * Thin wrapper around CreateIoCompletionPort for sockets.
 *
 * hSocket          socket handle to associate
 * hCompletionPort  completion port to associate it with
 * lpStContext      completion key reported with every packet for hSocket
 * dwNumber         concurrency value forwarded unchanged
 *
 * Returns whatever CreateIoCompletionPort returns.
 */
HANDLE CIocpServer::_BindCreateIoCompletionPort(SOCKET hSocket, HANDLE hCompletionPort, ULONG_PTR lpStContext, DWORD dwNumber)
{
    HANDLE hResult = CreateIoCompletionPort((HANDLE)hSocket, hCompletionPort, lpStContext, dwNumber);
    return hResult;
}
unsigned int CIocpServer::_GetProcessOfNumber()
{
SYSTEM_INFO SystemInfo;
GetSystemInfo(&SystemInfo);
return
(unsigned int)SystemInfo.dwNumberOfProcessors * 2;
}
bool CIocpServer::_InitIOCPServer(unsigned short uPort)
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2,2),&wsaData);
//
the create overlapped flag listen socket
m_sListenSocket = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,0,WSA_FLAG_OVERLAPPED);
if
(m_sListenSocket == INVALID_SOCKET)
{
TRACE(
"m_sListenSocket create error at %d\n"
,WSAGetLastError());
return
false
;
}
hostent *lphostent;
char
hostname
[hot_size];
memset(
hostname
,0,sizeof(
hostname
));
if
(gethostname(
hostname
,hot_size) == SOCKET_ERROR)
{
TRACE(
"gethostname error at %d\n"
,WSAGetLastError());
return
false
;
}
//
get lphostent->h_addr_list ip data
if
((lphostent = gethostbyname(
hostname
)) == NULL)
{
TRACE(
"lphostent returned pointer error at %d\n"
,WSAGetLastError());
return
false
;
}
//
fill sockaddr struct data
struct sockaddr_in localaddr;
memset(&localaddr,0,sizeof(localaddr));
localaddr.sin_family = AF_INET;
localaddr.sin_port = htons(uPort);
localaddr.sin_addr.S_un.S_addr = (*(unsigned long*)lphostent->h_addr);
TRACE(
"listen server address ip:%s port:%d\n"
,inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
if
(bind(m_sListenSocket,(sockaddr*)&localaddr,sizeof(localaddr)) == SOCKET_ERROR)
{
TRACE(
"bind socket error at %d\n"
,WSAGetLastError());
return
false
;
}
if
(listen(m_sListenSocket,SOMAXCONN) == SOCKET_ERROR)
{
TRACE(
"listen socket error at %d\n"
,WSAGetLastError());
return
false
;
}
//
for
create _workerthread
for
(u_int i = 0; i < m_uThreads; i++)
{
HANDLE hThread = NULL;
hThread = (HANDLE)_beginthreadex(NULL,NULL,&_WorkerThread,(void*)this,NULL,NULL);
if
(hThread == INVALID_HANDLE_VALUE)
{
TRACE(
"thread number error this i = %d\n"
,i);
return
false
;
}
CloseHandle(hThread);
}
LP_PER_SOCKET_CONTEXT lpPerStContext = (LP_PER_SOCKET_CONTEXT) \
GlobalAlloc(GPTR, sizeof(PER_SOCKET_CONTEXT));
//
m_lpContext[0] point m_sListensocket per handle address
//
bind m_slistensocket handle to completion port
m_lpContext[0] = lpPerStContext;
_BindCreateIoCompletionPort(m_sListenSocket,m_hCompletionPort,(DWORD)lpPerStContext,0);
DWORD dwBytesIoctl;
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidDisConnectEx = WSAID_DISCONNECTEX;
//
returned acceptex
function
pointer
if
(WSAIoctl(m_sListenSocket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,sizeof(GUID),
&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),
&dwBytesIoctl,NULL,NULL) == SOCKET_ERROR)
{
TRACE(
"return m_lpfnAcceptEx pointer error at %d\n"
,WSAGetLastError());
return
1;
}
//
returned disconnectex
function
pointer
if
(WSAIoctl(m_sListenSocket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidDisConnectEx,sizeof(GUID),
&m_lpfnDisconnectEx,
sizeof(m_lpfnDisconnectEx),
&dwBytesIoctl,NULL,NULL) == SOCKET_ERROR)
{
TRACE(
"return m_lpfnGetAcceptExAddrs error at %d\n"
,WSAGetLastError());
return
1;
}
//
需要把监听端口绑定到完成端口 然后投递多个AcceptEx
for
(u_int i = 0; i < m_uThreads; i++)
_PostAcceptEx(lpPerStContext);
return
true
;
}