当前位置: 首页> 汽车> 时评 > 织梦论坛源码_莱芜论坛莱芜在线_sem竞价托管费用_代写文章质量高的平台

织梦论坛源码_莱芜论坛莱芜在线_sem竞价托管费用_代写文章质量高的平台

时间:2025/7/9 11:07:45来源:https://blog.csdn.net/weixin_50776420/article/details/146703639 浏览次数: 0次
织梦论坛源码_莱芜论坛莱芜在线_sem竞价托管费用_代写文章质量高的平台

🌈 个人主页:Zfox_
🔥 系列专栏:C++从入门到精通

目录

  • 一:🔥 服务端 - RpcRouter 实现
  • 二:🔥 服务端 - Publish & Subscribe 实现
  • 三:🔥 服务端 - Registry & Discovery实现
  • 四:🔥 服务端 - 整合封装 Server
  • 五:🔥 客户端 - Requestor 实现
  • 六:🔥 客户端 - RpcCaller实现
  • 七:🔥 客户端 - Publish & Subscribe实现
  • 八:🔥 客户端 - Registry & Discovery实现
  • 九:🔥 客户端 - 整合封装 Client
  • 十:🔥 整合封装的使用代码样例
    • 🦋 简单 Rpc 使用
    • 🦋 基于服务注册发现的 Rpc 调用
    • 🦋 基于广播的发布订阅
  • 十一:🔥 共勉

一:🔥 服务端 - RpcRouter 实现

rpc_router.hpp

  • 提供 Rpc 请求处理回调函数
  • 内部的服务管理
    • 方法名称
    • 参数信息
    • 对外提供参数校验接口
