首页
社区
课程
招聘
[分享]共享一个自己用的远控的IOCP 模型
发表于: 2014-7-31 13:43 7551

[分享]共享一个自己用的远控的IOCP 模型

2014-7-31 13:43
7551
第一次来看学发帖 ,希望各个大大能顶顶 ,本人技术实在很菜,所以平时都只是看看而已。

首先声明这个IOCP模型不是我写的 我只是提取出来而已,自己用的时候发现了一些问题没有处理异常掉线,所以自己又加了个KEEPALIVE进去,希望对大家有用。

IocpModeSvr.h:

// IocpModeSvr.h: interface for the CIocpModeSvr class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_IOCPMODESVR_H__46FFF420_23C3_4356_A88D_AEBDA61EA186__INCLUDED_)
#define AFX_IOCPMODESVR_H__46FFF420_23C3_4356_A88D_AEBDA61EA186__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include "ClientList.h"
#include <winsock2.h>
#include <MSTcpIP.h>
#pragma  comment(lib,"ws2_32.lib")

//服务端口
#define SVRPORT 80
//缓冲区大小
#define BUFFER_SIZE 1024 * 4//1024 * 200
//接收数据
#define RECV_POSTED 0
//发送数据
#define SEND_POSTED 1
//单句柄数据
typedef struct _PER_HANDLE_DATA
{
  unsigned long IpAddr;
  SOCKET sClient;
}PER_HANDLE_DATA,*PPER_HANDLE_DATA;
//IO操作数据
typedef struct _PER_IO_OPERATION_DATA
{
  //重叠结构
  OVERLAPPED OverLapped;
  //数据缓冲区
  WSABUF DataBuf;
  char Buf[BUFFER_SIZE];
  //操作类型表示发送、接收或关闭等
  bool OperType;
}PER_IO_OPERATION_DATA,*PPER_IO_OPERATION_DATA;

//回调处理数据函数原型
typedef void __stdcall ProcessRecvData(unsigned long sIP,
                     SOCKET sClient,
                     char * pData,
                     unsigned long DataLength,
                     void *pContext);

DWORD WINAPI ServerWorkerProc(LPVOID lParam);
DWORD WINAPI ListenProc(LPVOID lParam);

//#################完成端口socket###################
class CIocpModeSvr  
{
public:
  CIocpModeSvr();
  virtual ~CIocpModeSvr();
public:
  void *m_pContext;
  //初始化
  bool Init(ProcessRecvData* pProcessRecvData, void *pContext, unsigned long iSvrPort=SVRPORT);
  //反初始化
  void UnInit();
  /*  用于发送消息的函数组*/
public:

  bool SendMsg(SOCKET sClient,char * pData,unsigned long Length);
  bool SendMsgToAll(char * pData,unsigned long Length);

  bool DisConnectClient(SOCKET sClient);
  void DisConnectAll();

  void RemoveSocket(SOCKET sClient);
public:
  //获得本地Ip地址
  const char * GetLocalIpAdd(){ 
    if(IsStart)return HostIpAddr;
    else return NULL;
  }
  //获得服务器使用的端口号
  unsigned long GetSvrPort(){
    if(IsStart)return m_SvrPort;
    else return 0;
  }

protected:
  int InitNetWork(unsigned int SvrPort=SVRPORT,
                 char *pHostIpAddress=NULL);

  ProcessRecvData* m_pProcessRecvData;
private:
  //完成句柄
  HANDLE CompletionPort;
  //主机IP
  char  HostIpAddr[32];
  //客户信息列表
  DCLinkedList<PER_HANDLE_DATA> ClientList;  
  //客户信息临界保护量
  CRITICAL_SECTION cInfoSection;
  //服务是否已经启动
  bool IsStart;
  //侦听SOCKET
  SOCKET ListenSocket;
  //侦听线程句柄,用于反初始化时销毁侦听线程
  HANDLE ListenThreadHandle;
  //服务端口
  unsigned long m_SvrPort;

