当前位置: 首页> 房产> 建材 > 怎样下载软件到电脑桌面上_电商平台如何引流推广_ip切换工具_信息推广

怎样下载软件到电脑桌面上_电商平台如何引流推广_ip切换工具_信息推广

时间:2025/7/11 15:37:15来源:https://blog.csdn.net/2301_77954967/article/details/146997545 浏览次数:0次
怎样下载软件到电脑桌面上_电商平台如何引流推广_ip切换工具_信息推广

📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨

在这里插入图片描述

在这里插入图片描述

文章目录

  • 📢前言
  • 🏳️‍🌈一、rpc_registry 注册模块
    • 1.1 rpc_registry 框架逻辑分析
      • 1.1.1 Provider类
      • 1.1.2 MethodHost类
      • 1.1.3 Discoverer类
    • 1.2 Provider类
    • 1.3 MethodHost 类
    • 1.4 Discoverer 类
  • 🏳️‍🌈二、rpc_topic 主题模块
    • 2.1 逻辑框架
    • 2.2 核心功能
    • 2.3 接口方法详解
      • 2.3.1 主题管理
      • 2.3.2 订阅管理
      • 2.3.3 消息发布
      • 2.3.4 消息处理
    • 2.4 核心逻辑:commonRequest 方法
      • 2.4.1 ​构造请求:
    • 2.4.2 发送请求
      • 2.4.3 处理响应:
      • 2.4.4 实现代码
    • 2.5 整体代码
  • 🏳️‍🌈三、rpc_client 客户端整合
    • 3.1 整体架构
    • 3.2 核心类详解
      • 3.2.1 RegistryClient(服务注册客户端)​
      • 3.2.2DiscoveryClient(服务发现客户端)​
      • 3.2.3RpcClient(RPC调用客户端)​
      • 3.2.4TopicClient(主题客户端)​
    • 3.3 协作逻辑与设计思路
      • 3.3.1 消息分发(Dispatcher)​
      • 3.3.2 请求处理(Requestor)​
      • 3.3.3 服务发现与连接池(RpcClient)​
      • 3.3.4 主题管理(TopicManager)​
  • 🏳️‍🌈四、主题发布订阅流程示意
    • 4.1 核心文件与类说明
    • 4.2 步骤 1:客户端A创建主题
    • 4.3 步骤 2:客户端B订阅主题
    • 4.4 步骤 3:客户端A发布消息
    • 4.5 步骤 4:服务端广播消息
    • 4.6 步骤 5:客户端B接收消息
    • 4.7 关键类交互图
    • 4.8 补充说明
  • 🏳️‍🌈五、运程调用add方法流程示意
    • 5.1 核心文件与类说明
    • 5.2 步骤 1:服务端启动并注册 add 方法
    • 5.3 步骤 2:客户端构造调用请求
    • 5.4 步骤3:服务端接收请求并路由到 add 方法
    • 5.5 步骤4:网络层数据传输
    • 5.6 关键类交互图
    • 5.7 补充说明
  • 👥总结


📢前言

到目前为止,我们已经将整个 rpc项目 的基本完成了,就只剩下客户端的部分功能了

这篇文章将带大家实现一些客户端的最后三个模块

  1. 客户端注册模块
  2. 客户端主题模块
  3. 客户端整合

本系列博客的代码都在上述库中的下图(demo代表笔者模仿的案例)
在这里插入图片描述


🏳️‍🌈一、rpc_registry 注册模块

1.1 rpc_registry 框架逻辑分析

该代码实现了一个 RPC 客户端核心模块,包含 ​服务注册服务发现与动态维护负载均衡 三大核心功能,通过以下三个类协作完成

graph TDA[Provider] -->|注册服务| B(注册中心)C[Discoverer] -->|发现服务| BC -->|维护本地缓存| D[MethodHost]

1.1.1 Provider类

  • 这个类的 核心功能向服务端注册服务方法,并处理注册响应
  • 构造函数接受一个 Requestor 的智能指针,用于发送请求。
  • registryMethod 方法负责 发送服务注册请求构造ServiceRequest消息并通过Requestor发送,然后处理响应
  • 如果注册失败,会记录错误日志。
// 核心功能:向服务端注册服务方法,并处理注册响应。
class Provider {
public:using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr& requestor);// 注册服务方法bool registryMethod(const BaseConnection::ptr& conn,const std::string& method, const Address& host);private:Requestor::ptr _requestor;
};

1.1.2 MethodHost类

  • 管理某个服务方法的主机地址列表支持动态增删和轮询选择
  • 成员变量包括互斥锁轮询索引主机地址列表
  • 提供添加主机、移除主机、选择主机的方法,这些操作都是线程安全的。
  • chooseHost 方法使用简单的轮询算法实现负载均衡。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:using ptr = std::shared_ptr<MethodHost>;MethodHost();MethodHost(const std::vector<Address>& hosts);// 添加主机地址void appendHost(const Address& host);// 移除主机地址void removerHost(const Address& host);// 轮询选择主机(简单负载均衡)Address chooseHost();bool empty();private:std::mutex _mutex;size_t _idx;                 // 轮询选择下标std::vector<Address> _hosts; // 主机地址列表
};