#pragma once#include "../common/net.hpp"
#include "../common/message.hpp"namespace rpc
{namespace server{enum class VType{BOOL = 0,INTEGRAL,NUMERIC,SIRING,ARRAY,OBJECT};class ServiceDescribe{public:using ptr = std::shared_ptr<ServiceDescribe>;using ServiceCallback = std::function<void(const Json::Value& , Json::Value &)>;using ParamsDescirbe = std::pair<std::string, VType>;// 由于 mname 和 desc 是 const 的,你实际上不能对它们进行移动操作,因为移动语义通常会修改源对象的状态。因此,这里的 const 是有问题的。ServiceDescribe(std::string &&mname, std::vector<ParamsDescirbe> &&desc, VType vtype, ServiceCallback &&handler): _method_name(std::move(mname)),_params_desc(std::move(desc)),_return_type(vtype),_callback(std::move(handler)){}const std::string& method(){ return _method_name; }// 针对收到请求中的参数进行校验bool paramCheck(const Json::Value &params){// 对params进行参数校验 --- 判断所描述的参数字段是否存在,类型是否一致for(auto &desc : _params_desc){if(params.isMember(desc.first) == false) {LOG(LogLevel::ERROR) << "参数字段完整性校验失败! " << desc.first << " 字段缺失";return false;}if(check(desc.second, params[desc.first]) == false) {LOG(LogLevel::ERROR) << desc.first << " 参数类型校验失败! ";return false;}}return true;}bool call(const Json::Value &params, Json::Value &result){_callback(params, result);if(rtypecheck(result) == false) {LOG(LogLevel::ERROR) << "回调处理函数中的响应信息校验失败! 
";return false;}return true;}private:bool rtypecheck(const Json::Value &val) {return check(_return_type, val); }bool check(VType vtype, const Json::Value &val) {switch (vtype){case VType::BOOL : return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::SIRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name;                    // 方法名称ServiceCallback _callback;                   // 实际业务的回调函数std::vector<ParamsDescirbe> _params_desc;    // 参数字段格式描述VType _return_type;                          // 结果作为返回值类型的描述};class SDescribeFactory{public:void setMethodName(const std::string &name) {_method_name = name;}void setReturnType(VType vtype) {_return_type = vtype;}void setParamsDesc(const std::string &pname, VType vtype){_params_desc.push_back(ServiceDescribe::ParamsDescirbe(pname, vtype));}void setCallback(const ServiceDescribe::ServiceCallback &cb){_callback = cb;}ServiceDescribe::ptr build() {return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));}private:std::string _method_name;ServiceDescribe::ServiceCallback _callback;                     // 实际业务的回调函数std::vector<ServiceDescribe::ParamsDescirbe> _params_desc;      // 参数字段格式描述VType _return_type;                                             // 结果作为返回值类型的描述};class ServiceManager{public:using ptr = std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr &desc){   std::unique_lock<std::mutex> lock(_mutex);_services.insert(std::make_pair(desc->method(), desc));}ServiceDescribe::ptr select(const std::string &method_name){std::unique_lock<std::mutex> lock(_mutex);auto it = _services.find(method_name);if(it == _services.end()) {return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name){std::unique_lock<std::mutex> 
lock(_mutex);if(_services.find(method_name) != _services.end()){_services.erase(method_name);}}private:std::mutex _mutex;std::unordered_map<std::string, ServiceDescribe::ptr> _services;};class RpcRouter{public:using ptr = std::shared_ptr<RpcRouter>;RpcRouter() : _service_manager(std::make_shared<ServiceManager>()){}// 这是注册到Dispatcher模块针对rpc请求进行回调函数的业务处理void onRpcRequest(const BaseConnection::ptr &conn, RpcRequest::ptr &request){// 1. 查询客户端请求的方法描述 --- 判断当前服务端是否能提供对应的服务ServiceDescribe::ptr service = _service_manager->select(request->method());if(service.get() == nullptr) {LOG(LogLevel::INFO) << request->method().c_str() << " 服务未找到! ";return response(conn, request, Json::Value(), RCode::RCODE_NOT_FOUND_SERVICE);}// 2. 进行参数校验,确定能否提供服务if(service->paramCheck(request->params()) == false) {LOG(LogLevel::INFO) << request->method().c_str() << " 服务参数校验失败! ";return response(conn, request, Json::Value(), RCode::RCODE_INVALID_PARAMS);}// 3. 调用业务回调接口进行业务处理Json::Value result;bool ret = service->call(request->params(), result);if(ret == false) {LOG(LogLevel::INFO) << request->method().c_str() << " 服务内部错误! ";return response(conn, request, Json::Value(), RCode::RCODE_INTERNAL_ERROR);}// 4. 处理完毕得到结果,组织响应,向客户端发送return response(conn, request, result, RCode::RCODE_OK);}void registerMethod(const ServiceDescribe::ptr &service){return _service_manager->insert(service);}private:void response(const BaseConnection::ptr &conn, const RpcRequest::ptr &req, const Json::Value &res, RCode rcode){auto msg = MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(rpc::MType::RSP_RPC);msg->setRCode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};}
}

二:🔥 服务端 - Publish & Subscribe 实现

  • 对外提供主题操作处理回调函数
  • 对外提供消息发布处理回调函数
  • 内部进行主题及订阅者的管理
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

namespace rpc
{
    namespace server
    {
        // Server-side topic bookkeeping: which topics exist, who subscribes to
        // them, and forwarding of published messages to the subscribers.
        class TopicManager
        {
        private:
            struct Subscriber
            {
                using ptr = std::shared_ptr<Subscriber>;
                std::mutex _mutex;
                BaseConnection::ptr conn;
                std::unordered_set<std::string> topics; // names of the subscribed topics

                Subscriber(const BaseConnection::ptr &c) : conn(c) {}

                // Called when the subscriber subscribes to a topic.
                void appendTopic(const std::string &topic_name)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    topics.insert(topic_name);
                }
                // Called when a topic is removed or the subscription is cancelled.
                void removeTopic(const std::string &topic_name)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    topics.erase(topic_name);
                }
                // Copy of the topic names taken under this subscriber's mutex,
                // so callers can iterate while appendTopic/removeTopic run.
                std::vector<std::string> topicNames()
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    return std::vector<std::string>(topics.begin(), topics.end());
                }
            };

            struct Topic
            {
                using ptr = std::shared_ptr<Topic>;
                std::mutex _mutex;
                std::string topic_name;
                std::unordered_set<Subscriber::ptr> subscribers; // current subscribers

                Topic(const std::string &name) : topic_name(name) {}

                // Called on a new subscription.
                void appendSubscriber(const Subscriber::ptr &subscriber)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    subscribers.insert(subscriber);
                }
                // Called on unsubscribe or when the subscriber's connection closes.
                void removeSubscriber(const Subscriber::ptr &subscriber)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    subscribers.erase(subscriber);
                }
                // Copy of the subscriber set taken under this topic's mutex.
                std::unordered_set<Subscriber::ptr> subscriberSnapshot()
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    return subscribers;
                }
                // Forwards a published message to every subscriber.
                void pushMessage(const BaseMessage::ptr &msg)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    for (auto &subscriber : subscribers)
                    {
                        subscriber->conn->send(msg);
                    }
                }
            };

        public:
            using ptr = std::shared_ptr<TopicManager>;

            TopicManager() {}

            // Dispatcher handler for topic requests. Replies with an error
            // response for an unknown operation or a missing topic (subscribe /
            // publish), otherwise with an OK response.
            void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                bool ret = true;
                switch (msg->optype())
                {
                case TopicOptype::TOPIC_CREATE:
                    topicCreate(conn, msg);
                    break;
                case TopicOptype::TOPIC_REMOVE:
                    topicRemove(conn, msg);
                    break;
                case TopicOptype::TOPIC_SUBSCRIBE:
                    ret = topicSubscribe(conn, msg);
                    break;
                case TopicOptype::TOPIC_CANCEL:
                    topicCancel(conn, msg);
                    break;
                case TopicOptype::TOPIC_PUBLISH:
                    ret = topicPublish(conn, msg);
                    break;
                default:
                    return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE);
                }
                if (!ret)
                    return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);
                return topicResponse(conn, msg);
            }

            // Connection-closed handler: if the connection belongs to a
            // subscriber, drop it from the manager and from every topic it was
            // subscribed to. Other connections need no clean-up.
            void onShutdown(const BaseConnection::ptr &conn)
            {
                std::vector<Topic::ptr> topics;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    // 1. Not a subscriber: nothing to do.
                    auto it = _subscribers.find(conn);
                    if (it == _subscribers.end())
                    {
                        return;
                    }
                    subscriber = it->second;
                    // 2. Collect the affected topics. The names are snapshotted
                    //    under the subscriber's own mutex.
                    for (const auto &topic_name : subscriber->topicNames())
                    {
                        auto topic_it = _topics.find(topic_name);
                        if (topic_it == _topics.end())
                            continue;
                        topics.push_back(topic_it->second);
                    }
                    // 3. Forget the subscriber.
                    _subscribers.erase(it);
                }
                // 4. Remove the subscriber from the affected topics.
                for (auto &topic : topics)
                {
                    topic->removeSubscriber(subscriber);
                }
            }

        private:
            void errorResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg, RCode rcode)
            {
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_TOPIC);
                msg_rsp->setRCode(rcode);
                conn->send(msg_rsp);
            }

            void topicResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<TopicResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_TOPIC);
                msg_rsp->setRCode(RCode::RCODE_OK);
                conn->send(msg_rsp);
            }

            // Creates the topic; an existing topic with the same name is kept.
            void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                std::string topic_name = msg->topicKey();
                auto topic = std::make_shared<Topic>(topic_name);
                _topics.insert(std::make_pair(topic_name, topic));
            }

            // Removes the topic and deletes its name from each of its subscribers.
            void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::string topic_name = msg->topicKey();
                std::unordered_set<Subscriber::ptr> subscribers;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _topics.find(topic_name);
                    if (it == _topics.end())
                    {
                        return;
                    }
                    // Snapshot under the topic's mutex before dropping the mapping.
                    subscribers = it->second->subscriberSnapshot();
                    _topics.erase(it);
                }
                for (auto &subscriber : subscribers)
                {
                    subscriber->removeTopic(topic_name);
                }
            }

            // Subscribes conn to the topic. Returns false when the topic does
            // not exist; creates the subscriber record on first use.
            bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::string topic_name = msg->topicKey();
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(topic_name);
                    if (topic_it == _topics.end())
                    {
                        return false;
                    }
                    topic = topic_it->second;
                    auto sub_it = _subscribers.find(conn);
                    if (sub_it != _subscribers.end())
                    {
                        subscriber = sub_it->second;
                    }
                    else
                    {
                        subscriber = std::make_shared<Subscriber>(conn);
                        _subscribers.insert(std::make_pair(conn, subscriber));
                    }
                }
                // Link both directions: topic -> subscriber and subscriber -> topic.
                topic->appendSubscriber(subscriber);
                subscriber->appendTopic(topic_name);
                return true;
            }

            // Cancels a subscription. A missing topic or subscriber is ignored.
            void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::string topic_name = msg->topicKey();
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(topic_name);
                    if (topic_it != _topics.end())
                    {
                        topic = topic_it->second;
                    }
                    auto sub_it = _subscribers.find(conn);
                    if (sub_it != _subscribers.end())
                    {
                        subscriber = sub_it->second;
                    }
                }
                if (subscriber.get())
                    subscriber->removeTopic(topic_name);
                if (topic.get() && subscriber.get())
                    topic->removeSubscriber(subscriber);
            }

            // Forwards the message to the topic's subscribers. Returns false
            // when the topic does not exist.
            bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::string topic_name = msg->topicKey();
                Topic::ptr topic;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(topic_name);
                    if (topic_it == _topics.end())
                    {
                        return false;
                    }
                    topic = topic_it->second;
                }
                topic->pushMessage(msg);
                return true;
            }

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, Topic::ptr> _topics;
            std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
        };
    }
}

三:🔥 服务端 - Registry & Discovery实现

  • 对外提供服务操作(注册/发现)消息处理回调函数
  • 内部进行服务发现者的管理
  • 内部进行服务提供者的管理
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"

#include <algorithm>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

namespace rpc
{
    namespace server
    {
        // Tracks which connection provides which methods.
        class ProviderManager
        {
        public:
            using ptr = std::shared_ptr<ProviderManager>;

            struct Provider
            {
                using ptr = std::shared_ptr<Provider>;
                std::mutex _mutex;
                BaseConnection::ptr conn;
                Address host;
                std::vector<std::string> methods;

                Provider(const BaseConnection::ptr &c, const Address &h) : conn(c), host(h) {}

                // Records a provided method; repeated registrations of the same
                // name are stored once.
                void appendMethod(const std::string &method)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    if (std::find(methods.begin(), methods.end(), method) == methods.end())
                    {
                        methods.emplace_back(method);
                    }
                }
                // Copy of the method names taken under this provider's mutex.
                std::vector<std::string> methodList()
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    return methods;
                }
            };

