首页
社区
课程
招聘
[分享]protobuf搭配websocket实现rpc调用
发表于: 2024-3-8 13:44 3318

[分享]protobuf搭配websocket实现rpc调用

2024-3-8 13:44
3318

这个可以翻译到tcp服务器和客户端,我比较websocekt顺手

先安装protobuf和libhv

vcpkg install libhv[ssl]:x86-windows-static-md libhv[ssl]:x86-windows-static 

安装protubuf

vcpkg install protobuf:x86-windows-static-md  protobuf:x86-windows-static 


protobuf文件定义,做了2个RPC的请求,遍历服务器文件和获取服务器某个文件的校验值

syntax = "proto3";
enum TYPE_protobuf_CodeType{//内容类型 枚举不能用下划线
    CodeType_getver = 0;
    CodeType_rpc = 1;
}
message _protobuf_CodeRequest{//请求内容
    TYPE_protobuf_CodeType cmd = 1;//
    repeated bytes params = 2;
    bytes methed = 3;//方法名 RPC才有效
    uint64 Rpcindex=4;//RPC编号可选
}
message _protobuf_CodeResult{//返回结果
    repeated bytes result = 1;
    uint64 Rpcindex=2;//RPC编号可选
    repeated bytes  messageFromServer = 3;//服务器返回的消息
}
message _protobuf_RPC_GetFilesByDir{
    bytes Dir=2;//枚举的目录
    bytes ExtensionName=3;//扩展名
    bool deep=4;//是否深度遍历
}

.h文件定义代码:可以不用,遇到没定义的从这里翻下

namespace GServer{
	inline bool isHtmlFromDisk = false;
	inline int port_HuaWeiHttpServer = 9001;
	//版本
	inline string gVersion = "1.0.0";
	extern webview::webview* pWebview;
	extern void initEdge();
	extern void initHttpServer();
	inline bool isRun_WebsocketServer = false;
	extern void initWebsocketServer();
	extern void initGameWebsocketServer();




	inline std::mutex mutex_fileRpc;//文件rpc锁,防止磁盘扛不住,httpFilecache也用这个锁
	struct FilecacheData{
		std::string md5;
		std::string data;
	};
	extern std::unordered_map<std::string,FilecacheData> httpFilecache;
	extern void  clearFileCache();


	inline std::mutex mutex_edgeWs;//浏览器WS锁
	inline std::vector<WebSocketChannelPtr> clientWsClientSocket_edge;//因爲可能有多個瀏覽器連接,所以用vector


	inline std::mutex mutex_clinetWs;//客户端WS锁
	inline std::map<int,WebSocketChannelPtr> clinetWsPtrMap;//因爲可能有多個瀏覽器連接,所以用vector

}

cpp服务端的代码 ,一些变量没用到,我是贴的

namespace GServer{
	hv::WebSocketService _wsGameServer;
	websocket_server_t _wsGameServer_t;
	constexpr	DWORD port_GamewebSocket = 55552;

	static _protobuf_CodeResult  doWebSocketRpc(_protobuf_CodeRequest&,const WebSocketChannelPtr& channel);
	//Rpc分类
	static void  doGetDir(_protobuf_CodeRequest&,_protobuf_CodeResult& res);
	static void  doGetHash(_protobuf_CodeRequest&,_protobuf_CodeResult& res);
}

初始化服务端websocket代码