1.1.3 Discoverer类

  • 发现服务提供者,处理上下线通知,维护本地服务缓存。
  • 包含一个离线回调函数,当服务下线时触发。
  • serviceDiscovery 方法处理服务发现请求,先检查本地缓存,若未命中则向服务端请求,并更新缓存。
  • onServiceRequest 处理服务端推送的上下线通知,更新本地的主机列表。
  • 使用 MethodHost 类来管理每个方法的主机列表,确保线程安全。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:using ptr = std::shared_ptr<Discoverer>;using OfflineCallback = std::function<void(const Address&)>;Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb);// 主动发起服务发现请求。// 1. 本地缓存命中:直接返回已缓存的主机地址。// 2. ​缓存未命中:发送 SERVICE_DISCOVERY// 请求,获取服务地址并更新缓存。bool serviceDiscovery(const BaseConnection::ptr& conn,const std::string& method, Address& host);// 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数// 处理服务端推送的上下线通知。// 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址void onServiceRequest(const BaseConnection::ptr& conn,const ServiceRequest::ptr& msg);private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_map<std::string, MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;
};

1.2 Provider类

功能:向注册中心注册服务方法。
​关键成员

  • _requestor:发送请求和接收响应的工具类。

核心方法

  • registryMethod(conn, method, host)
    发送 SERVICE_REGISTRY 请求,将服务方法 method 和主机 host 注册到注册中心。
class Provider {
public:using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr& requestor) : _requestor(requestor) {}// 注册服务方法bool registryMethod(const BaseConnection::ptr& conn,const std::string& method, const Address& host) {// 1. 创建 SERVICE_REGISTRY 类型的请求消息。// 2. 通过 Requestor 发送请求并等待响应。// 3. 验证响应类型和状态码,返回注册结果。auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setServiceOptype(ServiceOptype::SERVICE_REGISTRY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("%s 服务注册失败!", method.c_str());return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("响应类型向下转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务注册失败,原因: %s",errReason(service_rsp->rcode()).c_str());return false;}return true;}private:Requestor::ptr _requestor;
};

1.3 MethodHost 类

功能:管理某个服务方法的主机列表,支持动态增删和轮询选择。
​关键成员

  • _hosts:服务提供者地址列表。
  • _idx:轮询索引,实现简单负载均衡。

核心方法

  • appendHost(host):线程安全地添加主机地址。
  • removeHost(host):线程安全地移除主机地址。
  • chooseHost():轮询选择下一个主机地址。
// 核心功能:管理某个服务方法的主机地址列表,支持动态增删和轮询选择。
// 设计目标:提供线程安全的主机列表管理,支持动态更新。
class MethodHost {
public:using ptr = std::shared_ptr<MethodHost>;MethodHost() : _idx(0) {}MethodHost(const std::vector<Address>& hosts): _hosts(hosts.begin(), hosts.end()), _idx(0) {}// 添加主机地址void appendHost(const Address& host) {// 中途收到了服务上线请求后被调用std::unique_lock<std::mutex> lock(_mutex);_hosts.emplace_back(host);}// 移除主机地址void removerHost(const Address& host) {// 中途收到了服务下线请求后被调用std::unique_lock<std::mutex> lock(_mutex);for (auto it = _hosts.begin(); it != _hosts.end(); ++it) {if (*it == host) {_hosts.erase(it);break;}}}// 轮询选择主机(简单负载均衡)Address chooseHost() {std::unique_lock<std::mutex> lock(_mutex);size_t pos = _idx++ % _hosts.size();return _hosts[pos];}bool empty() { return _hosts.empty(); }private:std::mutex _mutex;size_t _idx;                 // 轮询选择下标std::vector<Address> _hosts; // 主机地址列表
};

1.4 Discoverer 类

功能:动态发现服务地址,处理上下线通知,维护本地缓存。
​关键成员

  • _method_hosts:服务名 → 主机列表的映射(使用 MethodHost 管理)。
  • _offline_callback:服务下线时的回调函数。

​核心方法

  • serviceDiscovery(conn, method, host)
    若本地缓存有效,直接返回地址;否则向注册中心发送 SERVICE_DISCOVERY 请求。
  • onServiceRequest(conn, msg)
    处理服务端推送的上下线通知,更新本地缓存。
// 核心功能:发现服务提供者,处理上下线通知,维护本地服务缓存。
class Discoverer {
public:using ptr = std::shared_ptr<Discoverer>;using OfflineCallback = std::function<void(const Address&)>;Discoverer(const Requestor::ptr& requestor, const OfflineCallback& cb): _requestor(requestor), _offline_callback(cb) {}// 主动发起服务发现请求。// 1. 本地缓存命中:直接返回已缓存的主机地址。// 2. ​缓存未命中:发送 SERVICE_DISCOVERY// 请求,获取服务地址并更新缓存。bool serviceDiscovery(const BaseConnection::ptr& conn,const std::string& method, Address& host) {{// 当前所报关的提供者信息存在,则直接返回地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(method);if (it != _method_hosts.end()) {if (it->second->empty() == false) {host = it->second->chooseHost();return true;}}}// 缓存未命中,发起服务发现请求// 当前服务的提供者为空auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setServiceOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if (ret == false) {ELOG("服务发现失败!");return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if (service_rsp.get() == nullptr) {ELOG("服务发现时响应类型向下转换失败!");return false;}if (service_rsp->rcode() != RCode::RCODE_OK) {ELOG("服务发现失败!%s", errReason(service_rsp->rcode()).c_str());return false;}// 走到这里,代表当前服务没有对应主机提过服务的std::unique_lock<std::mutex> lock(_mutex);auto method_host = std::make_shared<MethodHost>(service_rsp->Hosts());if (method_host->empty()) {ELOG("%s 服务发现时,服务提供者列表为空!", method.c_str());return false;}DLOG("服务发现成功");host = method_host->chooseHost();_method_hosts[method] = method_host;return true;}// 提供给 Dispatcher 模块进行服务上线下线请求处理的回调函数// 处理服务端推送的上下线通知。// 设计目标:动态更新本地服务列表,确保客户端始终获取最新服务地址void onServiceRequest(const BaseConnection::ptr& conn,const ServiceRequest::ptr& msg) {// 1. 判断是上线还是下线请求,如果都不是那就不用处理了ServiceOptype optype = msg->Serviceoptype();std::string method = msg->method();std::unique_lock<std::mutex> lock(_mutex);// 2. 上线请求:找到MethodHost,向其中新增一个主机地址if (optype == ServiceOptype::SERVICE_ONLINE) {auto it = _method_hosts.find(method);if (it == _method_hosts.end()) {MethodHost::ptr method_host = std::shared_ptr<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method] = method_host;} else {it->second->appendHost(msg->host());}}// 3. 下线请求:找到MethodHost,从其中删除一个主机地址else if (optype == ServiceOptype::SERVICE_OFFLINE) {auto it = _method_hosts.find(method);if (it == _method_hosts.end())return;it->second->removerHost(msg->host());_offline_callback(msg->host());}}private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_map<std::string, MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;
};

🏳️‍🌈二、rpc_topic 主题模块

先说说 客户端服务端 的主题模块的区别

  • ​客户端:负责发起主题操作请求,处理服务端推送的消息,​不维护全局状态。
  • ​服务端:维护全局主题和订阅者关系,处理客户端请求并广播消息,​不涉及业务逻辑。
  • 两者协同工作,客户端通过请求-响应模型与服务端交互,服务端确保消息的正确路由和广播。
    在这里插入图片描述

2.1 逻辑框架

#pragma once
#include "requestor.hpp"
#include <unordered_set>namespace rpc
{namespace client{class TopicManager{public:using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr &requestor) : _requestor(requestor) {}// 创建主题bool createTopic(const BaseConnection::ptr &_conn, const std::string &key);// 取消主题bool removeTopic(const BaseConnection::ptr &_conn, const std::string &key);// 订阅主题bool subscribeTopic(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb);// 取消订阅主题的订阅者bool cancelTopic(const BaseConnection::ptr &conn, const std::string &key);// 发布消息bool publishTopic(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg);// 处理服务器返回的主题消息void onPublishTopic(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);private:void addSubscriber(const std::string &key, const SubCallback &cb);void delSubscriber(const std::string &key);const SubCallback getSubcallback(const std::string &key);bool commonRequest(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = " ");private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks; // 主题名 -> 回调函数Requestor::ptr _requestor;};}
}

2.2 核心功能

client::TopicManager 是 RPC 客户端用于管理 ​主题(Topic)生命周期和消息订阅/发布 的核心类,主要功能包括:

  • 主题管理:创建、删除主题。
  • 订阅管理:订阅/取消订阅主题,绑定消息回调。
  • 消息发布:向指定主题发送消息。
  • 消息处理:接收服务端推送的主题消息,触发本地回调。
    在这里插入图片描述

2.3 接口方法详解

2.3.1 主题管理

在这里插入图片描述

// 创建主题
bool createTopic(const BaseConnection::ptr& _conn, const std::string& key) {return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);
}// 删除主题
bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key) {return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);
}
// 创建主题 "news"
topic_mgr->createTopic(conn, "news");
// 删除主题 "news"
topic_mgr->removeTopic(conn, "news");

2.3.2 订阅管理

在这里插入图片描述

// 订阅主题
bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key,const SubCallback& cb) {addSubscriber(key, cb);return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
}// 取消订阅主题的订阅者
bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key) {delSubscriber(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);
}
// 订阅主题 "news",定义回调函数
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {std::cout << "收到消息: " << msg << std::endl;
});
// 取消订阅 "news"
topic_mgr->cancelTopic(conn, "news");

2.3.3 消息发布

在这里插入图片描述

// 发布消息
bool publishTopic(const BaseConnection::ptr& conn, const std::string& key,const std::string& msg) {return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
}
// 发布消息到主题 "news"
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");

2.3.4 消息处理

在这里插入图片描述
内部流程

  1. ​验证操作类型:确保消息类型为 TOPIC_PUBLISH
  2. 提取消息内容:获取主题名 topic_key 和消息内容 topic_msg
  3. 查找回调函数:通过 getSubcallback 查找本地注册的回调。
  4. 触发回调:执行回调函数处理消息。
// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg) {// 1. 从消息中取出操作类型进行判断,是否时消息请求auto type = msg->optype(); // 获取主题操作类型if (type != TopicOptype::TOPIC_PUBLISH) {ELOG("错误的主题操作类型:%d", static_cast<int>(type));return;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错auto callback = getSubcallback(topic_key);if (!callback) {ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())return;}return callback(topic_key, topic_msg);
}

2.4 核心逻辑:commonRequest 方法

所有主题操作请求通过 commonRequest 统一处理,流程如下

2.4.1 ​构造请求:

auto msg_rsp = MessageFactory::create<TopicRequest>();
msg_rsp->setTopicKey(key);
msg_rsp->setOptype(type); // 设置操作类型(如 TOPIC_CREATE)

2.4.2 发送请求

bool ret = _requestor->send(conn, msg_rsp, msg_rep);

2.4.3 处理响应:

  • 检查响应类型是否为 TopicResponse
  • 验证状态码 rcode() 是否成功。

2.4.4 实现代码

// 处理服务器返回的主题消息
void onPublishTopic(const BaseConnection::ptr& conn,const TopicRequest::ptr& msg) {// 1. 从消息中取出操作类型进行判断,是否时消息请求auto type = msg->optype(); // 获取主题操作类型if (type != TopicOptype::TOPIC_PUBLISH) {ELOG("错误的主题操作类型:%d", static_cast<int>(type));return;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();bool commonRequest(const BaseConnection::ptr& conn, const std::string& key,TopicOptype type, const std::string& msg = " ") {// 1. 构造请求对象,并填充数据auto msg_rsp = MessageFactory::create<TopicRequest>();msg_rsp->setId(UUID::uuid());msg_rsp->setMType(MType::REQ_TOPIC);msg_rsp->setOptype(type);msg_rsp->setTopicKey(key);if (type == TopicOptype::TOPIC_PUBLISH)msg_rsp->setTopicMsg(msg);// 2. 向 服务端 发送 同步 请求,等待响应BaseMessage::ptr msg_rep;bool ret = _requestor->send(conn, msg_rsp, msg_rep);if (ret == false) {ELOG("主题操作请求失败!");return false;}// 3. 获取服务端响应信息,判断请求处理是否成功auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);if (!topic_rsp_msg) {ELOG("主题操作响应,但向下转换时失败");return false;}if (topic_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("主题操作请求出错:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}ELOG("主题操作请求成功");return true;}// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错auto callback = getSubcallback(topic_key);if (!callback) {ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())return;}return callback(topic_key, topic_msg);
}

2.5 整体代码

#pragma once
#include "requestor.hpp"
#include <unordered_set>namespace rpc{namespace client{class TopicManager{public:using SubCallback = std::function<void(const std::string& key, const std::string& msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr& requestor) : _requestor(requestor) {}// 创建主题bool createTopic(const BaseConnection::ptr& _conn, const std::string& key){return commonRequest(_conn, key, TopicOptype::TOPIC_CREATE);}// 删除主题bool removeTopic(const BaseConnection::ptr& _conn, const std::string& key){return commonRequest(_conn, key, TopicOptype::TOPIC_REMOVE);}// 订阅主题bool subscribeTopic(const BaseConnection::ptr& conn, const std::string& key, const SubCallback& cb){addSubscriber(key, cb);return commonRequest(conn, key, TopicOptype::TOPIC_SUBSCRIBE);}// 取消订阅主题的订阅者bool cancelTopic(const BaseConnection::ptr& conn, const std::string& key){delSubscriber(key);return commonRequest(conn, key, TopicOptype::TOPIC_CANCEL);}// 发布消息bool publishTopic(const BaseConnection::ptr& conn, const std::string& key, const std::string& msg){return commonRequest(conn, key, TopicOptype::TOPIC_PUBLISH, msg);}// 处理服务器返回的主题消息void onPublishTopic(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){// 1. 从消息中取出操作类型进行判断,是否时消息请求auto type = msg->optype();  // 获取主题操作类型if(type != TopicOptype::TOPIC_PUBLISH){ELOG("错误的主题操作类型:%d", static_cast<int>(type));return;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();// 3. 通过主题名称,查找队形主题的回调处理函数,有 再处理,没有 就报错auto callback = getSubcallback(topic_key);if(!callback){ELOG("收到了 %s 主题消息,但是该消息主题无处理回调", topic_key.c_str())return;}return callback(topic_key, topic_msg);}private:void addSubscriber(const std::string& key, const SubCallback& cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key, cb));}void delSubscriber(const std::string& key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback getSubcallback(const std::string& key){std::unique_lock<std::mutex> _lock(_mutex);auto it = _topic_callbacks.find(key);if(it == _topic_callbacks.end()){ELOG("主题 %s 没有订阅者", key.c_str());return SubCallback();}return it->second;}bool commonRequest(const BaseConnection::ptr& conn, const std::string& key, TopicOptype type, const std::string& msg = " "){// 1. 构造请求对象,并填充数据auto msg_rsp = MessageFactory::create<TopicRequest>();msg_rsp->setId(UUID::uuid());msg_rsp->setMType(MType::REQ_TOPIC);msg_rsp->setOptype(type);msg_rsp->setTopicKey(key);if(type == TopicOptype::TOPIC_PUBLISH)msg_rsp->setTopicMsg(msg);// 2. 向 服务端 发送 同步 请求,等待响应BaseMessage::ptr msg_rep;bool ret = _requestor->send(conn, msg_rsp, msg_rep);if(ret == false){ELOG("主题操作请求失败!");return false;}// 3. 获取服务端响应信息,判断请求处理是否成功auto topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rep);if(!topic_rsp_msg){ELOG("主题操作响应,但向下转换时失败");return false;}if(topic_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("主题操作请求出错:%s", errReason(topic_rsp_msg->rcode()).c_str());return false;}ELOG("主题操作请求成功");return true;}private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks;  // 主题名 -> 回调函数Requestor::ptr _requestor;};}
}

🏳️‍🌈三、rpc_client 客户端整合

3.1 整体架构

这段代码实现了一个 ​分布式RPC客户端框架,包含四个核心类:

  • RegistryClient(服务注册)
  • DiscoveryClient(服务发现)
  • RpcClient(RPC调用)
  • TopicClient(主题发布/订阅)

它们通过 Dispatcher(消息分发器)和 Requestor(请求处理器)协作,实现服务注册、发现、调用及消息推送功能

graph TDA[RegistryClient] -->|注册服务| B(注册中心)C[DiscoveryClient] -->|发现服务| BD[RpcClient] -->|调用服务| E(服务提供者)F[TopicClient] -->|发布/订阅| G(消息中心)

3.2 核心类详解

3.2.1 RegistryClient(服务注册客户端)​

​功能:将本地服务方法注册到远程注册中心。
​成员变量

  • _requestor:管理请求生命周期(超时、重试)。
  • _provider:封装服务注册协议,构造 SERVICE_REGISTRY 请求。
  • _dispatcher:分发注册响应消息。
  • _client:连接注册中心的网络客户端。

关键接口

  • registryMethod(method, host):注册方法 method 到地址 host。
// 示例:注册服务方法
RegistryClient client("127.0.0.1", 8080);
client.registryMethod("add", Address("192.168.1.100", 9090));
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:using ptr = std::shared_ptr<RegistryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string& ip, int port): _requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {// 设置响应的回调函数auto rsp_cb =std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);// 注册 rpc 响应处理函数_dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,rsp_cb);// std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>// message_cb = std::bind(&rpc::Dispatcher::onMessage,// _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);// 设置客户端连接注册中心的地址_client = rpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}// 向外提供的服务注册接口bool registryMethod(const std::string& method, const Address& host) {return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY// 请求,验证响应状态码Dispatcher::ptr_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)BaseClient::ptr_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};

3.2.2DiscoveryClient(服务发现客户端)​

​功能:动态发现服务地址,处理上下线通知。
​成员变量

  • _discoverer:维护本地服务地址缓存。
  • _dispatcher:处理服务发现响应和上下线通知。

关键接口

  • serviceDiscovery(method, host):查询 method 的可用地址。
// 示例:发现服务地址
DiscoveryClient client("127.0.0.1", 8080, [](const Address& host) {std::cout << "服务下线: " << host.first << std::endl;
});
Address addr;
client.serviceDiscovery("add", addr); // 获取 "add" 方法的地址
// 服务注册客户端,用于将本地服务方法注册到远程注册中心
class RegistryClient {
public:using ptr = std::shared_ptr<RegistryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心RegistryClient(const std::string& ip, int port): _requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()) {// 设置响应的回调函数auto rsp_cb =std::bind(&client::Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);// 注册 rpc 响应处理函数_dispatcher->registerHandler<BaseMessage>(fields::MType::RSP_SERVICE,rsp_cb);// std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>// message_cb = std::bind(&rpc::Dispatcher::onMessage,// _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);// 设置客户端连接注册中心的地址_client = rpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);_client->connect();}// 向外提供的服务注册接口bool registryMethod(const std::string& method, const Address& host) {return _provider->registryMethod(_client->class DiscoveryClient {public:using ptr = std::shared_ptr<DiscoveryClient>;// 构造函数传入注册中心的地址信息,用于连接注册中心// DiscoveryClient 是// ​服务发现客户端,用于从注册中心动态发现服务提供者的地址,并处理服务上下线通知// 连接注册中心:建立与注册中心的网络连接。// ​服务发现请求:主动查询指定服务方法的可用地址。// ​动态通知处理:监听服务上下线通知,更新本地缓存。// ​离线回调机制:当服务下线时,触发用户自定义逻辑。DiscoveryClient(const std::string& ip, int port,const Discoverer::OfflineCallback& cb): _requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),_dispatcher(std::make_shared<Dispatcher>()) {// 1. 注册响应处理回调(处理服务发现响应)auto rsp_cb =std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);// 2. 注册服务请求处理回调(处理服务上下线通知)auto req_cb =std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(), std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, req_cb);// 3. 设置消息总回调入口auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_client = rpc::ClientFactory::create(ip, port);_client->setMessageCallback(message_cb);// 4. 连接注册中心_client->connect();}// 向外提供的服务发现接口bool serviceDiscovery(const std::string& method,Address& host) {return _discoverer->serviceDiscovery(_client->connection(),method, host);}private:Requestor::ptr_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系。client::Discoverer::ptr_discoverer; // 封装服务发现逻辑,维护本地服务地址缓存,处理上下线通知。Dispatcher::ptr_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如服务上下线通知)BaseClient::ptr_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应};connection(), method, host);}private:Requestor::ptr_requestor; // 管理请求-响应生命周期,处理超时、重试,维护请求与响应的映射关系client::Provider::ptr _provider; // 封装服务注册协议,构造 SERVICE_REGISTRY// 请求,验证响应状态码Dispatcher::ptr_dispatcher; // 根据消息类型(MType)分发消息到对应处理器(如处理注册响应)BaseClient::ptr_client; // 网络通信客户端,负责与注册中心建立连接、发送请求、接收响应
};

3.2.3RpcClient(RPC调用客户端)​

​功能:根据配置(直连或服务发现)调用远程方法。
​成员变量

  • _enableDiscovery:模式开关(直连/服务发现)。
  • _caller:执行 RPC 调用,封装请求和响应解析。
  • _rpc_clients:连接池(服务发现模式下动态管理)。

​关键接口

  • call(method, params, result):调用远程方法 method。
// 示例:调用远程方法
RpcClient client(true, "127.0.0.1", 8080); // 启用服务发现
Json::Value params, result;
params.append(1); params.append(1);
client.call("add", params, result); // 调用 "add(1,1)"
class RpcClient {
public:using ptr = std::shared_ptr<RpcClient>;// enableDiscovery --// 是否启用服务发现功能,也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址RpcClient(bool enableDiscovery, const std::string& ip, int port): _enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<client::RpcCaller>(_requestor)) {// 1. 注册响应处理回调(处理服务发现响应)auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);// 如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化discovery_client// 如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好// rpc_clientif (_enableDiscovery) {auto offline_cb =std::bind(&RpcClient::delclient, this, std::placeholders::_1);_discovery_client =std::make_shared<DiscoveryClient>(ip, port, offline_cb);} else {auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}bool call(const std::string& method, const Json::Value& params,Json::Value& result) {// 获取服务提供者: 1. 服务发现    2. 固定服务提供者BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {ELOG("没有找到相应的客户端信息");return false;}return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params,RpcCaller::JsonAsyncResponse& result) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}// 3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, result);}bool call(const std::string& method, const Json::Value& params,const RpcCaller::JsonResponseCallback& cb) {BaseClient::ptr client = getClient(method);if (client.get() == nullptr) {return false;}// 3. 通过客户端连接,发送rpc请求return _caller->call(client->connection(), method, params, cb);}private:BaseClient::ptr newClient(const Address& host) {auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();putClient(host, client);return client;}// 获取客户端信息 - 通过地址信息BaseClient::ptr getClient(const Address& host) {std::unique_lock<std::mutex> lock(_mutex);auto it = _rpc_clients.find(host);if (it == _rpc_clients.end()) {ELOG("没有找到相应的客户端信息");return BaseClient::ptr();}return it->second;}// 获取客户端信息 - 通过服务方法BaseClient::ptr getClient(const std::string& method) {BaseClient::ptr client;if (_enableDiscovery) {// 1. 通过服务器发现,获取服务提供者地址信息Address host;bool ret = _discovery_client->serviceDiscovery(method, host);if (ret == false) {ELOG("当前 %s 服务,没有找到服务提供者!", method.c_str());return BaseClient::ptr();}// 2. 通过地址信息,获取客户端信息client = getClient(host);if (client.get() == nullptr) {ELOG("没有找到一实例化的客户端信息,则创建");client = newClient(host);}} else {client = _rpc_client;}return client;}// 添加客户端信息void putClient(const Address& host, BaseClient::ptr& client) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host, client));}// 删除客户端信息void delclient(const Address& host) {std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash {size_t operator()(const Address& host) const {std::string addr = host.first + std::to_string(host.second);return std::hash<std::string>{}(addr);}};private:bool _enableDiscovery;     // 模式开关:true 启用服务发现,false// 直连固定服务提供者Requestor::ptr _requestor; // 管理请求-响应生命周期,处理超时和重试。Dispatcher::ptr _dispatcher; // 分发消息到对应处理器(如 RPC 响应)RpcCaller::ptr _caller;      // 执行 RPC 调用,封装请求发送和响应解析逻辑DiscoveryClient::ptr_discovery_client;       // 服务发现客户端(仅在服务发现模式下有效)BaseClient::ptr _rpc_client; // 直连模式下的固定服务提供者连接。std::mutex _mutex;std::unordered_map<Address, BaseClient::ptr, AddressHash>_rpc_clients; // 用于服务发现的客户端连接池
};

3.2.4TopicClient(主题客户端)​

​功能:管理主题的创建、订阅、发布。
​成员变量

  • _topic_manager:处理主题操作(如订阅回调)。
  • _dispatcher:分发主题相关消息。

关键接口

  • subscribeTopic(key, callback):订阅主题 key 并绑定回调。
  • publishTopic(key, msg):向主题 key 发布消息 msg。
// 示例:发布/订阅主题
TopicClient client("127.0.0.1", 9090);
client.subscribeTopic("news", [](const std::string& key, const std::string& msg) {std::cout << "收到消息: " << msg << std::endl;
});
client.publishTopic("news", "RPC框架发布!");
class TopicClient {
public:using ptr = std::shared_ptr<TopicClient>;TopicClient(const std::string& ip, int port): _requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_topic_manager(std::make_shared<TopicManager>(_requestor)) {auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_TOPIC, rsp_cb);auto msg_cb =std::bind(&TopicManager::onPublishTopic, _topic_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, msg_cb);auto message_cb =std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_rpc_client = ClientFactory::create(ip, port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}bool createTopic(const std::string& key) {return _topic_manager->createTopic(_rpc_client->connection(), key);}bool removeTopic(const std::string& key) {return _topic_manager->removeTopic(_rpc_client->connection(), key);}bool subscribeTopic(const std::string& key,const TopicManager::SubCallback& cb) {return _topic_manager->subscribeTopic(_rpc_client->connection(), key,cb);}bool cancelTopic(const std::string& key) {return _topic_manager->cancelTopic(_rpc_client->connection(), key);}bool publishTopic(const std::string& key, const std::string& msg) {return _topic_manager->publishTopic(_rpc_client->connection(), key,msg);}void shutdown() { _rpc_client->shutdown(); }private:Requestor::ptr _requestor;TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;
};

3.3 协作逻辑与设计思路

3.3.1 消息分发(Dispatcher)​

​作用:根据消息类型(MType)路由到对应处理器。
​注册逻辑

// 注册处理函数(以 RpcClient 为例)
_dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, [](auto conn, auto msg) {// 处理 RPC 响应
});

3.3.2 请求处理(Requestor)​

​作用:统一管理请求发送、超时重试、响应映射。
​流程

  • 发送请求时生成唯一ID,记录到等待队列。
  • 接收响应时根据ID匹配请求,触发回调。

3.3.3 服务发现与连接池(RpcClient)​

​动态连接管理

  • 启用服务发现时,通过 DiscoveryClient 获取地址,并维护连接池 _rpc_clients
  • 直连模式时固定连接 _rpc_client

​线程安全:使用 std::mutex 保护连接池操作。

3.3.4 主题管理(TopicManager)​

​回调机制:订阅时绑定回调函数到 _topic_callbacks
​消息推送:服务端广播消息后,客户端通过 onPublishTopic 触发本地回调。

🏳️‍🌈四、主题发布订阅流程示意

截至目前,我们已经封装好了 rpc 项目中所有的内容,现在我们来模拟一下 客户端A发布一个主题消息客户端B接收到这个主题消息 的全部流程

4.1 核心文件与类说明

在这里插入图片描述

4.2 步骤 1:客户端A创建主题

// 1. 创建 TopicManager
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);// 2. 创建主题 "news"
BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->createTopic(conn, "news");

逻辑

  1. TopicManager::createTopic 发送 TopicOptype::TOPIC_CREATE 请求到服务端。
  2. 服务端 server::TopicManager::onTopicRequest 处理请求,创建全局主题 "news"

4.3 步骤 2:客户端B订阅主题

// 1. 订阅主题 "news",定义回调函数
auto requestor = std::make_shared<Requestor>();
auto topic_mgr = std::make_shared<client::TopicManager>(requestor);BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");
topic_mgr->subscribeTopic(conn, "news", [](const std::string& key, const std::string& msg) {std::cout << "收到主题消息: " << msg << std::endl;
});

逻辑

  1. TopicManager::subscribeTopic 发送 TopicOptype::TOPIC_SUBSCRIBE 请求到服务端。
  2. 服务端记录客户端B为 "news" 的订阅者(存储到 server::Topic::_subscribers)。

4.4 步骤 3:客户端A发布消息

// 客户端A继续发布消息
topic_mgr->publishTopic(conn, "news", "Breaking: RPC框架发布!");

逻辑

  1. TopicManager::publishTopic 发送 TopicOptype::TOPIC_PUBLISH 请求,携带消息内容。
  2. 服务端 server::TopicManager::onTopicRequest 处理请求,广播消息给所有订阅者。

4.5 步骤 4:服务端广播消息

// 服务端代码(server/rpc_topic.hpp)
void server::TopicManager::topicPublish(...) {// 1. 查找主题 "news"auto topic = _topics["news"];// 2. 遍历所有订阅者(客户端B)for (auto& subscriber : topic->_subscribers) {// 3. 通过订阅者连接发送消息subscriber->_conn->send(msg);}
}

逻辑

  • 服务端通过 Topic::pushMessage 将消息推送到所有订阅者的连接(客户端B)。

4.6 步骤 5:客户端B接收消息

// 客户端B的 TopicManager 处理消息
void client::TopicManager::onPublishTopic(...) {// 1. 提取消息内容std::string topic_key = msg->topicKey();  // "news"std::string topic_msg = msg->topicMsg();  // "Breaking: RPC框架发布!"// 2. 触发订阅时定义的回调函数callback(topic_key, topic_msg);
}

输出

收到主题消息: Breaking: RPC框架发布!

4.7 关键类交互图

sequenceDiagramparticipant ClientA as 客户端Aparticipant Server as 服务端participant ClientB as 客户端BClientA->>Server: createTopic("news")Server-->>ClientA: 创建成功ClientB->>Server: subscribeTopic("news")Server-->>ClientB: 订阅成功ClientA->>Server: publishTopic("news", "Breaking: RPC框架发布!")Server->>ClientB: 推送消息ClientB->>ClientB: 触发回调函数

4.8 补充说明

  • ​线程安全
    客户端和服务端的 TopicManager 使用 std::mutex 保护共享数据(如订阅列表)。
  • ​错误处理
    若主题不存在,服务端返回 RCode::RCODE_NOT_FOUND_TOPIC,客户端记录错误日志。
  • ​网络通信
    所有请求通过 Requestor::send 发送,底层使用 BaseConnection 的 TCP 连接。

🏳️‍🌈五、运程调用add方法流程示意

为了方便各位更好地理解 rpc 框架运程调用的功能,这里模拟一下 客户端调用服务端注册好的add方法,来得到1 + 1的结果的整个流程

5.1 核心文件与类说明

在这里插入图片描述

5.2 步骤 1:服务端启动并注册 add 方法

// 文件: test_server.cpp
#include "rpc_server.hpp"
#include "rpc_registry.hpp"
#include "message.hpp"// 1. 定义服务端加法方法
int add(int a, int b) { return a + b; }int main() {// 2. 创建 RpcServer 并绑定端口RpcServer server("0.0.0.0:8080");// 3. 注册方法到 RpcRegistryserver.getRegistry()->registerMethod("add", &add);// 4. 启动服务端监听server.start();
}

​逻辑

  • 服务端通过 RpcRegistry::registerMethodadd 方法注册到全局注册表(rpc_registry.hpp)。
  • 注册时生成映射:“add” → 函数指针 &add。

5.3 步骤 2:客户端构造调用请求

// 文件: test_client.cpp
#include "rpc_caller.hpp"
#include "requestor.hpp"int main() {// 1. 创建 RpcCaller 工具RpcCaller caller;// 2. 生成调用 add(1,1) 的请求auto req = caller.prepareCall("add", 1, 1);// 3. 创建 Requestor 发送请求Requestor requestor;BaseConnection::ptr conn = connect_to_server("127.0.0.1:8080");auto rsp = requestor.send(conn, req);// 4. 解析响应结果if (rsp->rcode() == RCode::RCODE_OK) {int result = rsp->getResult<int>();std::cout << "1+1=" << result << std::endl; // 输出 1+1=2}
}

逻辑

  • RpcCaller::prepareCall(rpc_caller.hpp)生成 MethodRequest 消息,包含方法名 “add” 和参数 [1,1]。
  • Requestor::send(requestor.hpp)将请求序列化并通过 BaseConnection 发送到服务端。

5.4 步骤3:服务端接收请求并路由到 add 方法

// 文件: dispatcher.hpp
// Dispatcher 注册处理函数(服务端初始化时执行)
Dispatcher dispatcher;
dispatcher.registerHandler<MethodRequest>(MType::REQ_METHOD, [registry](const BaseConnection::ptr& conn, std::shared_ptr<MethodRequest>& req) {// 1. 从 RpcRegistry 查找方法 "add"auto method = registry->getMethod(req->method());// 2. 执行方法并获取结果int result = method->invoke<int>(req->params());// 3. 构造响应消息auto rsp = MessageFactory::create<MethodResponse>();rsp->setResult(result);rsp->setRcode(RCode::RCODE_OK);// 4. 发送响应conn->send(rsp);}
);

逻辑

  • Dispatcher(dispatcher.hpp)根据消息类型 MType::REQ_METHOD 路由到注册的 Lambda 处理函数。
  • RpcRegistry 中查找方法 “add”,调用并返回结果。

5.5 步骤4:网络层数据传输

// 文件: net.hpp
// 服务端接收请求(BaseConnection::onMessage)
void BaseConnection::onMessage(const Buffer& buf) {// 1. 反序列化为 MethodRequestauto msg = deserialize<MethodRequest>(buf);// 2. 通过 Dispatcher 分发消息Dispatcher::instance().onMessage(shared_from_this(), msg);
}

​逻辑

  • 服务端 BaseConnection 接收字节流,反序列化为 MethodRequest 对象。
  • 调用 Dispatcher::onMessage 触发多路转接。

5.6 关键类交互图

sequenceDiagramparticipant Client as 客户端participant Requestor as Requestor (requestor.hpp)participant Dispatcher as Dispatcher (dispatcher.hpp)participant Registry as RpcRegistry (rpc_registry.hpp)participant Server as RpcServer (rpc_server.hpp)Client->>Requestor: 调用 add(1,1)Requestor->>Client: 生成 MethodRequestClient->>Server: 发送 MethodRequest (TCP)Server->>Dispatcher: 接收消息并路由Dispatcher->>Registry: 查找方法 "add"Registry->>Dispatcher: 返回函数指针Dispatcher->>Server: 调用 add(1,1)Server->>Client: 返回 MethodResponse (TCP)Client->>Client: 解析结果为 2

5.7 补充说明

  1. 客户端通过 RpcCaller 生成 MethodRequest,通过 Requestor 发送请求。
  2. 服务端通过 Dispatcher 多路转接,将请求路由到 RpcRegistry 中注册的 add 方法。
  3. 网络层通过 BaseConnection 实现 TCP 通信,Dispatcher 确保消息正确路由。
  4. 结果返回通过 MethodResponse 序列化回客户端,完成一次完整的 RPC 调用。

👥总结

本篇博文对 【从零实现Json-Rpc框架】- 项目实现 - 客户端注册主题及整合 做了一个较为详细的介绍,不知道对你有没有帮助呢

觉得博主写得还不错的三连支持下吧!会继续努力的~

请添加图片描述

关键字:怎样下载软件到电脑桌面上_电商平台如何引流推广_ip切换工具_信息推广

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: