首页
社区
课程
招聘
[分享][原创]Windows模型——消息异步;
2021-6-21 20:08 4752

[分享][原创]Windows模型——消息异步;

2021-6-21 20:08
4752

客户端为C++语言,服务端为Pascal语言——网络模型之消息异步;(帖源码)

 1,消息异步的话,顾名思义,是基于消息传递信号,而响应相相对应事件的网络模型。因为我只需要客户端仅仅听从于服务端一个人的命令,所以C++的模型为阻塞模式;

       2,两端采用类封装,简化代码流程;

       3,代码封装成流,以便于接收命令码、错误码和数据;以保证数据准确;


客户端(C++)


头文件:CSocket.h 

//需要包含Winsock库


//#include <WinSock2.h>

//#pragma comment(lib,"ws2_32.lib")

//利用类简化流程,封装成流,以便于接收命令码和错误码还有数据 


class CSocket
{
public:
CSocket();
CSocket(SOCKET s);
~CSocket(void);

public:
int Create();
int Close();
int Connect(char* host, WORD port);
int SoSendStream(DWORD  ucmd, DWORD  uerr, CStream* buf);
int SoRecvStream(DWORD& ucmd, DWORD& uerr, CStream* buf);

private:
 
int SoSendBytes( char* buf, int len);
int SoRecvBytes( char* buf, int len);

int SoRecv(char* buf, int len, int& cb);
int SoSend(char* buf, int len, int& cb);

private:
SOCKET m_s;
};


CPP源码:

#include "StdAfx.h"
#include "CSocket.h"

CSocket::CSocket()
{
m_s = INVALID_SOCKET;
}
CSocket::~CSocket()
{
if (int e = Close())
{
printf("%d\r\n",GetLastError());
}

}

int CSocket::Create()
{
m_s = socket(AF_INET, SOCK_STREAM, 0);

if (INVALID_SOCKET == m_s)
{
return WSAGetLastError();
}

return 0;
}

int CSocket::Close()
{

if (INVALID_SOCKET != m_s)
{
if (closesocket(m_s))
{
printf("%d\r\n",GetLastError());  return WSAGetLastError();
}

m_s = INVALID_SOCKET;

}
return 0;

}
int CSocket::Connect(char* host, WORD port)
{
SOCKADDR_IN addrSrv;

addrSrv.sin_addr.S_un.S_addr = inet_addr(host);
addrSrv.sin_family = AF_INET;
addrSrv.sin_port = htons(port);

if (connect(m_s, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR)))
{
return WSAGetLastError();
}

DWORD timeout = 300 * 1000;

if (setsockopt(m_s,SOL_SOCKET,SO_RCVTIMEO,(char*)&timeout,sizeof(timeout)))
{
printf("%d\r\n",GetLastError());
}

if (setsockopt(m_s,SOL_SOCKET,SO_SNDTIMEO,(char*)&timeout,sizeof(timeout)))
{
printf("%d\r\n",GetLastError());
}

return 0;

}

int CSocket::SoRecv(char* buf, int len, int& cb)
{
cb = recv(m_s, buf, len, 0);

if (cb > 0)
{
return 0;
}

if (cb < 0)
{
printf("%d\r\n",GetLastError());//WSAGetLastError();
}
return WSAECONNRESET;
}

int CSocket::SoSend(char* buf, int len, int& cb)
{
cb = send(m_s, buf, len, 0);

if (cb > 0)
{
return 0;
}
if (cb < 0)
{
printf("%d\r\n",GetLastError()); return WSAGetLastError();
}

return WSAECONNRESET;
}

int CSocket::SoSendBytes(char * buf, int len)
{

int off = 0;

while (off < len)
{
int cb = 0;

if (int e = SoSend((char*)buf + off, len - off, cb))
{
return e;
}

off += cb;

}

return 0;

}
int CSocket::SoRecvBytes(char * buf, int len)
{
int off = 0;

while (off < len)
{
int cb = 0;
if (int e = SoRecv((char*)buf + off, len - off, cb))
{
return e;
}

off += cb;

}

return 0;

}

int CSocket::SoSendStream(DWORD ucmd, DWORD uerr, CStream * buf)
{
if (int e = SoSendBytes((char*)&buf->m_size, sizeof(buf->m_size)))
{
return e;
}

if (int e = SoSendBytes((char*)&ucmd, sizeof(ucmd)))
{
return e;
}
if (int e = SoSendBytes((char*)&uerr, sizeof(uerr)))
{
return e;
}
if (int e = SoSendBytes((char*)buf->m_p, buf->m_size))
{
return e;
}

return 0;

}
 
int CSocket::SoRecvStream(DWORD& ucmd, DWORD& uerr, CStream* buf)
{
int size;
if (int e = SoRecvBytes((char*)&size, sizeof(size)))
{
return e;
}
 
DBG_ASSERT((size >= 0) && (size < 1024 * 1024), 0);
DBG_ASSERT((size >= 0) && (size < 1024 * 1024), 0);
DBG_ASSERT((size >= 0) && (size < 1024 * 1024), 0);

if (int e = SoRecvBytes((char*)&ucmd, sizeof(ucmd)))
{
return e;
}

if (int e = SoRecvBytes((char*)&uerr, sizeof(uerr)))
{
return e;
}
if (int e = buf->SetSize(size))
{
printf("%d\r\n",GetLastError()); // return e;
}

if (int e = SoRecvBytes((char*)buf->m_p, buf->m_size))
{
return e;
}

return 0;
}



服务端(Pascal):

unit UnitXSocket;

interface

uses Winapi.Windows, Winapi.Messages, WinSock, System.Types, System.SysUtils,
  System.Variants, System.SyncObjs, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, UnitXStream;

const
  WM_SOCKET = (WM_USER + 1);
  WM_SOCKET_ACCEPT = (WM_USER + 2);
  WM_SOCKET_RECV = (WM_USER + 3);
  WM_SOCKET_CLOSE = (WM_USER + 4);

type

  TXSock = class
  public
    constructor Create;
    destructor Destroy; override;

  public
    procedure OnAccept(err: integer); virtual;
    procedure OnRecv(err: integer); virtual;
    procedure OnSend(err: integer); virtual;
    procedure OnClose(err: integer); virtual;

  public
    procedure Close();

  public
    procedure SelectEvent();

  public
    Fs: TSocket;

  public
    FListR: TList;
    FListS: TList;

    FLock: TCriticalSection;

  private
    FEnableNotify: BOOLEAN;

  protected
    FRef: integer;

  end;

  TXSockWnd = class
  public
    constructor Create;
    destructor Destroy; override;
  public
    function Run(): integer;
    procedure SetNotifyWnd(wnd: Hwnd);
  private
    class function WndProc(wnd: Hwnd; uMsg: UINT; wPrm: wParam; lPrm: LPARAM)
      : LRESULT; stdcall; static;

  private
    procedure OnSocketEvent(s: TSocket; fd: dword; err: dword);
 

  public
    Fwnd: Hwnd;
    FwndNotify: Hwnd;
    FSockList: TList;

  end;

  TXSocketSrvr = class(TXSock)//基类
  public
    function SrvrCreate(ip: string; port: WORD): integer;
    function SrvrClose(): integer;

  public
    procedure OnAccept(err: integer); override;

  end;

  TXSocket = class(TXSock)//继承基类
  public
    constructor Create;
    destructor Destroy; override;

  public
    procedure Ref();
    procedure Unref();
    procedure SetPrbWnd(wnd: Hwnd; msg: UINT);
 

  public

    function SoSendStream(ucmd: integer; uerr: integer; buf: TXStream): integer;
    function SoRecvStream(var ucmd: integer; var uerr: integer;
      buf: TXStream): integer;

    function SoPeekStream(var ucmd: integer; var uerr: integer;
      buf: TXStream): integer;

  private
    function SoSend(buf: Pbyte; len: integer; var cb: integer): integer;
    function SoSendBytes(buf: Pbyte; len: integer): integer;
    function SoRecv(buf: Pbyte; len: integer; var cb: integer): integer;
    function SoRecvBytes(buf: Pbyte; len: integer): integer;

  public
    procedure OnRecv(err: integer); override;
    procedure OnSend(err: integer); override;
    procedure OnClose(err: integer); override;
 

  public

    Ffrm: Pointer; 
    FLastRecvTime: dword;

    Fmem: TXStream;

    Fwnd: Hwnd;
    Fmsg: UINT;

  end;

var
  SockWnd: TXSockWnd;

implementation

constructor TXSock.Create;
begin

  inherited;

  FListR := TList.Create;
  FListS := TList.Create;

  FLock := TCriticalSection.Create;

  FEnableNotify := TRUE;
  FRef := 0;

end;

destructor TXSock.Destroy;
begin

  FListR.Free;
  FListS.Free;

  FLock.Free;

  inherited;

end;

procedure TXSock.OnAccept(err: integer);
begin
end;

procedure TXSock.OnRecv(err: integer);
begin
end;

procedure TXSock.OnSend(err: integer);
begin
end;

procedure TXSock.OnClose(err: integer);
begin
end;

procedure TXSock.SelectEvent();
var
  fd: dword;

begin

  fd := FD_CLOSE;

  if FListS.Count > 0 then
    fd := fd or FD_WRITE;
  if FListR.Count < 4 then
    fd := fd or FD_READ;

  if Fs = INVALID_SOCKET then
    fd := 0;

  if 0 <> WSAAsyncSelect(Fs, SockWnd.Fwnd, WM_SOCKET, fd) then
  begin
  end;

end;

procedure TXSock.Close();
var
  i: integer;

begin

  if Fs = INVALID_SOCKET then
    Assert(False);

  if 0 <> WSAAsyncSelect(Fs, SockWnd.Fwnd, WM_SOCKET, 0) then
  begin
    Assert(False);
  end;

  SockWnd.FSockList.Remove(self);

  if 0 <> closesocket(Fs) then
    Assert(False);

  Fs := INVALID_SOCKET;

  for i := 0 to FListR.Count - 1 do
  begin
    TMemoryStream(FListR.Items[i]).Free;
  end;

  FListR.Clear;

  for i := 0 to FListS.Count - 1 do
  begin
    TMemoryStream(FListS.Items[i]).Free;
  end;

  FListS.Clear;

  if FRef >= 0 then // IS SERVER
  begin
    SendMessage(SockWnd.FwndNotify, WM_SOCKET_CLOSE, wParam(0), LPARAM(self));
  end;

  exit;

end;

constructor TXSockWnd.Create;
var
  wsad: WSADATA;
begin
  Fwnd := 0;

  FSockList := TList.Create;

  if 0 <> WSAStartup(MAkEWORD(1, 1), wsad) then
    Assert(False);

end;

destructor TXSockWnd.Destroy;
begin
  if 0 <> WSACleanup() then
    Assert(False);

  FSockList.Free;

end;

procedure TXSockWnd.SetNotifyWnd(wnd: Hwnd);
begin
  FwndNotify := wnd;
end;

function TXSockWnd.Run(): integer;
var
  wc: WNDCLASSA;
  msg: TMsg;
begin
  ZeroMemory(@wc, sizeof(wc));

  wc.lpfnWndProc := @WndProc;
  wc.lpszClassName := 'XSW';

  if 0 = RegisterClassA(wc) then
  begin
    RTL_GetErrMsg(GetLastError());
    Assert(False);

  end;
  Fwnd := CreateWindow('XSW', 'XSW', 0, 0, 0, 0, 0, 0, 0, 0, nil);

  if Fwnd = 0 then
    Assert(False);

  while GetMessage(msg, 0, 0, 0) do
  begin
    TranslateMessage(msg);
    DispatchMessage(msg);
  end;

  result := msg.wParam;

  exit;

end;

class function TXSockWnd.WndProc(wnd: Hwnd; uMsg: UINT; wPrm: wParam;
  lPrm: LPARAM): LRESULT;
begin

  case uMsg of
    WM_DESTROY:
      PostQuitMessage(0);
    WM_SOCKET:
      SockWnd.OnSocketEvent(TSocket(wPrm), WSAGETSELECTEVENT(lPrm),
        WSAGETSELECTERROR(lPrm));

  end;

  result := DefWindowProc(wnd, uMsg, wPrm, lPrm);

  exit;

end;

procedure TXSockWnd.OnSocketEvent(s: TSocket; fd: dword; err: dword);
var
  i: integer;
begin
  for i := 0 to FSockList.Count - 1 do
  begin
    if TXSock(FSockList.Items[i]).Fs = s then
    begin
      case fd of
        FD_ACCEPT:
          TXSock(FSockList.Items[i]).OnAccept(err);
        FD_READ:
          TXSock(FSockList.Items[i]).OnRecv(err);
        FD_WRITE:
          TXSock(FSockList.Items[i]).OnSend(err);
        FD_CLOSE:
          TXSock(FSockList.Items[i]).OnClose(err);
      else
        Assert(False);

      end;

      exit;

    end;
  end;

end;


function TXSocketSrvr.SrvrCreate(ip: string; port: WORD): integer;
var
  sai: TSockAddrIn;

begin
  FRef := -1;

  Fs := socket(AF_INET, SOCK_STREAM, 0);

  if Fs = INVALID_SOCKET then
    Assert(False);

  sai.sin_family := AF_INET;
  sai.sin_addr.S_addr := inet_addr(PAnsiChar(AnsiString(ip)));
  sai.sin_port := htons(port);

  if 0 <> bind(Fs, sai, sizeof(sai)) then
  begin
    result := WSAGetLastError();

    if 0 <> closesocket(Fs) then
      Assert(False);

    Fs := INVALID_SOCKET;

    exit;
  end;

  if 0 <> listen(Fs, 5) then
    Assert(False);

  SockWnd.FSockList.Add(self);

  if 0 <> WSAAsyncSelect(Fs, SockWnd.Fwnd, WM_SOCKET, FD_ACCEPT) then
  begin
    Assert(False);
  end;

  result := 0;

  exit;

end;

function TXSocketSrvr.SrvrClose(): integer;
begin

  Close();

  result := 0;

  exit;

end;

procedure TXSocketSrvr.OnAccept(err: integer);
var
  sa: TSocket;
  sai_peer: TSockAddrIn;
  sai_peer_len: integer;

  so: TXSocket;

begin
  if 0 <> err then
  begin
    exit;
  end;

  sai_peer_len := sizeof(sai_peer);

  sa := accept(Fs, @sai_peer, @sai_peer_len);

  if sa = INVALID_SOCKET then
  begin
    if WSAEWOULDBLOCK <> WSAGetLastError() then
      Assert(False);

    exit;

  end;

  so := TXSocket.Create;
  so.Fs := sa;

  SockWnd.FSockList.Add(so);

  {
    if 0 <> WSAAsyncSelect(sa, SockWnd.Fwnd, WM_SOCKET, FD_READ or FD_CLOSE) then
    begin
    Assert(False);
    end;
  }
  if SockWnd.FwndNotify = 0 then
    Assert(False);

  SendMessage(SockWnd.FwndNotify, WM_SOCKET_ACCEPT, wParam(0), LPARAM(so));

end;

procedure TXSocket.OnRecv(err: integer);
var

  cb, size: integer;

begin
  if 0 <> err then
  begin
    Close();

    exit;

  end;

  if Fmem = nil then
  begin
    Fmem := TXStream.Create;
    Fmem.SetSize(12);
  end;

  if Fmem.Position < 12 then
  begin

    cb := 12 - Fmem.Position;

    if Fwnd <> 0 then
      SendMessage(Fwnd, Fmsg, Fmem.Position, 12);

  end
  else
  begin
    size := LPDWORD(Fmem.Memory)^;

    cb := 12 + size;

    Fmem.SetSize(cb);

    cb := cb - Fmem.Position;

  end;

  if cb <= 0 then
    Assert(False);

  cb := recv(Fs, (Pbyte(Fmem.Memory) + Fmem.Position)^, cb, 0);

  if cb = 0 then
  begin
    Close();
  end
  else if cb < 0 then
  begin
    if WSAEWOULDBLOCK <> WSAGetLastError() then
    begin
      Close();
    end;

  end
  else
  begin

    Fmem.Position := Fmem.Position + cb;

    size := LPDWORD(Fmem.Memory)^;

    if Fwnd <> 0 then
      SendMessage(Fwnd, Fmsg, Fmem.Position, 12 + size);

    if Fmem.Position = 12 + size then
    begin

      Fmem := nil;

    end;


  end;

  SelectEvent();

  if FEnableNotify then
  begin
    SendMessage(SockWnd.FwndNotify, WM_SOCKET_RECV, wParam(0), LPARAM(self));
  end;

  exit;

end;

procedure TXSocket.OnSend(err: integer);
var
  mem: TMemoryStream;

  cb: integer;

begin
  if 0 <> err then
  begin
    Close();

    exit;

  end;

  if FListS.Count = 0 then
  begin
    SelectEvent();

    exit;

  end;

  mem := TMemoryStream(FListS.Items[0]);

  Assert(mem.Position < mem.size);
  Assert(mem.Position < mem.size);
  Assert(mem.Position < mem.size);

  if (Fwnd <> 0) and (mem.Position = 0) then
    SendMessage(Fwnd, Fmsg, 0, mem.size);

  cb := send(Fs, (Pbyte(mem.Memory) + mem.Position)^,
    mem.size - mem.Position, 0);

  if cb = 0 then
  begin
    Close();
  end
  else if cb < 0 then
  begin
    if WSAEWOULDBLOCK <> WSAGetLastError() then
    begin
      Close();
    end;

  end
  else
  begin
    if Fwnd <> 0 then
      SendMessage(Fwnd, Fmsg, mem.Position + cb, mem.size);

    if mem.Position + cb = mem.size then
    begin
      FLock.Enter;
      FListS.Remove(mem);
      FLock.Leave;

      mem.Free;

    end
    else
    begin
      mem.Seek(mem.Position + cb, soBeginning);

    end;

  end;

  SelectEvent();

end;

procedure TXSocket.OnClose(err: integer);
begin
  if err <> 0 then
  begin
  end;

  Close();

end;


function TXSocket.SoPeekStream(var ucmd: integer; var uerr: integer;
  buf: TXStream): integer;

begin
  result := WSAEWOULDBLOCK;

  if not FEnableNotify then
    exit;

  if FListR.Count = 0 then
    exit;

  result := SoRecvStream(ucmd, uerr, buf);

  FEnableNotify := False;

  exit;

end;

function TXSocket.SoRecv(buf: Pbyte; len: integer; var cb: integer): integer;
var
  mem: TMemoryStream;

begin
  while TRUE do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;

    if FListR.Count <> 0 then
      break;

    Sleep(1);

  end;

  mem := TMemoryStream(FListR.Items[0]);

  cb := mem.size - mem.Position;

  if cb > len then
    cb := len;

  CopyMemory(buf, Pbyte(mem.Memory) + mem.Position, cb);

  if mem.Position + cb = mem.size then
  begin
    FLock.Enter;
    FListR.Remove(mem);
    FLock.Leave;

    mem.Free;
  end
  else
  begin
    mem.Seek(mem.Position + cb, soBeginning);
  end;

  SelectEvent();

  result := 0;

  exit;

end;

function TXSocket.SoRecvBytes(buf: Pbyte; len: integer): integer;
var
  cb, off: integer;

begin
  off := 0;
  while off < len do
  begin

    result := SoRecv(buf + off, len - off, cb);
    if result <> 0 then
      exit;

    off := off + cb;
  end;
  result := 0;
end;

function TXSocket.SoRecvStream(var ucmd: integer; var uerr: integer;
  buf: TXStream): integer;
var
  mem: TXStream;
  ulen: dword;
begin

  while FListR.Count = 0 do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;
    Sleep(1);
  end;

  FLock.Enter;

  mem := TXStream(FListR.Items[0]);
  mem.Seek(0, soBeginning);

  mem.ReadData(ulen);
  mem.ReadData(ucmd);
  mem.ReadData(uerr);

  buf.SetSize(ulen);

  CopyMemory(buf.Memory, Pbyte(mem.Memory) + 12, ulen);

  buf.Seek(0, soBeginning);

  FListR.Remove(mem);

  FLock.Leave;

  mem.Free;

  SelectEvent();

  result := 0;

  exit;

  {
    result := SoRecvBytes(@size, sizeof(size));
    if result <> 0 then
    exit;
    if (size < 0) or (size >= 1024 * 1024) then
    begin
    Sleep(0);
    end;

    Assert((size >= 0) and (size <= 1024 * 1024 * 10));
    Assert((size >= 0) and (size <= 1024 * 1024 * 10));
    Assert((size >= 0) and (size <= 1024 * 1024 * 10));

    result := SoRecvBytes(@ucmd, sizeof(ucmd));
    if result <> 0 then
    exit;

    result := SoRecvBytes(@uerr, sizeof(uerr));
    if result <> 0 then
    exit;

    buf.SetSize(size);

    result := SoRecvBytes(Pbyte(buf.Memory), size);
    if result <> 0 then
    exit;

    buf.Seek(0, soBeginning);
  }

end;

constructor TXSocket.Create;
begin

  inherited;

  Ffrm := nil;
  FKeepalive := 10 * 1000;
  FKeepaliveTimeout := 15 * 1000;

  FLastRecvTime := GetTickCount();

  Fmem := nil;
  Fwnd := 0;
  Fmsg := 0;

end;

destructor TXSocket.Destroy;
begin

  inherited;

end;

procedure TXSocket.Ref();
begin
  if FRef < 0 then
    Assert(False);

  FRef := FRef + 1;

end;

procedure TXSocket.Unref();
begin
  if FRef < 1 then
    Assert(False);

  FRef := FRef - 1;

  if FRef = 0 then
  begin
    self.Free;
  end;

end;

procedure TXSocket.SetPrbWnd(wnd: Hwnd; msg: UINT);
begin
  Fwnd := wnd;
  Fmsg := msg;

end;


function TXSocket.SoSend(buf: Pbyte; len: integer; var cb: integer): integer;
var
  mem: TMemoryStream;
begin
  while TRUE do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;

    if FListR.Count < 4 then
      break;

    Sleep(1);

  end;

  cb := 4096;

  if cb > len then
    cb := len;

  mem := TMemoryStream.Create;
  mem.SetSize(cb);

  CopyMemory(mem.Memory, buf, cb);

  FLock.Enter;
  FListS.Add(mem);
  FLock.Leave;

  SelectEvent();

  result := 0;

  exit;

end;

function TXSocket.SoSendBytes(buf: Pbyte; len: integer): integer;
var
  cb, off: integer;
begin
  off := 0;

  while off < len do
  begin

    result := SoSend(buf + off, len - off, cb);
    if result <> 0 then
      exit;

    off := off + cb;

  end;

  result := 0;
end;

function TXSocket.SoSendStream(ucmd: integer; uerr: integer;
  buf: TXStream): integer;
var
  mem: TXStream;

begin
  mem := TXStream.Create;


  mem.WriteData(dword(buf.size));
  mem.WriteData(ucmd);
  mem.WriteData(uerr);
  mem.Write(buf.Memory^, buf.size);

  mem.Seek(0, soBeginning);

  while FListS.Count > 4 do
  begin
    if Fs = INVALID_SOCKET then
    begin
      result := WSAECONNRESET;
      exit;
    end;
    Sleep(1);
  end;

  FLock.Enter;
  FListS.Add(mem);
  FLock.Leave;

  SelectEvent();

  result := 0;

  exit;

end;

end.




[CTF入门培训]顶尖高校博士及硕士团队亲授《30小时教你玩转CTF》,视频+靶场+题目!助力进入CTF世界

最后于 2021-6-21 20:13 被PE_Hacker编辑 ,原因: 修改格式
收藏
点赞2
打赏
分享
最新回复 (1)
雪    币: 341
活跃值: (1166)
能力值: ( LV3,RANK:24 )
在线值:
发帖
回帖
粉丝
少妇之友 2021-6-21 23:08
2
1
其实select模型不能算是异步。MFC也有一个类似的,Async什么的,就是绑定窗口消息,OnRecv,OnSend。在windows下真正的异步处理应该是用IOCP实现的,Linux下是Epoll,UNIX下是kqueue
游客
登录 | 注册 方可回帖
返回