void  GServer::initGameWebsocketServer(){
	hlog_disable();
	_wsGameServer.onopen = [] (const WebSocketChannelPtr& channel,const HttpRequestPtr& req){
		printf("onopen: GET %s id:%d\n",req->Path().c_str(),channel->id());
	};
	_wsGameServer.onclose = [] (const WebSocketChannelPtr& channel){
		printf("onclose:id:%d\n",channel->id());


		std::lock_guard<std::mutex> lok(GServer::mutex_clinetWs);
		for(auto it = clinetWsPtrMap.begin(); it != clinetWsPtrMap.end(); ++it){
			if(it->second->id() == channel->id()){
				GServer::removeClient(it->first);
				sendClientChanged();
				clinetWsPtrMap.erase(it);
				break;
			}
		}
	};
	_wsGameServer.onmessage = [] (const WebSocketChannelPtr& channel,std::string msg){
		_protobuf_CodeRequest req;
		req.ParseFromString(msg);
		safePrintfLine("onmessage:{}",req.DebugString());
		if (req.rpcindex()>0) {
			_protobuf_CodeResult res = doWebSocketRpc(req,channel);
			channel->send(res.SerializeAsString(),ws_opcode::WS_OPCODE_BINARY);
			return;
		}
		
		else{
			MySdk::ErrorOut("未定义的行为:{}",req.methed());
		}
	};

	_wsGameServer_t.port = port_GamewebSocket;
	_wsGameServer_t.ws = &_wsGameServer;
	websocket_server_run(&_wsGameServer_t,0);

}

实现rpc函数判断

_protobuf_CodeResult  GServer::doWebSocketRpc(_protobuf_CodeRequest&req,const WebSocketChannelPtr& channel){
	_protobuf_CodeResult res;
	res.set_rpcindex(req.rpcindex());
	safePrintfLine("doWebSocketRpc 方法:{}",req.methed());
	if (req.methed()=="GetDir"){
		doGetDir(req,res);
	}
	else if(req.methed() == "GetHash"){
		doGetHash(req,res);
	}
	else{
		MySdk::ErrorOut("未定义的行为Rpc methed:{}",req.methed());
	}

	//res.add_result().assign()
	return res;

}

实现具体的Rpc功能

void GServer::doGetDir(_protobuf_CodeRequest& req,_protobuf_CodeResult&res){
	std::lock_guard<std::mutex> lock(mutex_fileRpc); // 使用lock_guard自动管理锁的生命周期
	if(req.params_size() != 1){
		MySdk::ErrorOut("GetDir 参数未写");
	}
	std::string dirCode = req.params(0);
	_protobuf_RPC_GetFilesByDir param;
	param.ParseFromString(dirCode);
	safePrintfLine("GetDir: 目录{} 扩展名:{}",param.dir(),param.extensionname());
	auto  arr = param.deep()==true? PathApi::getDirFilesEx(param.dir(),param.extensionname()):PathApi::getDirFiles(param.dir(),param.extensionname());
	for(auto& v : arr){
		res.add_result()->assign(v);
	}
}

void GServer::doGetHash(_protobuf_CodeRequest& req,_protobuf_CodeResult& res){
	std::lock_guard<std::mutex> lock(mutex_fileRpc); // 使用lock_guard自动管理锁的生命周期
	if(req.params_size() != 2){
		MySdk::ErrorOut("doGetHash 参数未写");
	}
	std::string dirCode = req.params(0);
	std::string hashType = req.params(1);
	if (!PathApi::Exist(dirCode)) {
		res.add_result()->assign("文件不存在");
		MySdk::ErrorOut("获取哈希值失败,文件不存在:{}",dirCode);
	}else{
		std::string  hashret = {};
		if(hashType == "tiger"){
			auto  str = MyFileApi::readBigFile(dirCode);
			hashret = SDK_Crppto::TigerHash(str);
		}
		else if(hashType == "md5"){
			if (GServer::httpFilecache.find(dirCode)!= GServer::httpFilecache.end()) {
				hashret = GServer::httpFilecache[dirCode].md5;
			}
			else{
				auto  str = MyFileApi::readBigFile(dirCode);
				hashret = SDK_Crppto::MD5Hash(str);
				//GServer::httpFilecache[dirCode] = { .md5=hashret,.data=str  };

			}
		}
		else if(hashType == "sha256"){
			auto  str = MyFileApi::readBigFile(dirCode);
			hashret = SDK_Crppto::sha256Hash(str);
		}else{
			MySdk::ErrorOut("未知的哈希类型:{}",hashType);
		}
		res.add_result()->assign(hashret);
	}
}

客户端代码:

头文件 删减下很多垃圾代码:

#pragma once
namespace gameSocket{
	class cmdhttp{
	public:
		static inline std::string cmd_getProtobufCmd = "/getProtobufCmd";
	};
	typedef std::function<void(std::string_view) > onOfflineCallback;//掉线回调
	extern void  setOnOfflineCallback(onOfflineCallback callback);
	extern inline void CheckGameVersion();
	/**
	 * @brief 安全的post,如果失败会调用 onRestart
	 * @param ph 路径,以本地路径显示,会自动处理 比如"./a.txt"
	 */
	extern std::string safeDownLoad(std::string ph);
	extern hv::WebSocketClient wsClient;
	extern void initializationWebSocket();
	//接收websocket消息,对rpc的消息进行分流
	extern void  onRpcMessage(_protobuf_CodeResult& res);

	//调用远程过程的第二层封装,具体参数实现的函数
	/**
	 * @brief 获取目录下的文件
	 * @param dir 目录
	 * @param extension 扩展名,如.dll,.exe,*则代表所有文件
	 * @param deep 是否深度遍历,也就是是否遍历子目录
	 * @return vector<std::string> 文件名
	 *
	 */
	extern  vector<std::string> call_GetDir(std::string_view dir,std::string_view extension,bool deep);
	/**
	 * @brief 获取文件的hash值
	 * @param file 文件路径
	 * @param hashType hash类型,支持3个,注意小写:md5,tiger,sha256
	 */
	extern  std::string call_GetHash(std::string_view file,std::string_view hashType);
	extern  void clearServerFileCache();

}

cpp

namespace gameSocket{

	typedef std::shared_ptr<_protobuf_CodeRequest>  RequestPtr;
	typedef std::shared_ptr<_protobuf_CodeResult> ResponsePtr;

	std::mutex              g_ctxs_mutex;
	//一个类,保存上下文
	class ProtoRpcContext{
	public:
		RequestPtr  req;
		ResponsePtr   res;
	private:
		std::mutex              _mutex;
		std::condition_variable _cond;

	public:

		void notify(){
			_cond.notify_one();
		}

		bool wait(int timeout_ms){
			std::unique_lock<std::mutex> locker(_mutex);
			return _cond.wait_for(locker,std::chrono::milliseconds(timeout_ms)) != std::cv_status::timeout;
		}
	};
	typedef std::shared_ptr<ProtoRpcContext> ProtoRpcContextPtr;


	//当前正在远程过程调用的上下文
	static std::map<eg::uint64,ProtoRpcContextPtr> g_ctxs;
	/**
	 * @brief 调用远程过程的第一层封装,超时返回空指针
	 */
	static ResponsePtr callRpc(const RequestPtr& req,int timeoutMs = 10 * 1000);
}
gameSocket::initGameWebsocketServer();//启动

cpp中实现rpc具体功能:

void  gameSocket::onRpcMessage(_protobuf_CodeResult& res){
	std::lock_guard<std::mutex> lock(g_ctxs_mutex); // 使用lock_guard自动管理锁的生命周期
	auto it = g_ctxs.find(res.rpcindex());
	if(it != g_ctxs.end()){
		it->second->res = std::make_shared<_protobuf_CodeResult>(res);
		it->second->notify();
		return;
	}
}

gameSocket::ResponsePtr gameSocket::callRpc(const RequestPtr& req,int timeoutMs){
	static std::atomic<uint64_t> s_id = ATOMIC_VAR_INIT(1);
	req->set_rpcindex(++s_id);
	auto ctx = std::make_shared<gameSocket::ProtoRpcContext>();
	ctx->req = req;
	{
		std::lock_guard<std::mutex> lock(g_ctxs_mutex); 
		g_ctxs[req->rpcindex()] = ctx;
	}

	std::string messageReq = req->SerializeAsString();
	safePrintfLine("callRpc debugString:{}",req->DebugString());
	gameSocket::wsClient.send(messageReq);
	if (!ctx->wait(timeoutMs)) {
		//AfxMessageBox("大概超时了");
	}
	//可能超时
	ResponsePtr res = ctx->res;

	std::lock_guard<std::mutex> lock(g_ctxs_mutex); 
	eg::uint64 rpcindex = req->rpcindex();
	gameSocket::g_ctxs.erase(rpcindex);

	return res;
}