            // Called when a provider registers a method.
            void addProvider(const BaseConnection::ptr &c, const Address &h, const std::string &method)
            {
                Provider::ptr provider;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    // Reuse the provider bound to this connection or create one.
                    auto it = _conns.find(c);
                    if (it != _conns.end())
                    {
                        provider = it->second;
                    }
                    else
                    {
                        provider = std::make_shared<Provider>(c, h);
                        _conns.insert(std::make_pair(c, provider));
                    }
                    // operator[] creates the set for a method seen for the first time.
                    auto &providers = _providers[method];
                    providers.insert(provider);
                }
                provider->appendMethod(method);
            }

            // Returns the provider bound to the connection, or an empty pointer.
            Provider::ptr getProvider(const BaseConnection::ptr &c)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(c);
                if (it != _conns.end())
                {
                    return it->second;
                }
                return Provider::ptr();
            }

            // Called when a provider's connection closes: removes it from every
            // method it provided and drops the connection mapping.
            void delProvider(const BaseConnection::ptr &c)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(c);
                if (it == _conns.end())
                {
                    // The closed connection is not a provider.
                    return;
                }
                for (const auto &method : it->second->methodList())
                {
                    // find() instead of operator[] so no empty entry is created.
                    auto pit = _providers.find(method);
                    if (pit == _providers.end())
                        continue;
                    pit->second.erase(it->second);
                    if (pit->second.empty())
                        _providers.erase(pit);
                }
                _conns.erase(it);
            }

            // Hosts currently providing the method; empty when there are none.
            std::vector<Address> methodHosts(const std::string &method)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _providers.find(method);
                if (it == _providers.end())
                {
                    return std::vector<Address>();
                }
                std::vector<Address> result;
                for (auto &provider : it->second)
                {
                    result.push_back(provider->host);
                }
                return result;
            }

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, std::set<Provider::ptr>> _providers; // method -> providers (service discovery)
            std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns;       // connection -> provider (service offline)
        };

        // Tracks which connection has discovered which methods, and notifies
        // those connections when a provider goes online or offline.
        class DiscovererManager
        {
        public:
            using ptr = std::shared_ptr<DiscovererManager>;

            struct Discoverer
            {
                using ptr = std::shared_ptr<Discoverer>;
                std::mutex _mutex;
                BaseConnection::ptr conn;         // client connection of the discoverer
                std::vector<std::string> methods; // methods it has discovered

                Discoverer(const BaseConnection::ptr &c) : conn(c) {}

                // Records a discovered method; repeated discoveries of the same
                // name are stored once.
                void appendMethod(const std::string &method)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    if (std::find(methods.begin(), methods.end(), method) == methods.end())
                    {
                        methods.push_back(method);
                    }
                }
                // Copy of the method names taken under this discoverer's mutex.
                std::vector<std::string> methodList()
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    return methods;
                }
            };

            // Called on every discovery request: records the discoverer and the method.
            Discoverer::ptr addDiscoverer(const BaseConnection::ptr &c, const std::string &method)
            {
                Discoverer::ptr discoverer;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _conns.find(c);
                    if (it != _conns.end())
                    {
                        discoverer = it->second;
                    }
                    else
                    {
                        discoverer = std::make_shared<Discoverer>(c);
                        _conns.insert(std::make_pair(c, discoverer));
                    }
                    // operator[] creates the set for a method seen for the first time.
                    auto &discoverers = _discoverers[method];
                    discoverers.insert(discoverer);
                }
                discoverer->appendMethod(method);
                return discoverer;
            }

            // Called when a connection closes: removes the discoverer, if any.
            void delDiscoverer(const BaseConnection::ptr &c)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(c);
                if (it == _conns.end())
                {
                    // The closed connection is not a discoverer.
                    return;
                }
                for (const auto &method : it->second->methodList())
                {
                    auto dit = _discoverers.find(method);
                    if (dit == _discoverers.end())
                        continue;
                    dit->second.erase(it->second);
                    if (dit->second.empty())
                        _discoverers.erase(dit);
                }
                _conns.erase(it);
            }

            // Notifies discoverers of the method that a provider came online.
            void onlineNotify(const std::string &method, const Address &host)
            {
                return notify(method, host, ServiceOptype::SERVICE_ONLINE);
            }

            // Notifies discoverers of the method that a provider went offline.
            void offlineNotify(const std::string &method, const Address &host)
            {
                return notify(method, host, ServiceOptype::SERVICE_OFFLINE);
            }

        private:
            void notify(const std::string &method, const Address &host, ServiceOptype optype)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _discoverers.find(method);
                if (it == _discoverers.end())
                {
                    // Nobody has discovered this method.
                    return;
                }
                auto msg_req = MessageFactory::create<ServiceRequest>();
                msg_req->setId(UUID::uuid());
                msg_req->setMType(MType::REQ_SERVICE);
                msg_req->setMethod(method);
                msg_req->setHost(host);
                msg_req->setOptype(optype);
                for (auto &discoverer : it->second)
                {
                    discoverer->conn->send(msg_req);
                }
            }

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers; // method -> discoverers
            std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns;         // connection -> discoverer
        };

        // Combines provider and discoverer management behind the callbacks the
        // registry server needs.
        class PDManager
        {
        public:
            using ptr = std::shared_ptr<PDManager>;

            PDManager()
                : _providers(std::make_shared<ProviderManager>()),
                  _discoverers(std::make_shared<DiscovererManager>())
            {
            }

            // Dispatcher handler for service registration / discovery requests.
            void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                ServiceOptype optype = msg->optype();
                if (optype == ServiceOptype::SERVICE_REGISTRY)
                {
                    // Registration: record the provider, then notify discoverers.
                    LOG(LogLevel::INFO) << msg->host().first << ":" << msg->host().second << " 注册服务 " << msg->method();
                    _providers->addProvider(conn, msg->host(), msg->method());
                    _discoverers->onlineNotify(msg->method(), msg->host());
                    return registryResponse(conn, msg);
                }
                else if (optype == ServiceOptype::SERVICE_DISCOVERY)
                {
                    // Discovery: record the discoverer and reply with the current hosts.
                    LOG(LogLevel::INFO) << " 客户端要进行 " << msg->method() << " 服务发现 ";
                    _discoverers->addDiscoverer(conn, msg->method());
                    return discoveryResponse(conn, msg);
                }
                else
                {
                    LOG(LogLevel::ERROR) << "收到服务操作请求,但是操作类型错误! ";
                    return errorResponse(conn, msg);
                }
            }

            // Connection-closed handler. A connection may have acted as a
            // provider, a discoverer, or both, so both records are cleaned up.
            void onConnShutdown(const BaseConnection::ptr &conn)
            {
                auto provider = _providers->getProvider(conn);
                if (provider.get() != nullptr)
                {
                    LOG(LogLevel::INFO) << provider->host.first << ":" << provider->host.second << " 服务下线 ";
                    const std::vector<std::string> methods = provider->methodList();
                    for (const auto &method : methods)
                    {
                        _discoverers->offlineNotify(method, provider->host);
                    }
                    _providers->delProvider(conn);
                }
                // No-op when the connection never issued a discovery request.
                _discoverers->delDiscoverer(conn);
            }

        private:
            void errorResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<ServiceResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_SERVICE);
                msg_rsp->setRCode(RCode::RCODE_INVALID_OPTYPE);
                msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);
                conn->send(msg_rsp);
            }

            void registryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<ServiceResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_SERVICE);
                msg_rsp->setRCode(RCode::RCODE_OK);
                msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);
                conn->send(msg_rsp);
            }

            // Replies with the hosts providing the method, or
            // RCODE_NOT_FOUND_SERVICE when there are none.
            void discoveryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<ServiceResponse>();
                std::vector<Address> hosts = _providers->methodHosts(msg->method());
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_SERVICE);
                msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);
                if (hosts.empty())
                {
                    msg_rsp->setRCode(RCode::RCODE_NOT_FOUND_SERVICE);
                    return conn->send(msg_rsp);
                }
                msg_rsp->setRCode(RCode::RCODE_OK);
                msg_rsp->setMethod(msg->method());
                msg_rsp->setHost(std::move(hosts));
                conn->send(msg_rsp);
            }

        private:
            ProviderManager::ptr _providers;
            DiscovererManager::ptr _discoverers;
        };
    }
}

四:🔥 服务端 - 整合封装 Server

rpc_server.hpp

#pragma once

#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"

#include <functional>
#include <memory>

namespace rpc
{
    namespace server
    {
        // Registry-centre server: handles only service registration and
        // discovery requests.
        class RegistryServer
        {
        public:
            using ptr = std::shared_ptr<RegistryServer>;

            RegistryServer(int port)
                : _pd_manager(std::make_shared<PDManager>()),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),
                                            std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);

                _server = ServerFactory::create(port);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                                            std::placeholders::_1, std::placeholders::_2);
                _server->setMessageCallback(message_cb);
                auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);
                _server->setCloseCallback(close_cb);
            }

            void start()
            {
                _server->start();
            }

        private:
            void onConnShutdown(const BaseConnection::ptr &conn)
            {
                _pd_manager->onConnShutdown(conn);
            }

        private:
            PDManager::ptr _pd_manager;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };

        // RPC service provider, optionally announcing its methods to a registry.
        class Rpcserver
        {
        public:
            using ptr = std::shared_ptr<Rpcserver>;

            // Two addresses are involved:
            // 1. access_addr          - the address this RPC server is reachable at
            //                           (this is what gets announced to the registry);
            // 2. registry_server_addr - the registry centre to connect to, used
            //                           only when enableRegistry is true.
            Rpcserver(const Address &access_addr, bool enableRegistry = false, const Address &registry_server_addr = Address())
                : _enableRegistry(enableRegistry),
                  _access_addr(access_addr),
                  _registry_server_addr(registry_server_addr),
                  _router(std::make_shared<RpcRouter>()),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                if (enableRegistry)
                {
                    _reg_client = std::make_shared<client::RegistryClient>(registry_server_addr.first,
                                                                           registry_server_addr.second);
                }
                // _server is the RPC server itself; it serves RPC requests.
                auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(),
                                        std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, rpc_cb);

                _server = ServerFactory::create(_access_addr.second);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                                            std::placeholders::_1, std::placeholders::_2);
                _server->setMessageCallback(message_cb);
            }

            // Registers a method locally and, when enabled, with the registry.
            void registerMethod(const ServiceDescribe::ptr &service)
            {
                // Make the method routable locally before announcing it, so it
                // is already registered here by the time discoverers learn of it.
                _router->registerMethod(service);
                if (_enableRegistry)
                {
                    _reg_client->registryMethod(service->method(), _access_addr);
                }
            }

            void start()
            {
                _server->start();
            }

        private:
            bool _enableRegistry;
            Address _access_addr;
            Address _registry_server_addr;
            client::RegistryClient::ptr _reg_client;
            RpcRouter::ptr _router;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };

        // Topic (publish / subscribe) server.
        class TopicServer
        {
        public:
            using ptr = std::shared_ptr<TopicServer>;

            TopicServer(int port)
                : _topic_manager(std::make_shared<TopicManager>()),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(),
                                          std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);

                _server = ServerFactory::create(port);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),
                                            std::placeholders::_1, std::placeholders::_2);
                _server->setMessageCallback(message_cb);
                auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);
                _server->setCloseCallback(close_cb);
            }

            void start()
            {
                _server->start();
            }

        private:
            void onConnShutdown(const BaseConnection::ptr &conn)
            {
                _topic_manager->onShutdown(conn);
            }

        private:
            TopicManager::ptr _topic_manager;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };
    }
}

五:🔥 客户端 - Requestor 实现

requestor.hpp

  • 提供发送请求的接口
  • 内部进行请求 & 响应的管理
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"

#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

namespace rpc
{
    namespace client
    {
        // Sends requests and matches incoming responses to them by request id.
        class Requestor
        {
        public:
            using ptr = std::shared_ptr<Requestor>;
            using RequestCallback = std::function<void(const BaseMessage::ptr &)>;
            using AsyncResponse = std::future<BaseMessage::ptr>;

            // Book-keeping for one in-flight request.
            struct RequestDescribe
            {
                using ptr = std::shared_ptr<RequestDescribe>;
                BaseMessage::ptr request;
                rpc::RType rtype;
                std::promise<BaseMessage::ptr> response; // fulfilled for REQ_ASYNC
                RequestCallback callback;                // invoked for REQ_CALLBACK
            };

            // Handler registered with the dispatcher for responses: completes
            // the promise or runs the callback of the matching request, then
            // forgets the request.
            void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg)
            {
                std::string rid = msg->rid();
                RequestDescribe::ptr rdp = getDescribe(rid);
                if (rdp.get() == nullptr)
                {
                    LOG(LogLevel::ERROR) << "收到响应:" << msg->rid() << "但未找到对应的请求描述";
                    return;
                }
                if (rdp->rtype == RType::REQ_ASYNC)
                {
                    rdp->response.set_value(msg);
                }
                else if (rdp->rtype == RType::REQ_CALLBACK)
                {
                    if (rdp->callback)
                        rdp->callback(msg);
                }
                else
                {
                    LOG(LogLevel::ERROR) << "响应类型未知";
                }
                // The result has already been handed over above, so the
                // description can be dropped now.
                delDescribe(rid);
            }

            // Asynchronous send: async_rsp becomes ready once onResponse()
            // receives the matching response.
            bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
            {
                RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
                if (rdp.get() == nullptr)
                {
                    LOG(LogLevel::FATAL) << "构造请求描述对象失败! ";
                    return false;
                }
                // Take the future before the request goes out, so it is in the
                // caller's hands before any response can be processed.
                async_rsp = rdp->response.get_future();
                conn->send(req);
                return true;
            }

            // Synchronous send: blocks until the response arrives.
            bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
            {
                AsyncResponse rsp_future;
                bool ret = send(conn, req, rsp_future);
                if (ret == false)
                {
                    return false;
                }
                rsp = rsp_future.get();
                return true;
            }

            // Callback send: cb is invoked from onResponse() with the response.
            bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
            {
                RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
                if (rdp.get() == nullptr)
                {
                    LOG(LogLevel::FATAL) << "构造请求描述对象失败! ";
                    return false;
                }
                conn->send(req);
                return true;
            }

        private:
            // Creates and stores the description of a new request, keyed by its id.
            RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback())
            {
                std::unique_lock<std::mutex> lock(_mutex);
                RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
                rd->request = req;
                rd->rtype = rtype;
                // Only callback requests carry a callback; async results are
                // delivered through the promise in onResponse().
                if (rtype == RType::REQ_CALLBACK && cb)
                {
                    rd->callback = cb;
                }
                _request_desc.insert(std::make_pair(req->rid(), rd));
                return rd;
            }

            // Returns the description for rid, or an empty pointer.
            RequestDescribe::ptr getDescribe(const std::string &rid)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _request_desc.find(rid);
                if (it == _request_desc.end())
                {
                    return RequestDescribe::ptr();
                }
                return it->second;
            }

            // Drops the description for rid; erase-by-key is a no-op if absent.
            void delDescribe(const std::string &rid)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _request_desc.erase(rid);
            }

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;
        };
    }
}

六:🔥 客户端 - RpcCaller实现

  • 提供 Rpc 请求接口
#pragma once#include "requestor.hpp"namespace rpc
{namespace client{class RpcCaller{public:RpcCaller(const Requestor::ptr &requestor) : _requestor(requestor) {}using ptr = std::shared_ptr<RpcCaller>;using JsonAsyncResponse = std::future<Json::Value>;using JsonResponseCallback = std::function<void(const Json::Value&)>;// requestor中的处理是针对BaseMessage进行处理的 由于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的// 同步bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, Json::Value &result){// 1. 组织请求auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// 2. 发送请求BaseMessage::ptr rsp_msg;bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);    // 阻塞if(ret == false) {LOG(LogLevel::ERROR) << "同步Rpc请求失败! ";return false;}// 3. 等待响应RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if(!rpc_rsp_msg) {LOG(LogLevel::ERROR) << "Rpc响应, 向下类型转换失败! ";return false;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK) {LOG(LogLevel::ERROR) << "Rpc请求出错: " << errReason(rpc_rsp_msg->rcode());return false;}result = rpc_rsp_msg->result();return true;}// 异步bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, JsonAsyncResponse &result){// 1. 向服务器发送异步回调请求,设置回调函数,回调函数会传入一个promise对象,在回调函数中对promise设置数据auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// 2. 发送请求  这里必须是智能指针 不然json_promise释放掉 future会抛出异常auto json_promise = std::make_shared<std::promise<Json::Value>>();result = json_promise->get_future();Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback, this, json_promise, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);if(ret == false) {LOG(LogLevel::ERROR) << "异步Rpc请求失败! 
";return false;}return true;}// 回调bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, const JsonResponseCallback &cb)        // 回调函数必须写const 因为函数地址是常量{auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC); req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1, this, cb, std::placeholders::_1);      // 回调函数里绑定了回调函数bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if(ret == false) {LOG(LogLevel::ERROR) << "回调Rpc请求失败! ";return false;}return true;}private:void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg){RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg) {LOG(LogLevel::ERROR) << "Rpc响应, 向下类型转换失败! ";return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK) {LOG(LogLevel::ERROR) << "Rpc回调请求出错: " << errReason(rpc_rsp_msg->rcode());return ;}cb(rpc_rsp_msg->result());}void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg){RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg) {LOG(LogLevel::ERROR) << "Rpc响应, 向下类型转换失败! ";return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK) {LOG(LogLevel::ERROR) << "Rpc异步请求出错: " << errReason(rpc_rsp_msg->rcode());return ;}result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};}
}

七:🔥 客户端 - Publish & Subscribe实现

rpc_topic.hpp

  • 提供消息发布接口
  • 提供主题操作接口
  • 内部进行主题及订阅者的管理
#pragma once#include "requestor.hpp"
#include <unordered_set>namespace rpc
{namespace client{class TopicManager{public:using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;using ptr = std::shared_ptr<TopicManager>;TopicManager(const Requestor::ptr &requestor) : _requestor(requestor){}bool create(const BaseConnection::ptr &conn, const std::string &key){return commonRequestor(conn, key, TopicOptype::TOPIC_CREATE);}bool remove(const BaseConnection::ptr &conn, const std::string &key){return commonRequestor(conn, key, TopicOptype::TOPIC_REMOVE);}// 订阅后收到消息的回调处理函数bool subscribe(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb){// 先建立映射关系 因为可能一订阅就有消息来了 此时回调函数还没注册addSubscribe(key, cb);bool ret = commonRequestor(conn, key, TopicOptype::TOPIC_SUBSCRIBE);if(ret == false) {delSubscribe(key);return false;}return true;}bool cancel(const BaseConnection::ptr &conn, const std::string &key){delSubscribe(key);return commonRequestor(conn, key, TopicOptype::TOPIC_CANCEL);}bool publish(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg){return commonRequestor(conn, key, TopicOptype::TOPIC_PUBLISH, msg);}// 服务端推送给过来的消息请求void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr& msg){// 1. 从消息中取出操作类型进行判断,是否是消息请求auto type = msg->optype();if(type != TopicOptype::TOPIC_PUBLISH) {LOG(LogLevel::ERROR) << "收到了错误类型的主题操作! ";return ;}// 2. 取出消息主题名称,以及消息内容std::string topic_key = msg->topicKey();std::string topic_msg = msg->topicMsg();// 3. 通过主题名称,查找对应主题的回调处理函数 有则处理 无则报错auto callback = getSubscribe(topic_key);if(!callback) {LOG(LogLevel::ERROR) << "收到了 " << topic_key << " 主题消息,但是该消息无主题处理回调! 
";return ;}return callback(topic_key, topic_msg);}private:void addSubscribe(const std::string &key, const SubCallback &cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key, cb));}void delSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback getSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);auto it = _topic_callbacks.find(key);if(it == _topic_callbacks.end()) return SubCallback();return it->second;}bool commonRequestor(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = ""){// 1. 构造请求对象,并填充数据auto msg_req = MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setOptype(type);msg_req->setTopicKey(key);if(type == TopicOptype::TOPIC_PUBLISH) {msg_req->setTopicMsg(msg);}// 2. 向服务端发送请求,等待响应BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if(ret == false) {LOG(LogLevel::ERROR) << "主题操作请求失败! ";return false;}// 3. 判断请求处理是否成功TopicResponse::ptr topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if(!topic_rsp_msg) {LOG(LogLevel::ERROR) << "主题操作响应, 向下类型转换失败! ";return false;}if(topic_rsp_msg->rcode() != RCode::RCODE_OK) {LOG(LogLevel::ERROR) << "主题操作请求出错: " << errReason(topic_rsp_msg->rcode());return false;}return true;}private:std::mutex _mutex;std::unordered_map<std::string, SubCallback> _topic_callbacks;Requestor::ptr _requestor;};}
}

八:🔥 客户端 - Registry & Discovery实现

rpc_registry.hpp

  • 提供服务发现接口
  • 提供服务注册接口
  • 提供服务操作(上线/下线)通知处理回调函数
  • 内部进行发现的服务与主机信息管理
#pragma once#include "requestor.hpp"
#include <unordered_set>namespace rpc
{namespace client{class Provider {public: using ptr = std::shared_ptr<Provider>;Provider(const Requestor::ptr &requestor) : _requestor(requestor){}bool registryMethod(const BaseConnection::ptr &conn, const std::string &method, const Address &host) {auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setHost(host);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if(ret == false) {LOG(LogLevel::ERROR) << method << " 服务注册失败! ";return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get() == nullptr) {LOG(LogLevel::ERROR) << "响应类型向下转换失败! ";return false;}if(service_rsp->rcode() != RCode::RCODE_OK) {LOG(LogLevel::ERROR) << "服务注册失败, 原因: " << errReason(service_rsp->rcode());return false;}return true;}private:Requestor::ptr _requestor;};class MethodHost{public:using ptr = std::shared_ptr<MethodHost>;MethodHost() : _idx(0) {}MethodHost(const std::vector<Address> &hosts) : _hosts(hosts), _idx(0){}void appendHost(const Address &host){   // 中途收到了服务上线请求后调用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address& host){// 中途收到了服务下线请求后调用std::unique_lock<std::mutex> lock(_mutex);for(std::vector<Address>::iterator it = _hosts.begin(); it != _hosts.end(); ++it) {if(*it == host) {_hosts.erase(it);return ;}}}Address chooseHost(){std::unique_lock<std::mutex> lock(_mutex);if(_idx >= _hosts.size()) _idx = 0;return _hosts[_idx++];}bool empty() { std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty(); }private:std::mutex _mutex;size_t _idx;                    // 负载均衡std::vector<Address> _hosts;};class Discoverer{public:using OfflineCallback = std::function<void(const Address&)>;using ptr = std::shared_ptr<Discoverer>;Discoverer(const Requestor::ptr &requestor, const OfflineCallback &cb) : 
_requestor(requestor),_offline_callback(cb){}bool serviceDiscovery(const BaseConnection::ptr &conn, const std::string &method, Address &host){// 当前所保管的提供者信息存在,则直接返回地址{std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(method);if(it != _method_hosts.end()) {if(it->second->empty() == false) {host = it->second->chooseHost();return true;}}}// 当前服务提供者为空auto msg_req = MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret = _requestor->send(conn, msg_req, msg_rsp);if(ret == false) {LOG(LogLevel::ERROR) << "服务发现失败! ";return false;}auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get() == nullptr) {LOG(LogLevel::ERROR) << "响应类型向下转换失败! ";return false;}if(service_rsp->rcode() != RCode::RCODE_OK) {LOG(LogLevel::ERROR) << "服务发现失败! " << errReason(service_rsp->rcode());return false;}// 能走到这里,代表当前_method_hosts 没有对应的服务提供主机{std::unique_lock<std::mutex> lock(_mutex);MethodHost::ptr method_host = std::make_shared<MethodHost>(service_rsp->hosts());if(method_host->empty()) {LOG(LogLevel::ERROR) << method << "服务发现失败! 没有能提供服务的主机" << errReason(service_rsp->rcode());return false;}host = method_host->chooseHost();_method_hosts[method] = method_host;}return true;}// 提供给dispatcher 进行服务上线下线请求处理的回调函数  服务器告诉客户端void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) {// 1. 判断是上线还是下线请求,如果都不是就不用处理了auto optype = msg->optype();std::string method = msg->method();if(optype == ServiceOptype::SERVICE_ONLINE) {// 2. 
上线请求: 找到 _method_hosts, 向其中新增一个主机地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(msg->method());if(it == _method_hosts.end()) {MethodHost::ptr method_host = std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method] = method_host;} else {it->second->appendHost(msg->host());}} else if(optype == ServiceOptype::SERVICE_OFFLINE) {// 3. 下线请求: 找到 _method_hosts, 从其中删除一个主机地址std::unique_lock<std::mutex> lock(_mutex);auto it = _method_hosts.find(msg->method());if(it == _method_hosts.end()) {return ;} else {it->second->removeHost(msg->host());_offline_callback(msg->host());}}}private:OfflineCallback _offline_callback;std::mutex _mutex;std::unordered_map<std::string, MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;};}
}

九:🔥 客户端 - 整合封装 Client

rpc_client.hpp

#pragma once

#include "../common/dispatcher.hpp"
#include "rpc_caller.hpp"
#include "requestor.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

namespace rpc
{
    namespace client
    {
        /// Client used by a service provider to register its methods with the registry.
        class RegistryClient
        {
        public:
            using ptr = std::shared_ptr<RegistryClient>;

            /// Connects to the registry at ip:port.
            RegistryClient(const std::string &ip, int port)
                : _requestor(std::make_shared<Requestor>()),
                  _provider(std::make_shared<client::Provider>(_requestor)),
                  _dispatcher(std::make_shared<rpc::Dispatcher>())
            {
                // Service responses are routed to the requestor, which matches them to requests.
                auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(rpc::MType::RSP_SERVICE, rsp_cb);

                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _client = rpc::ClientFactory::create(ip, port);
                _client->setMessageCallback(message_cb);
                _client->connect();
            }

            /// Registers `method` as being served at `host`.
            bool registryMethod(const std::string &method, const Address &host)
            {
                return _provider->registryMethod(_client->connection(), method, host);
            }

        private:
            Requestor::ptr _requestor;
            client::Provider::ptr _provider;
            Dispatcher::ptr _dispatcher;
            BaseClient::ptr _client;
        };

        /// Client used by a service consumer to discover providers through the registry.
        class DiscoveryClient
        {
        public:
            using ptr = std::shared_ptr<DiscoveryClient>;

            /// Connects to the registry at ip:port; `cb` is passed on to the Discoverer
            /// as its offline callback.
            DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb)
                : _requestor(std::make_shared<Requestor>()),
                  _discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),
                  _dispatcher(std::make_shared<rpc::Dispatcher>())
            {
                // Responses to our own discovery requests.
                auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);

                // Online/offline notifications pushed by the registry.
                auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<rpc::ServiceRequest>(MType::REQ_SERVICE, req_cb);

                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _client = rpc::ClientFactory::create(ip, port);
                _client->setMessageCallback(message_cb);
                _client->connect();
            }

            /// Looks up a host providing `method`; on success `host` receives its address.
            bool serviceDiscovery(const std::string &method, Address &host)
            {
                return _discoverer->serviceDiscovery(_client->connection(), method, host);
            }

        private:
            Requestor::ptr _requestor;
            client::Discoverer::ptr _discoverer;
            Dispatcher::ptr _dispatcher;
            BaseClient::ptr _client;
        };

        /**
         * @brief RPC client facade offering synchronous, future-based and callback-based calls.
         *
         * With discovery enabled, ip:port is the registry address and providers are
         * looked up per call; connections to providers are cached by address.
         * With discovery disabled, ip:port is the address of the single provider.
         */
        class RpcClient
        {
        public:
            using ptr = std::shared_ptr<RpcClient>;

            /// @param enableDiscovery selects whether ip:port names the registry (true)
            ///        or the service provider itself (false)
            RpcClient(bool enableDiscovery, const std::string &ip, int port)
                : _enableDiscovery(enableDiscovery),
                  _requestor(std::make_shared<Requestor>()),
                  _caller(std::make_shared<RpcCaller>(_requestor)),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                // RPC responses are routed to the requestor.
                auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);

                if (_enableDiscovery) {
                    // A provider going offline drops its cached connection.
                    auto offline_cb = std::bind(&RpcClient::delClient, this, std::placeholders::_1);
                    _discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);
                } else {
                    _rpc_client = ClientFactory::create(ip, port);
                    auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                    _rpc_client->setMessageCallback(message_cb);
                    _rpc_client->connect();
                }
            }

            /// Synchronous call; `result` receives the JSON result on success.
            bool call(const std::string &method, const Json::Value &params, Json::Value &result)
            {
                BaseClient::ptr client = getClient(method);
                if (client.get() == nullptr) return false;
                return _caller->call(client->connection(), method, params, result);
            }

            /// Asynchronous call; `result` receives a future for the JSON result.
            bool call(const std::string &method, const Json::Value &params, RpcCaller::JsonAsyncResponse &result)
            {
                BaseClient::ptr client = getClient(method);
                if (client.get() == nullptr) return false;
                return _caller->call(client->connection(), method, params, result);
            }

            /// Callback call; `cb` is invoked with the JSON result.
            bool call(const std::string &method, const Json::Value &params, const RpcCaller::JsonResponseCallback &cb)
            {
                BaseClient::ptr client = getClient(method);
                if (client.get() == nullptr) return false;
                return _caller->call(client->connection(), method, params, cb);
            }

        private:
            /// Creates, connects and caches a client for `host`.
            BaseClient::ptr newClient(const Address &host)
            {
                auto client = ClientFactory::create(host.first, host.second);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                client->setMessageCallback(message_cb);
                client->connect();
                putClient(host, client);
                return client;
            }

            /// Returns the cached client for `host`, or an empty pointer.
            BaseClient::ptr getClient(const Address &host)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _rpc_clients.find(host);
                if (it == _rpc_clients.end()) return BaseClient::ptr();
                return it->second;
            }

            /// Returns a client connected to a provider of `method`, or an empty
            /// pointer when discovery finds no provider.
            BaseClient::ptr getClient(const std::string &method)
            {
                if (!_enableDiscovery) {
                    return _rpc_client;
                }
                // 1. Resolve a provider address through service discovery.
                Address host;
                bool ret = _discovery_client->serviceDiscovery(method, host);
                if (ret == false) {
                    LOG(LogLevel::INFO) << "当前 " << method << " 服务, 没有找到服务提供者! ";
                    return BaseClient::ptr();
                }
                // 2. Reuse the cached connection to that provider, or create one.
                BaseClient::ptr client = getClient(host);
                if (client.get() == nullptr) {
                    client = newClient(host);
                }
                return client;
            }

            void putClient(const Address &host, const BaseClient::ptr &client)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _rpc_clients.insert(std::make_pair(host, client));
            }

            void delClient(const Address &host)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _rpc_clients.erase(host);
            }

            /// Hash for Address keys: hashes "ip:port". The separator keeps e.g.
            /// ("1.1.1.1", 23) and ("1.1.1.12", 3) from producing the same string.
            struct AddressHash
            {
                size_t operator()(const Address &host) const
                {
                    std::string addr = host.first + ":" + std::to_string(host.second);
                    return std::hash<std::string>{}(addr);
                }
            };

        private:
            bool _enableDiscovery;
            Requestor::ptr _requestor;
            DiscoveryClient::ptr _discovery_client;
            RpcCaller::ptr _caller;
            Dispatcher::ptr _dispatcher;
            BaseClient::ptr _rpc_client;        // used when discovery is disabled
            std::mutex _mutex;                  // guards _rpc_clients
            // provider address -> connected client; used when discovery is enabled
            std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;
        };

        /// Client for the topic (publish/subscribe) server.
        class TopicClient
        {
        public:
            using ptr = std::shared_ptr<TopicClient>;

            /// Connects to the server at ip:port.
            TopicClient(const std::string &ip, int port)
                : _requestor(std::make_shared<Requestor>()),
                  _dispatcher(std::make_shared<rpc::Dispatcher>()),
                  _topic_manager(std::make_shared<TopicManager>(_requestor))
            {
                // Responses to our own topic requests.
                auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(rpc::MType::RSP_TOPIC, rsp_cb);

                // Messages pushed by the server for subscribed topics.
                auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<TopicRequest>(rpc::MType::REQ_TOPIC, msg_cb);

                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _rpc_client = rpc::ClientFactory::create(ip, port);
                _rpc_client->setMessageCallback(message_cb);
                _rpc_client->connect();
            }

            bool create(const std::string &key)
            {
                return _topic_manager->create(_rpc_client->connection(), key);
            }

            bool remove(const std::string &key)
            {
                return _topic_manager->remove(_rpc_client->connection(), key);
            }

            bool subscribe(const std::string &key, const TopicManager::SubCallback &cb)
            {
                return _topic_manager->subscribe(_rpc_client->connection(), key, cb);
            }

            bool cancel(const std::string &key)
            {
                return _topic_manager->cancel(_rpc_client->connection(), key);
            }

            bool publish(const std::string &key, const std::string &msg)
            {
                return _topic_manager->publish(_rpc_client->connection(), key, msg);
            }

            void shutdown()
            {
                _rpc_client->shutdown();
            }

        private:
            Requestor::ptr _requestor;
            Dispatcher::ptr _dispatcher;
            TopicManager::ptr _topic_manager;
            BaseClient::ptr _rpc_client;
        };
    }
}

十:🔥 整合封装的使用代码样例

🦋 简单 Rpc 使用

test_client.cpp

#include "../../common/detail.hpp"
#include "../../client/rpc_client.hpp"
#include <chrono>
#include <exception>
#include <iostream>
#include <thread>

// Handler for the callback-style call: logs the integer result.
void callback(const Json::Value &result)
{
    rpc::LOG(rpc::LogLevel::INFO) << "callback result: " << result.asInt();
}

// Demo: calls "Add" on a fixed provider (no service discovery) in all three styles.
int main()
{
    rpc::client::RpcClient client(false, "127.0.0.1", 9090);

    // Synchronous call.
    Json::Value params, result;
    params["num1"] = 11;
    params["num2"] = 22;
    bool ret = client.call("Add", params, result);
    if (ret != false) {
        std::cout << "result: " << result.asInt() << std::endl;
    }

    // Asynchronous call: the result is fetched from a future.
    rpc::client::RpcCaller::JsonAsyncResponse res_future;
    params["num1"] = 33;
    params["num2"] = 44;
    ret = client.call("Add", params, res_future);
    if (ret != false) {
        try {
            result = res_future.get();
            std::cout << "result: " << result.asInt() << std::endl;
        } catch (const std::exception &e) {
            // get() throws when the promise was never given a value.
            std::cout << "async call failed: " << e.what() << std::endl;
        }
    }

    // Callback call: `callback` runs once the response arrives.
    params["num1"] = 55;
    params["num2"] = 66;
    ret = client.call("Add", params, callback);

    std::cout << "----------------------------------------------\n" << std::endl;
    // Keep the process alive so the callback has a chance to run.
    std::this_thread::sleep_for(std::chrono::seconds(1000));
    return 0;
}

test_server.cpp

#include "../../common/detail.hpp"
#include "../../server/rpc_server.hpp"

// Service implementation: rsp = req["num1"] + req["num2"].
void Add(const Json::Value &req, Json::Value &rsp)
{
    int num1 = req["num1"].asInt();
    int num2 = req["num2"].asInt();
    rsp = num1 + num2;
}

// Demo: serves "Add" on 127.0.0.1:9090 without a registry.
int main()
{
    // Describe the method: name, parameter names and types, return type, handler.
    rpc::server::SDescribeFactory desc_factory;
    desc_factory.setMethodName("Add");
    desc_factory.setParamsDesc("num1", rpc::server::VType::INTEGRAL);
    desc_factory.setParamsDesc("num2", rpc::server::VType::INTEGRAL);
    desc_factory.setReturnType(rpc::server::VType::INTEGRAL);
    desc_factory.setCallback(Add);

    rpc::server::Rpcserver server(rpc::Address("127.0.0.1", 9090));
    server.registerMethod(desc_factory.build());
    server.start();
    return 0;
}

Makefile

# Compiler and linker flags; jsoncpp and muduo are taken from the local build tree.
CFLAG= -g -std=c++17 -I ../../../build/release-install-cpp11/include/
LFLAG= -L ../../../build/release-install-cpp11/lib -ljsoncpp -lmuduo_net -lmuduo_base -lpthread
.PHONY:all
all: server client
# Recipes are written on the rule line after ';' so they do not depend on a
# leading tab surviving copy and paste.
server: test_server.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
client: test_client.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean: ; rm -f server client

🦋 基于服务注册发现的 Rpc 调用

rpc_client.cpp

#include "../../common/detail.hpp"
#include "../../client/rpc_client.hpp"
#include <chrono>
#include <exception>
#include <iostream>
#include <thread>

// Handler for the callback-style call: logs the integer result.
void callback(const Json::Value &result)
{
    rpc::LOG(rpc::LogLevel::INFO) << "callback result: " << result.asInt();
}

// Demo: calls "Add" in all three styles, locating the provider through the
// registry at 127.0.0.1:8080.
int main()
{
    rpc::client::RpcClient client(true, "127.0.0.1", 8080);

    // Synchronous call.
    Json::Value params, result;
    params["num1"] = 11;
    params["num2"] = 22;
    bool ret = client.call("Add", params, result);
    if (ret != false) {
        std::cout << "result: " << result.asInt() << std::endl;
    }

    // Asynchronous call: the result is fetched from a future.
    rpc::client::RpcCaller::JsonAsyncResponse res_future;
    params["num1"] = 33;
    params["num2"] = 44;
    ret = client.call("Add", params, res_future);
    if (ret != false) {
        try {
            result = res_future.get();
            std::cout << "result: " << result.asInt() << std::endl;
        } catch (const std::exception &e) {
            // get() throws when the promise was never given a value.
            std::cout << "async call failed: " << e.what() << std::endl;
        }
    }

    // Callback call: `callback` runs once the response arrives.
    params["num1"] = 55;
    params["num2"] = 66;
    ret = client.call("Add", params, callback);

    std::cout << "----------------------------------------------\n" << std::endl;
    // Keep the process alive so the callback has a chance to run.
    std::this_thread::sleep_for(std::chrono::seconds(1000));
    return 0;
}

rpc_server.cpp

#include "../../common/detail.hpp"
#include "../../server/rpc_server.hpp"

// Service implementation: rsp = req["num1"] + req["num2"].
void Add(const Json::Value &req, Json::Value &rsp)
{
    int num1 = req["num1"].asInt();
    int num2 = req["num2"].asInt();
    rsp = num1 + num2;
}

// Demo: serves "Add" on 127.0.0.1:9090 and registers it with the registry
// at 127.0.0.1:8080.
int main()
{
    // Describe the method: name, parameter names and types, return type, handler.
    rpc::server::SDescribeFactory desc_factory;
    desc_factory.setMethodName("Add");
    desc_factory.setParamsDesc("num1", rpc::server::VType::INTEGRAL);
    desc_factory.setParamsDesc("num2", rpc::server::VType::INTEGRAL);
    desc_factory.setReturnType(rpc::server::VType::INTEGRAL);
    desc_factory.setCallback(Add);

    rpc::server::Rpcserver server(rpc::Address("127.0.0.1", 9090), true, rpc::Address("127.0.0.1", 8080));
    server.registerMethod(desc_factory.build());
    server.start();
    return 0;
}

registry_server.cpp

#include "../../common/detail.hpp"
#include "../../server/rpc_server.hpp"

// Demo: runs the service registry that providers and consumers connect to.
int main()
{
    const int registry_port = 8080;

    rpc::server::RegistryServer reg_server(registry_port);
    reg_server.start();

    return 0;
}

Makefile

# Compiler and linker flags; jsoncpp and muduo are taken from the local build tree.
CFLAG= -g -std=c++17 -I ../../../build/release-install-cpp11/include/
LFLAG= -L ../../../build/release-install-cpp11/lib -ljsoncpp -lmuduo_net -lmuduo_base -lpthread
.PHONY:all
all: reg_server rpc_server rpc_client
# Recipes are written on the rule line after ';' so they do not depend on a
# leading tab surviving copy and paste.
reg_server: registry_server.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
rpc_server: rpc_server.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
rpc_client: rpc_client.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean: ; rm -f reg_server rpc_server rpc_client

🦋 基于广播的发布订阅

publish_client.cpp

#include "../../client/rpc_client.hpp"int main()
{// 1. 实例化客户端对象rpc::client::TopicClient::ptr client = std::make_shared<rpc::client::TopicClient>("127.0.0.1", 7070);// 2. 创建主题bool ret = client->create("hello");if(ret == false) {rpc::LOG(rpc::LogLevel::ERROR) << "创建主题失败! ";}// 3. 向主题发布消息for(int i = 0; i < 10; i++) {client->publish("hello", "Hello World" + std::to_string(i));}client->shutdown();return 0;
}

server.cpp

#include "../../server/rpc_server.hpp"int main()
{auto server = std::make_shared<rpc::server::TopicServer>(7070);server->start();return  0;
}

subscribe_client.cpp

#include "../../client/rpc_client.hpp"void callback(const std::string &key, const std::string &msg)
{rpc::LOG(rpc::LogLevel::INFO) << key << " 主题收到推送过来的消息: " << msg;
}int main()
{// 1. 实例化客户端对象rpc::client::TopicClient::ptr client = std::make_shared<rpc::client::TopicClient>("127.0.0.1", 7070);// 2. 创建主题bool ret = client->create("hello");if(ret == false) {rpc::LOG(rpc::LogLevel::ERROR) << "创建主题失败! ";}// 3. 订阅主题ret = client->subscribe("hello", callback);// 4. 等待 -> 退出std::this_thread::sleep_for(std::chrono::seconds(10));client->shutdown();return 0;
}

Makefile

# Compiler and linker flags; jsoncpp and muduo are taken from the local build tree.
CFLAG= -g -std=c++17 -I ../../../build/release-install-cpp11/include/
LFLAG= -L ../../../build/release-install-cpp11/lib -ljsoncpp -lmuduo_net -lmuduo_base -lpthread
.PHONY:all
all: server publish_client subscribe_client
# Recipes are written on the rule line after ';' so they do not depend on a
# leading tab surviving copy and paste.
server : server.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
publish_client : publish_client.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
subscribe_client : subscribe_client.cpp ; g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean: ; rm -f server publish_client subscribe_client

十一:🔥 共勉

😋 以上就是我对 【C++项目】从零实现RPC框架「四」:业务层实现与项目使用 的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉
在这里插入图片描述

关键字:织梦论坛源码_莱芜论坛莱芜在线_sem竞价托管费用_代写文章质量高的平台

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: