首页
社区
课程
招聘
[分享]protobuf搭配websocket实现rpc调用
发表于: 2024-3-8 13:44 3653

[分享]protobuf搭配websocket实现rpc调用

2024-3-8 13:44
3653
1
<br>

这个可以翻译到tcp服务器和客户端,我比较websocekt顺手

先安装protobuf和libhv

vcpkg install libhv[ssl]:x86-windows-static-md libhv[ssl]:x86-windows-static 

安装protubuf

vcpkg install protobuf:x86-windows-static-md  protobuf:x86-windows-static 


protobuf文件定义,做了2个RPC的请求,遍历服务器文件和获取服务器某个文件的校验值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
syntax = "proto3";
enum TYPE_protobuf_CodeType{//内容类型 枚举不能用下划线
    CodeType_getver = 0;
    CodeType_rpc = 1;
}
message _protobuf_CodeRequest{//请求内容
    TYPE_protobuf_CodeType cmd = 1;//
    repeated bytes params = 2;
    bytes methed = 3;//方法名 RPC才有效
    uint64 Rpcindex=4;//RPC编号可选
}
message _protobuf_CodeResult{//返回结果
    repeated bytes result = 1;
    uint64 Rpcindex=2;//RPC编号可选
    repeated bytes  messageFromServer = 3;//服务器返回的消息
}
message _protobuf_RPC_GetFilesByDir{
    bytes Dir=2;//枚举的目录
    bytes ExtensionName=3;//扩展名
    bool deep=4;//是否深度遍历
}

.h文件定义代码:可以不用,遇到没定义的从这里翻下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
namespace GServer{
    inline bool isHtmlFromDisk = false;
    inline int port_HuaWeiHttpServer = 9001;
    //版本
    inline string gVersion = "1.0.0";
    extern webview::webview* pWebview;
    extern void initEdge();
    extern void initHttpServer();
    inline bool isRun_WebsocketServer = false;
    extern void initWebsocketServer();
    extern void initGameWebsocketServer();
 
 
 
 
    inline std::mutex mutex_fileRpc;//文件rpc锁,防止磁盘扛不住,httpFilecache也用这个锁
    struct FilecacheData{
        std::string md5;
        std::string data;
    };
    extern std::unordered_map<std::string,FilecacheData> httpFilecache;
    extern void  clearFileCache();
 
 
    inline std::mutex mutex_edgeWs;//浏览器WS锁
    inline std::vector<WebSocketChannelPtr> clientWsClientSocket_edge;//因爲可能有多個瀏覽器連接,所以用vector
 
 
    inline std::mutex mutex_clinetWs;//客户端WS锁
    inline std::map<int,WebSocketChannelPtr> clinetWsPtrMap;//因爲可能有多個瀏覽器連接,所以用vector
 
}

cpp服务端的代码 ,一些变量没用到,我是贴的

1
2
3
4
5
6
7
8
9
10
namespace GServer{
    hv::WebSocketService _wsGameServer;
    websocket_server_t _wsGameServer_t;
    constexpr   DWORD port_GamewebSocket = 55552;
 
    static _protobuf_CodeResult  doWebSocketRpc(_protobuf_CodeRequest&,const WebSocketChannelPtr& channel);
    //Rpc分类
    static void  doGetDir(_protobuf_CodeRequest&,_protobuf_CodeResult& res);
    static void  doGetHash(_protobuf_CodeRequest&,_protobuf_CodeResult& res);
}

初始化服务端websocket代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void  GServer::initGameWebsocketServer(){
    hlog_disable();
    _wsGameServer.onopen = [] (const WebSocketChannelPtr& channel,const HttpRequestPtr& req){
        printf("onopen: GET %s id:%d\n",req->Path().c_str(),channel->id());
    };
    _wsGameServer.onclose = [] (const WebSocketChannelPtr& channel){
        printf("onclose:id:%d\n",channel->id());
 
 
        std::lock_guard<std::mutex> lok(GServer::mutex_clinetWs);
        for(auto it = clinetWsPtrMap.begin(); it != clinetWsPtrMap.end(); ++it){
            if(it->second->id() == channel->id()){
                GServer::removeClient(it->first);
                sendClientChanged();
                clinetWsPtrMap.erase(it);
                break;
            }
        }
    };
    _wsGameServer.onmessage = [] (const WebSocketChannelPtr& channel,std::string msg){
        _protobuf_CodeRequest req;
        req.ParseFromString(msg);
        safePrintfLine("onmessage:{}",req.DebugString());
        if (req.rpcindex()>0) {
            _protobuf_CodeResult res = doWebSocketRpc(req,channel);
            channel->send(res.SerializeAsString(),ws_opcode::WS_OPCODE_BINARY);
            return;
        }
         
        else{
            MySdk::ErrorOut("未定义的行为:{}",req.methed());
        }
    };
 
    _wsGameServer_t.port = port_GamewebSocket;
    _wsGameServer_t.ws = &_wsGameServer;
    websocket_server_run(&_wsGameServer_t,0);
 
}

实现rpc函数判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
_protobuf_CodeResult  GServer::doWebSocketRpc(_protobuf_CodeRequest&req,const WebSocketChannelPtr& channel){
    _protobuf_CodeResult res;
    res.set_rpcindex(req.rpcindex());
    safePrintfLine("doWebSocketRpc 方法:{}",req.methed());
    if (req.methed()=="GetDir"){
        doGetDir(req,res);
    }
    else if(req.methed() == "GetHash"){
        doGetHash(req,res);
    }
    else{
        MySdk::ErrorOut("未定义的行为Rpc methed:{}",req.methed());
    }
 
    //res.add_result().assign()
    return res;
 
}

实现具体的Rpc功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void GServer::doGetDir(_protobuf_CodeRequest& req,_protobuf_CodeResult&res){
    std::lock_guard<std::mutex> lock(mutex_fileRpc); // 使用lock_guard自动管理锁的生命周期
    if(req.params_size() != 1){
        MySdk::ErrorOut("GetDir 参数未写");
    }
    std::string dirCode = req.params(0);
    _protobuf_RPC_GetFilesByDir param;
    param.ParseFromString(dirCode);
    safePrintfLine("GetDir: 目录{} 扩展名:{}",param.dir(),param.extensionname());
    auto  arr = param.deep()==true? PathApi::getDirFilesEx(param.dir(),param.extensionname()):PathApi::getDirFiles(param.dir(),param.extensionname());
    for(auto& v : arr){
        res.add_result()->assign(v);
    }
}
 
void GServer::doGetHash(_protobuf_CodeRequest& req,_protobuf_CodeResult& res){
    std::lock_guard<std::mutex> lock(mutex_fileRpc); // 使用lock_guard自动管理锁的生命周期
    if(req.params_size() != 2){
        MySdk::ErrorOut("doGetHash 参数未写");
    }
    std::string dirCode = req.params(0);
    std::string hashType = req.params(1);
    if (!PathApi::Exist(dirCode)) {
        res.add_result()->assign("文件不存在");
        MySdk::ErrorOut("获取哈希值失败,文件不存在:{}",dirCode);
    }else{
        std::string  hashret = {};
        if(hashType == "tiger"){
            auto  str = MyFileApi::readBigFile(dirCode);
            hashret = SDK_Crppto::TigerHash(str);
        }
        else if(hashType == "md5"){
            if (GServer::httpFilecache.find(dirCode)!= GServer::httpFilecache.end()) {
                hashret = GServer::httpFilecache[dirCode].md5;
            }
            else{
                auto  str = MyFileApi::readBigFile(dirCode);
                hashret = SDK_Crppto::MD5Hash(str);
                //GServer::httpFilecache[dirCode] = { .md5=hashret,.data=str  };
 
            }
        }
        else if(hashType == "sha256"){
            auto  str = MyFileApi::readBigFile(dirCode);
            hashret = SDK_Crppto::sha256Hash(str);
        }else{
            MySdk::ErrorOut("未知的哈希类型:{}",hashType);
        }
        res.add_result()->assign(hashret);
    }
}

客户端代码:

头文件 删减下很多垃圾代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#pragma once
namespace gameSocket{
    class cmdhttp{
    public:
        static inline std::string cmd_getProtobufCmd = "/getProtobufCmd";
    };
    typedef std::function<void(std::string_view) > onOfflineCallback;//掉线回调
    extern void  setOnOfflineCallback(onOfflineCallback callback);
    extern inline void CheckGameVersion();
    /**
     * @brief 安全的post,如果失败会调用 onRestart
     * @param ph 路径,以本地路径显示,会自动处理 比如"./a.txt"
     */
    extern std::string safeDownLoad(std::string ph);
    extern hv::WebSocketClient wsClient;
    extern void initializationWebSocket();
    //接收websocket消息,对rpc的消息进行分流
    extern void  onRpcMessage(_protobuf_CodeResult& res);
 
    //调用远程过程的第二层封装,具体参数实现的函数
    /**
     * @brief 获取目录下的文件
     * @param dir 目录
     * @param extension 扩展名,如.dll,.exe,*则代表所有文件
     * @param deep 是否深度遍历,也就是是否遍历子目录
     * @return vector<std::string> 文件名
     *
     */
    extern  vector<std::string> call_GetDir(std::string_view dir,std::string_view extension,bool deep);
    /**
     * @brief 获取文件的hash值
     * @param file 文件路径
     * @param hashType hash类型,支持3个,注意小写:md5,tiger,sha256
     */
    extern  std::string call_GetHash(std::string_view file,std::string_view hashType);
    extern  void clearServerFileCache();
 
}

cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
namespace gameSocket{
 
    typedef std::shared_ptr<_protobuf_CodeRequest>  RequestPtr;
    typedef std::shared_ptr<_protobuf_CodeResult> ResponsePtr;
 
    std::mutex              g_ctxs_mutex;
    //一个类,保存上下文
    class ProtoRpcContext{
    public:
        RequestPtr  req;
        ResponsePtr   res;
    private:
        std::mutex              _mutex;
        std::condition_variable _cond;
 
    public:
 
        void notify(){
            _cond.notify_one();
        }
 
        bool wait(int timeout_ms){
            std::unique_lock<std::mutex> locker(_mutex);
            return _cond.wait_for(locker,std::chrono::milliseconds(timeout_ms)) != std::cv_status::timeout;
        }
    };
    typedef std::shared_ptr<ProtoRpcContext> ProtoRpcContextPtr;
 
 
    //当前正在远程过程调用的上下文
    static std::map<eg::uint64,ProtoRpcContextPtr> g_ctxs;
    /**
     * @brief 调用远程过程的第一层封装,超时返回空指针
     */
    static ResponsePtr callRpc(const RequestPtr& req,int timeoutMs = 10 * 1000);
}
1
gameSocket::initGameWebsocketServer();//启动

cpp中实现rpc具体功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
void  gameSocket::onRpcMessage(_protobuf_CodeResult& res){
    std::lock_guard<std::mutex> lock(g_ctxs_mutex); // 使用lock_guard自动管理锁的生命周期
    auto it = g_ctxs.find(res.rpcindex());
    if(it != g_ctxs.end()){
        it->second->res = std::make_shared<_protobuf_CodeResult>(res);
        it->second->notify();
        return;
    }
}
 
gameSocket::ResponsePtr gameSocket::callRpc(const RequestPtr& req,int timeoutMs){
    static std::atomic<uint64_t> s_id = ATOMIC_VAR_INIT(1);
    req->set_rpcindex(++s_id);
    auto ctx = std::make_shared<gameSocket::ProtoRpcContext>();
    ctx->req = req;
    {
        std::lock_guard<std::mutex> lock(g_ctxs_mutex); 
        g_ctxs[req->rpcindex()] = ctx;
    }
 
    std::string messageReq = req->SerializeAsString();
    safePrintfLine("callRpc debugString:{}",req->DebugString());
    gameSocket::wsClient.send(messageReq);
    if (!ctx->wait(timeoutMs)) {
        //AfxMessageBox("大概超时了");
    }
    //可能超时
    ResponsePtr res = ctx->res;
 
    std::lock_guard<std::mutex> lock(g_ctxs_mutex); 
    eg::uint64 rpcindex = req->rpcindex();
    gameSocket::g_ctxs.erase(rpcindex);
 
    return res;
}
 
vector<std::string> gameSocket::call_GetDir(std::string_view dir,std::string_view extension,bool deep){
    try{
        _protobuf_CodeRequest req;
        req.set_cmd(TYPE_protobuf_CodeType::CodeType_rpc);
        req.set_methed("GetDir");
 
        _protobuf_RPC_GetFilesByDir params;
        params.set_dir(dir.data());
        params.set_extensionname(extension.data());
        params.set_deep(deep);
        //参数1,直接传入_protobuf_RPC_GetFilesByDir的序列化文本得了
 
        req.add_params()->assign(params.SerializeAsString());
        RequestPtr reqPtr = std::make_shared<_protobuf_CodeRequest>(req);
        auto s = callRpc(reqPtr,10*1000);
        if(!s){
            return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
        }
        std::vector<std::string> result;
        for(int i = 0; i < s->result_size(); i++){
            result.emplace_back(s->result(i));
        }
        return result;
    catch(std::exception& e){
        MySdk::ErrorOutExit("call_GetDir Error:{}",e.what());
    }
    return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
}
std::string gameSocket::call_GetHash(std::string_view file,std::string_view hashType){
    try{
        if (hashType!="md5"&&hashType!="tiger"&&hashType!="sha256"){
            MySdk::ErrorOutExit("call_GetHash 获取服务器哈希值 方法错误:{}",hashType);
        }
        _protobuf_CodeRequest req;
        req.set_cmd(TYPE_protobuf_CodeType::CodeType_rpc);
        req.set_methed("GetHash");
 
        //参数1,直接传入_protobuf_RPC_GetFilesByDir的序列化文本得了
 
        req.add_params()->assign(file);
        req.add_params()->assign(hashType);
        RequestPtr reqPtr = std::make_shared<_protobuf_CodeRequest>(req);
        auto s = callRpc(reqPtr,10 * 1000);
        if(!s){
            return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
        }
         
        if (s->result_size()==0) {
            return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
        }
 
        return s->result(0);
    catch(std::exception& e){
        MySdk::ErrorOutExit("GetHash Error:{}",e.what());
    }
    return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
}

启动客户端websocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void gameSocket::initializationWebSocket(){
    std::call_once(gameSocket::flagWebSOcket,[] (){
        try{
            auto [ip,_] = GameConfigs::getHttpServerIp();
            std::string  s_websocket_url = "ws://" + ip + ":" + std::to_string(GameConfigs::port_webSocket);
            gameSocket::wsClient.onopen = [&] (){
                isWsLinked = true;
                printf("onopen\n");
            };
            gameSocket::wsClient.onclose = [] (){
                printf("onclose\n");//设置断线回调,可有可无
                if(isWsLinked&&onOfflineCallbackFunc){
                    (*onOfflineCallbackFunc)("onclose");
                }
            };
            gameSocket::wsClient.onmessage = [] (const std::string& msg){
                //printf("客户端 onmessage: %s\n",msg.c_str());
                Try_Begin();
                _protobuf_CodeResult res;
                res.ParseFromString(msg);
                if(res.rpcindex() > 0){
                    gameSocket::onRpcMessage(res);
                    return;
                }
                else if(res.messagefromserver().size()<= 0){
                    MySdk::ErrorOut("服务器消息尺寸为0 消息是:{}",msg);
                }
                //其他非rpc的回应
                Try_End();
            };
            gameSocket::wsClient.onConnection = [] (const hv::SocketChannelPtr&){
                printf("onConnection\n");
            };
                        //设置断线自动重连和心跳包
            reconn_setting_t reconn;
            reconn.min_delay = 1000;
            reconn.max_delay = 2000;
            reconn.delay_policy = 2;
            gameSocket::wsClient.setReconnect(&reconn);
            gameSocket::wsClient.setPingInterval(1000 * 10);
            gameSocket::wsClient.open(s_websocket_url.c_str());
            gameSocket::wsClient.setConnectTimeout(10*1000);
            while(true){
                if(isWsLinked){
                    break;
                }
                else{
                    printf("!正在连接服务器.... \n");
                }
                Sleep(1000);
            }
 
            std::string ret = call_addClient();
            if (ret!="ok"){
                MySdk::ErrorOutExit("连接服务器未知错误:{}",ret);
            }
            addClient("号刚启动,没上线");
 
        catch(std::exception& e){
            MySdk::ErrorOutExit("initializationWebSocket Error:{}",e.what());
        }
    });
    setClientLog("ws服务器初始化完成");
}
1
gameSocket::initializationWebSocket();启动



附上编译protobuf的py脚本,调用cmd还是麻烦

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import subprocess,shutil
commad = "protoc --cpp_out=.  ./HttpCode.proto"
#commadJs = "protoc --js_out=.  ./HttpCode.proto"
result = subprocess.call(commad, shell=True)
print("生成完成")
 
source_files = ["HttpCode.pb.cc""HttpCode.pb.h"]
# 拷贝到目标目录
target_directoryClient = "Q:\\项目源码\\MyFrameWork\\xxxClient\\"  # 客户端目录
target_directoryServer = "Q:\\项目源码\\MyFrameWork\\xxxServer\\"  # 服务端目录
 
print("开始拷贝")
# 遍历要拷贝的文件执行操作
for file in source_files:
    shutil.copy(file, target_directoryClient)
    shutil.copy(file, target_directoryServer)
print("protobuf文件生成完成,已拷贝到目标目录")
#result2 = subprocess.call(commadJs, shell=True)

其他有用到的函数参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#define FPrintf fmt::print
#define FFormat fmt::format
template <typename... ARG>
inline std::string safeFmt(const char* fmt,ARG... args){
    try{
        string re = FFormat(fmt,args...);
        return re;
    catch(fmt::v9::format_error& e){
        std::string errorCode = FFormat("safeFmt format_error:{} Fmt Char*= \"{}\"",e.what(),fmt);
        ::MessageBox(NULL,errorCode.c_str(),"提示:",MB_OK | MB_ICONERROR | MB_SYSTEMMODAL);
    }
    return "null";
}
template <typename... ARG>
void ErrorOut(const char* fmt,ARG... args){
    string s = safeFmt(fmt,args...);
    ::MessageBox(NULL,s.c_str(),"提示",
                 MB_OK | MB_ICONERROR | MB_SYSTEMMODAL);
}
template <typename... ARG>
void ErrorOutExit(const char* fmt,ARG... args){
    ErrorOut(fmt,args...);
    ::ExitProcess(0); // exit会调用析构函数  ExitProcess是立即结束
}
#define Constu8(method) (const char*)u8##method
#define  Try_Begin() try{
#define  Try_End() } catch(const std::exception& e){SPDLOG_DEBUG("Error:{}",e.what());MySdk::ErrorOutExit("Error:\n\t函数:<{}><{}>\n\t文件:{}\n\t行:{}\n\t异常信息{}\n\t详情可看调试输出",__FUNCTION__,__func__,__FILE__,__LINE__, e.what());}



[注意]看雪招聘,专注安全领域的专业人才平台!

收藏
免费
支持
分享
最新回复 (2)
雪    币: 10
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
2
感谢分享
2024-3-8 14:02
0
雪    币: 4723
活跃值: (31636)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
3
感谢分享
2024-3-10 16:11
1
游客
登录 | 注册 方可回帖
返回

账号登录
验证码登录

忘记密码?
没有账号?立即免费注册