  friend DWORD WINAPI ServerWorkerProc(LPVOID lParam);
  friend DWORD WINAPI ListenProc(LPVOID lParam);
};

#endif // !defined(AFX_IOCPMODESVR_H__46FFF420_23C3_4356_A88D_AEBDA61EA186__INCLUDED_)

IocpModeSvr.cpp:

//////////////////////////////////////////////////////////////////////////
// IocpModeSvr.cpp: implementation of the CIocpModeSvr class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "IocpModeSvr.h"

#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif

#pragma  warning(disable:4800)

DWORD WINAPI ServerWorkerProc(LPVOID lParam)
{
  CIocpModeSvr* pSvr=(CIocpModeSvr*)lParam;
  HANDLE CompletionPort=pSvr->CompletionPort;
  DWORD ByteTransferred;
  PPER_HANDLE_DATA pPerHandleData;
  PPER_IO_OPERATION_DATA PerIoData;
  DWORD RecvByte;
  while(true)
  {
    bool bSuccess=GetQueuedCompletionStatus(CompletionPort,
                        &ByteTransferred,
                        (LPDWORD)&pPerHandleData,
                        (LPOVERLAPPED* )&PerIoData,
                        INFINITE);
    //退出信号到达,退出线程
    if(ByteTransferred==-1 && PerIoData==NULL)
    {
      return 1L;
    }
    //客户机已经断开连接或者连接出现错误
    if(!bSuccess||(ByteTransferred==0 && 
       (PerIoData->OperType==RECV_POSTED || PerIoData->OperType==SEND_POSTED)))
    {
      //将该客户端数据删除
      bool bFind=false;
      ::EnterCriticalSection(&pSvr->cInfoSection);
      PER_HANDLE_DATA PerHandleData;
      for (pSvr->ClientList.Reset();!pSvr->ClientList.EndOfList();pSvr->ClientList.Next())
      {//遍历
        memset(&PerHandleData,0,sizeof(PER_HANDLE_DATA));
        PerHandleData = pSvr->ClientList.Data();
        if((PerHandleData.IpAddr==pPerHandleData->IpAddr) &&
           (PerHandleData.sClient==pPerHandleData->sClient))
        {
          bFind=true;
          pSvr->ClientList.DeleteAt();
          break;
        }
      }
      ::LeaveCriticalSection(&pSvr->cInfoSection);
      if(bFind)
      {
        //Client退出
        in_addr in_A;
        in_A.S_un.S_addr=pPerHandleData->IpAddr;
        //XTRACE("\nSocket : %d Disconneted,%d in the ClientList now",pPerHandleData->sClient,pSvr->ClientList.ListSize());
        //调用回调函数,通知上层该客户端已经断开
        pSvr->m_pProcessRecvData(pPerHandleData->IpAddr,
          pPerHandleData->sClient,
          NULL,
          0,
          pSvr->m_pContext);
        //关闭套接口
        closesocket(pPerHandleData->sClient);
        GlobalFree(pPerHandleData);
        GlobalFree(PerIoData);
      }
      continue;
    }
    //为读操作完成,处理数据
    if(PerIoData->OperType==RECV_POSTED)
    {
      //调用回调函数,处理数据
      pSvr->m_pProcessRecvData(pPerHandleData->IpAddr,
                             pPerHandleData->sClient,
                   PerIoData->Buf,
                   ByteTransferred,
                   pSvr->m_pContext);
      //将源数据置空
      memset(PerIoData->Buf,0,BUFFER_SIZE);
      ByteTransferred=0;
      //重置IO操作数据
      unsigned long Flag=0;
      ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
      
      PerIoData->DataBuf.buf=PerIoData->Buf;
      PerIoData->DataBuf.len=BUFFER_SIZE;
      PerIoData->OperType=RECV_POSTED;
      //提交另一个Recv请求
      WSARecv(pPerHandleData->sClient,
        &(PerIoData->DataBuf),
        1,
        &RecvByte,
        &Flag,
        &(PerIoData->OverLapped),
        NULL);
    }
    //发送完成,置空缓冲区,释放缓冲区
    if(PerIoData->OperType==SEND_POSTED)
    {
      memset(PerIoData,0,sizeof(PER_IO_OPERATION_DATA));
      GlobalFree(PerIoData);
      ByteTransferred=0;
    }

  }
  return 0L;
}
DWORD WINAPI ListenProc(LPVOID lParam)
{
  CIocpModeSvr* pSvr=(CIocpModeSvr*)lParam;
  SOCKET Accept;

  while(true)
  {
    //接收客户的请求
    Accept=WSAAccept(pSvr->ListenSocket,NULL,NULL,NULL,0);
    //申请新的句柄操作数据 
    // set KeepAlive parameter  
    tcp_keepalive alive_in;  
    tcp_keepalive alive_out;  
    alive_in.keepalivetime    = 300;  // 0.5s  
    alive_in.keepaliveinterval  =5; //1s  
    alive_in.onoff = TRUE;  
    unsigned long ulBytesReturn = 0;  
    int nRet = WSAIoctl(Accept, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in),  
      &alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL);  
    if (nRet == SOCKET_ERROR)  
    {  
      TRACE(L"WSAIoctl failed: %d/n", WSAGetLastError());  
      return FALSE;  
    }  

    PPER_HANDLE_DATA pPerHandleData=(PPER_HANDLE_DATA) \
                                  GlobalAlloc(GPTR,
                    sizeof(PER_HANDLE_DATA)
                    );
    //取得客户端信息
    sockaddr soad;
    sockaddr_in in;
    int len=sizeof(soad);
    if(getpeername(Accept,&soad,&len)==SOCKET_ERROR)
    {
    }
    else{
      memcpy(&in,&soad,sizeof(sockaddr));
    }
    //句柄数据
    pPerHandleData->sClient=Accept;
    pPerHandleData->IpAddr=in.sin_addr.S_un.S_addr;

    //存储客户信息
    ::EnterCriticalSection(&pSvr->cInfoSection);
    pSvr->ClientList.InsertAfter(*pPerHandleData);
    ::LeaveCriticalSection(&pSvr->cInfoSection);

//    XTRACE("\nUserIP: %s ,Socket : %d Connected!%d in the ClientList now",inet_ntoa(in.sin_addr),Accept,pSvr->ClientList.ListSize());

    //关联客户端口到完成端口,句柄数据在此时被绑定到完成端口
    CreateIoCompletionPort((HANDLE)Accept,
                         pSvr->CompletionPort,
                 (DWORD)pPerHandleData,
                 0);
    //Io操作数据标志

    PPER_IO_OPERATION_DATA PerIoData=(PPER_IO_OPERATION_DATA) \
                                    GlobalAlloc(GPTR,
                      sizeof(PER_IO_OPERATION_DATA));
    unsigned long  Flag=0;
    DWORD RecvByte;
    ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));

    PerIoData->DataBuf.buf=PerIoData->Buf;
    PerIoData->DataBuf.len=BUFFER_SIZE;
    PerIoData->OperType=RECV_POSTED;
    //提交首个接收数据请求
    //这时
    //如果客户端断开连接
    //则也可以以接收数据时得到通知  
    WSARecv(pPerHandleData->sClient,
      &(PerIoData->DataBuf),
      1,
      &RecvByte,
      &Flag,
      &(PerIoData->OverLapped),
      NULL);
  }
}
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CIocpModeSvr::CIocpModeSvr()
{
  IsStart=false;
  m_pContext = NULL;
  ClientList.ClearList();
}
CIocpModeSvr::~CIocpModeSvr()
{

}

//提交发送消息请求,
//如果提交发送消息失败,
//则将导致在工作线程里将目标客户端的连接切断
bool CIocpModeSvr::SendMsg(SOCKET sClient,char * pData,unsigned long Length)
{
  if(sClient==INVALID_SOCKET || pData==NULL || Length==0 || !IsStart)return false;

  //申请操作键
  PPER_IO_OPERATION_DATA PerIoData=(PPER_IO_OPERATION_DATA) \
                    GlobalAlloc(GPTR,
                                  sizeof(PER_IO_OPERATION_DATA));

  //准备缓冲
  unsigned long  Flag=0;
  DWORD SendByte;
  ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
  memcpy(PerIoData->Buf,pData,Length);
  PerIoData->DataBuf.buf=PerIoData->Buf;
  PerIoData->DataBuf.len=Length;
  PerIoData->OperType=SEND_POSTED;
  int bRet=WSASend(sClient,
                 &(PerIoData->DataBuf),
                 1,
                 &SendByte,
                 Flag,
                 &(PerIoData->OverLapped),
                 NULL);
  if(bRet==SOCKET_ERROR && GetLastError()!=WSA_IO_PENDING)
  {
    return false;
  }
  else return true;
  
  return false;
}
bool CIocpModeSvr::SendMsgToAll(char * pData,unsigned long Length)
{
  if(pData==NULL || Length==0 || !IsStart)return false;

  ::EnterCriticalSection(&cInfoSection);
  PER_HANDLE_DATA PerHandleData;
  for (ClientList.Reset();!ClientList.EndOfList();ClientList.Next())
  {//遍历
    memset(&PerHandleData,0,sizeof(PER_HANDLE_DATA));
    PerHandleData=ClientList.Data();
    SendMsg(PerHandleData.sClient,pData,Length);
  }
  ::LeaveCriticalSection(&cInfoSection);
  return true;
}

bool CIocpModeSvr::Init(ProcessRecvData* pProcessRecvData, void *pContext,
            unsigned long iSvrPort)
{
  if(IsStart || pProcessRecvData==NULL)return false;

  m_SvrPort=iSvrPort;

  ::InitializeCriticalSection(&cInfoSection);

  m_pProcessRecvData=pProcessRecvData;
  int  bRet=InitNetWork(iSvrPort,HostIpAddr);
  if(bRet==0)
  {
    m_pContext = pContext;
    IsStart=true;
    return true;
  }
  else
    return false;
}

void CIocpModeSvr::RemoveSocket(SOCKET sClient)
{
  ::EnterCriticalSection(&cInfoSection);

  PER_HANDLE_DATA PerHandleData;
  for (ClientList.Reset();!ClientList.EndOfList();ClientList.Next())
  {//遍历
    memset(&PerHandleData,0,sizeof(PER_HANDLE_DATA));
    PerHandleData=ClientList.Data();
    if(PerHandleData.sClient==sClient)
    {
      ClientList.DeleteAt();
      break;
    }
  }

  ::LeaveCriticalSection(&cInfoSection);
}

void CIocpModeSvr::DisConnectAll()
{
  if(!IsStart)return ;
  
  ::EnterCriticalSection(&cInfoSection);

  PER_HANDLE_DATA PerHandleData;
  for (ClientList.Reset();!ClientList.EndOfList();ClientList.Next())
  {
    memset(&PerHandleData,0,sizeof(PER_HANDLE_DATA));
    PerHandleData=ClientList.Data();
    shutdown(PerHandleData.sClient,1);
    closesocket(PerHandleData.sClient);
    ClientList.DeleteAt();
  }

  ::LeaveCriticalSection(&cInfoSection);
}

bool CIocpModeSvr::DisConnectClient(SOCKET sClient)
{
  if(!IsStart || sClient==INVALID_SOCKET)return false;
  
  ::EnterCriticalSection(&cInfoSection);

  PER_HANDLE_DATA PerHandleData;
  for (ClientList.Reset();!ClientList.EndOfList();ClientList.Next())
  {
    memset(&PerHandleData,0,sizeof(PER_HANDLE_DATA));
    PerHandleData=ClientList.Data();
    if(PerHandleData.sClient==sClient)
    {
      shutdown(PerHandleData.sClient,1);
      closesocket(PerHandleData.sClient);
      ClientList.DeleteAt();
      ::LeaveCriticalSection(&cInfoSection);
      return true;
    }
  }
  ::LeaveCriticalSection(&cInfoSection);
  return false;
}
void CIocpModeSvr::UnInit()
{
  if(!IsStart)return;
  //退出工作线程
  SYSTEM_INFO sys_Info;
  GetSystemInfo(&sys_Info);
  for(DWORD i=0;i<sys_Info.dwNumberOfProcessors*2+2;i++)
  {
    //寄出退出消息
    PostQueuedCompletionStatus(CompletionPort,
                   -1,
                     -1,
                     NULL);
  }
  //退出侦听线程
  ::TerminateThread(ListenThreadHandle,1L);
  ::WaitForSingleObject(ListenThreadHandle,10);
  CloseHandle(ListenThreadHandle);
  //关闭网络的侦听
  shutdown(ListenSocket,0);
  closesocket(ListenSocket);
  //切断当前所有连接
  DisConnectAll();  
  ::DeleteCriticalSection(&cInfoSection);
  m_pProcessRecvData=NULL;
  IsStart=false;
}
int CIocpModeSvr::InitNetWork(unsigned int SvrPort,
                char *pHostIpAddress)
{
  //启动网络
  int Error=0;
  WSADATA wsaData;
  char Name[100];
  hostent *pHostEntry;
  in_addr rAddr;
  //Net Start Up
  Error=WSAStartup(MAKEWORD(0x02,0x02),&wsaData);
  if(Error!=0)
  {
    Error = WSAGetLastError();
    strcpy(pHostIpAddress,"");

//    XTRACE("\nWSAStartUp Faild With Error: %d",Error);
    
    return Error;
  }
  //Make Version
  if ( LOBYTE( wsaData.wVersion ) != 2 ||
    HIBYTE( wsaData.wVersion ) != 2 )
  {
    WSACleanup();  

    //XTRACE("The Local Net Version Is not 2");

    return -1;
  }
  //Get Host Ip
  Error = gethostname ( Name, sizeof(Name) );
  if( 0 == Error )
  {
    pHostEntry = gethostbyname( Name );
    if( pHostEntry != NULL )
    {
      memcpy( &rAddr, pHostEntry->h_addr_list[0], sizeof(struct in_addr) );
      strcpy(pHostIpAddress,inet_ntoa(rAddr));
    }
    else
    {
      Error = WSAGetLastError();
      
      //XTRACE("\nGetHostIp faild with Error: %d",Error);

      return Error;
      
    }
  }
  else
  {
    Error = WSAGetLastError();

    //XTRACE("\ngethostname faild with Error: %d",Error);

    return Error;
  }
  if(0==Error)
  {
    //创建侦听端口
    ListenSocket=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    if(ListenSocket==INVALID_SOCKET)
    {
      Error = WSAGetLastError();
    
      //XTRACE("\nCreateSocket faild with Error: %d",Error);

      return Error;
    }
  }
  //绑定到目标地址
  if(0==Error)
  {
    sockaddr_in InternetAddr;
    InternetAddr.sin_family=AF_INET;
    InternetAddr.sin_addr.S_un.S_addr=htonl(INADDR_ANY);
    InternetAddr.sin_port=htons(SvrPort);
    if(bind(ListenSocket,
          (PSOCKADDR )&InternetAddr,
        sizeof(InternetAddr))==SOCKET_ERROR)
    {
      Error=GetLastError();

      //XTRACE("\nbind Socket faild with Error: %d",Error);

      return Error;
    }
  }
  //侦听端口上的连接请求
  if(0==Error)
  {
    if(  listen(ListenSocket,5)==SOCKET_ERROR)
    {
      Error=GetLastError();

      //XTRACE("\nlisten Socket faild with Error: %d",Error);
  
      return Error;
    }
  }
  //创建完成端口句柄
  if(0==Error)
  {
    CompletionPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
    if(CompletionPort==INVALID_HANDLE_VALUE)
    {
      Error=GetLastError();

      //XTRACE("\nCreateIoCompletionPort faild with Error: %d",Error);

      return Error;
    }
  }
  //启动工作线程,线程数为CPU处理器数量*2+2
  if(0==Error)
  {  
    SYSTEM_INFO sys_Info;
    GetSystemInfo(&sys_Info);
    for(DWORD i=0;i<sys_Info.dwNumberOfProcessors*2+2;i++)
    {
      HANDLE ThreadHandle;
      DWORD ThreadID;
      
      ThreadHandle=CreateThread(NULL,
        0,
        ServerWorkerProc,
        this,
        0,
        &ThreadID);
      if(ThreadHandle==NULL)
      {
        Error = WSAGetLastError();

        //XTRACE("\nCreate Server Work Thread faild with Error: %d",Error);

        return Error;
      }  
      CloseHandle(ThreadHandle);
    }
  }
  //启动侦听线程
  if(0==Error)
  {
    DWORD thID;
    ListenThreadHandle=CreateThread(NULL,
                                  0,
                    ListenProc,
                    this,
                    0,
                    &thID);
    if(ListenThreadHandle==NULL)
    {
      Error = WSAGetLastError();

      //XTRACE("\nCreate Listen Thread faild with Error: %d",Error);
  
      return Error;    
    }
  }
  return Error;
}

喜欢的人可以拿去看看,希望对大家有点帮助,我以后还会和大家分享更多的东西。

[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课

上传的附件:
收藏
免费 0
支持
分享
最新回复 (11)
雪    币: 688
活跃值: (90)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
2
谢谢啦,不过你这个IOCP一般啦~
2014-7-31 13:48
0
雪    币: 70
活跃值: (108)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
3
说实话我也是初学者,前段时间才把IOCP看会,因为一个项目的原因所以就用了这个东西,希望能抛砖引玉,鼓励更多的朋友分享好的东东,谢谢大家的支持。
2014-7-31 13:54
0
雪    币: 183
活跃值: (1223)
能力值: ( LV3,RANK:30 )
在线值:
发帖
回帖
粉丝
4
为什么不用boost啊
2014-7-31 15:53
0
雪    币: 11
活跃值: (40)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
5
支持一下,很棒很有价值的代码
2014-7-31 20:41
0
雪    币: 70
活跃值: (108)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
6
平时没用过那个东西 这个IOCP 虽然效率不够高但是我觉得写得挺清晰的 还不错
2014-7-31 20:55
0
雪    币: 70
活跃值: (108)
能力值: ( LV3,RANK:20 )
在线值:
发帖
回帖
粉丝
7
谢谢 支持
2014-7-31 20:56
0
雪    币: 1392
活跃值: (5202)
能力值: ( LV13,RANK:240 )
在线值:
发帖
回帖
粉丝
8
难道IOCP还有不同的用法,还是指的整个架构不合理?求解。
2014-8-5 15:11
0
雪    币: 688
活跃值: (90)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
9
你这个只能处理一般的IO,应该将业务和通信分开
2014-8-7 20:27
0
雪    币: 261
活跃值: (51)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
10
请问有好的例子吗?想学习一下
2014-8-7 20:58
0
雪    币: 688
活跃值: (90)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
11
搜索一下codeproject的iocp就可以看到了,外国人编写的
2014-8-7 22:10
0
雪    币: 261
活跃值: (51)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
12
非常感谢
2014-8-8 06:59
0
游客
登录 | 注册 方可回帖
返回
//