vector<std::string> gameSocket::call_GetDir(std::string_view dir,std::string_view extension,bool deep){
	try{
		_protobuf_CodeRequest req;
		req.set_cmd(TYPE_protobuf_CodeType::CodeType_rpc);
		req.set_methed("GetDir");

		_protobuf_RPC_GetFilesByDir params;
		params.set_dir(dir.data());
		params.set_extensionname(extension.data());
		params.set_deep(deep);
		//参数1,直接传入_protobuf_RPC_GetFilesByDir的序列化文本得了

		req.add_params()->assign(params.SerializeAsString());
		RequestPtr reqPtr = std::make_shared<_protobuf_CodeRequest>(req);
		auto s = callRpc(reqPtr,10*1000);
		if(!s){
			return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
		}
		std::vector<std::string> result;
		for(int i = 0; i < s->result_size(); i++){
			result.emplace_back(s->result(i));
		}
		return result;
	} catch(std::exception& e){
		MySdk::ErrorOutExit("call_GetDir Error:{}",e.what());
	}
	return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
}
std::string gameSocket::call_GetHash(std::string_view file,std::string_view hashType){
	try{
		if (hashType!="md5"&&hashType!="tiger"&&hashType!="sha256"){
			MySdk::ErrorOutExit("call_GetHash 获取服务器哈希值 方法错误:{}",hashType);
		}
		_protobuf_CodeRequest req;
		req.set_cmd(TYPE_protobuf_CodeType::CodeType_rpc);
		req.set_methed("GetHash");

		//参数1,直接传入_protobuf_RPC_GetFilesByDir的序列化文本得了

		req.add_params()->assign(file);
		req.add_params()->assign(hashType);
		RequestPtr reqPtr = std::make_shared<_protobuf_CodeRequest>(req);
		auto s = callRpc(reqPtr,10 * 1000);
		if(!s){
			return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
		}
		
		if (s->result_size()==0) {
			return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
		}

		return s->result(0);
	} catch(std::exception& e){
		MySdk::ErrorOutExit("GetHash Error:{}",e.what());
	}
	return {};//超时也要返回空数组 不返回的话编译器不报错但是会崩溃
}

启动客户端websocket

void gameSocket::initializationWebSocket(){
	std::call_once(gameSocket::flagWebSOcket,[] (){
		try{
			auto [ip,_] = GameConfigs::getHttpServerIp();
			std::string  s_websocket_url = "ws://" + ip + ":" + std::to_string(GameConfigs::port_webSocket);
			gameSocket::wsClient.onopen = [&] (){
				isWsLinked = true;
				printf("onopen\n");
			};
			gameSocket::wsClient.onclose = [] (){
				printf("onclose\n");//设置断线回调,可有可无
				if(isWsLinked&&onOfflineCallbackFunc){
					(*onOfflineCallbackFunc)("onclose");
				}
			};
			gameSocket::wsClient.onmessage = [] (const std::string& msg){
				//printf("客户端 onmessage: %s\n",msg.c_str());
				Try_Begin();
				_protobuf_CodeResult res;
				res.ParseFromString(msg);
				if(res.rpcindex() > 0){
					gameSocket::onRpcMessage(res);
					return;
				}
				else if(res.messagefromserver().size()<= 0){
					MySdk::ErrorOut("服务器消息尺寸为0 消息是:{}",msg);
				}
				//其他非rpc的回应
				Try_End();
			};
			gameSocket::wsClient.onConnection = [] (const hv::SocketChannelPtr&){
				printf("onConnection\n");
			};
                        //设置断线自动重连和心跳包
			reconn_setting_t reconn;
			reconn.min_delay = 1000;
			reconn.max_delay = 2000;
			reconn.delay_policy = 2;
			gameSocket::wsClient.setReconnect(&reconn);
			gameSocket::wsClient.setPingInterval(1000 * 10);
			gameSocket::wsClient.open(s_websocket_url.c_str());
			gameSocket::wsClient.setConnectTimeout(10*1000);
			while(true){
				if(isWsLinked){
					break;
				}
				else{
					printf("!正在连接服务器.... \n");
				}
				Sleep(1000);
			}

			std::string ret = call_addClient();
			if (ret!="ok"){
				MySdk::ErrorOutExit("连接服务器未知错误:{}",ret);
			}
			addClient("号刚启动,没上线");

		} catch(std::exception& e){
			MySdk::ErrorOutExit("initializationWebSocket Error:{}",e.what());
		}
	});
	setClientLog("ws服务器初始化完成");
}
gameSocket::initializationWebSocket();启动



附上编译protobuf的py脚本,调用cmd还是麻烦

import subprocess,shutil
commad = "protoc --cpp_out=.  ./HttpCode.proto"
#commadJs = "protoc --js_out=.  ./HttpCode.proto"
result = subprocess.call(commad, shell=True)
print("生成完成")

source_files = ["HttpCode.pb.cc", "HttpCode.pb.h"]
# 拷贝到目标目录
target_directoryClient = "Q:\\项目源码\\MyFrameWork\\xxxClient\\"  # 客户端目录
target_directoryServer = "Q:\\项目源码\\MyFrameWork\\xxxServer\\"  # 服务端目录

print("开始拷贝")
# 遍历要拷贝的文件执行操作
for file in source_files:
    shutil.copy(file, target_directoryClient)
    shutil.copy(file, target_directoryServer)
print("protobuf文件生成完成,已拷贝到目标目录")
#result2 = subprocess.call(commadJs, shell=True)

其他有用到的函数参考

#define FPrintf fmt::print
#define FFormat fmt::format
template <typename... ARG>
inline std::string safeFmt(const char* fmt,ARG... args){
	try{
		string re = FFormat(fmt,args...);
		return re;
	} catch(fmt::v9::format_error& e){
		std::string errorCode = FFormat("safeFmt format_error:{} Fmt Char*= \"{}\"",e.what(),fmt);
		::MessageBox(NULL,errorCode.c_str(),"提示:",MB_OK | MB_ICONERROR | MB_SYSTEMMODAL);
	}
	return "null";
}
template <typename... ARG>
void ErrorOut(const char* fmt,ARG... args){
	string s = safeFmt(fmt,args...);
	::MessageBox(NULL,s.c_str(),"提示",
				 MB_OK | MB_ICONERROR | MB_SYSTEMMODAL);
}
template <typename... ARG>
void ErrorOutExit(const char* fmt,ARG... args){
	ErrorOut(fmt,args...);
	::ExitProcess(0); // exit会调用析构函数  ExitProcess是立即结束
}
#define Constu8(method) (const char*)u8##method
#define  Try_Begin() try{
#define  Try_End() } catch(const std::exception& e){SPDLOG_DEBUG("Error:{}",e.what());MySdk::ErrorOutExit("Error:\n\t函数:<{}><{}>\n\t文件:{}\n\t行:{}\n\t异常信息{}\n\t详情可看调试输出",__FUNCTION__,__func__,__FILE__,__LINE__, e.what());}



[培训]内核驱动高级班,冲击BAT一流互联网大厂工作,每周日13:00-18:00直播授课

收藏
免费 0
支持
分享
最新回复 (2)
雪    币: 10
能力值: ( LV1,RANK:0 )
在线值:
发帖
回帖
粉丝
2
感谢分享
2024-3-8 14:02
0
雪    币: 3573
活跃值: (31026)
能力值: ( LV2,RANK:10 )
在线值:
发帖
回帖
粉丝
3
感谢分享
2024-3-10 16:11
1
游客
登录 | 注册 方可回帖
返回
//