提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 一、etcd封装
- 二、封装的思想
- 1.封装服务注册客户端类
- 2.封装服务发现客户端类
- 三.简单使用
- 1.服务注册客户端
- 2.服务发现客户端
一、etcd封装
1.服务注册客户端:向服务器新增服务信息数据,并进行保活
2.服务发现客户端:从服务器查找服务信息数据,并进行改变事件监控
封装的时候,我们尽量减少模块之间的耦合度,本质上etcd是一个键值存储系统,并不是专门用于作为注册中心进行服务注册和发现的。
二、封装的思想
1.封装服务注册客户端类
提供一个接口:向服务器新增数据并进行保活
参数:注册中心地址(etcd服务器地址),
新增的服务信息(服务名-主机地址键值对)
成员变量有三个,一个etcd::CLient客户端,一个keep_Alive保活对象,一个lease_id租约Id
构造函数需要提供etcd注册中心的地址,这样才能连接服务器。在构造函数初始化列表中初始化三个成员变量,这里的保活事件设置的3秒。
需要提供一个接口,用于服务注册,该接口有两个参数,一个std::string&key,一个是std::string& val。
接口内部调用put方法向服务器添加kv数据,并添加上续约Id。
class Registry
{
public:
using ptr = std::shared_ptr<Registry>;Registry(const std::string& host):_client(std::make_shared<etcd::Client>(host)),_keep_alive(_client->leasekeepalive(3).get()),_lease_id(_keep_alive->Lease()){}~Registry(){//取消保活_keep_alive->Cancel();}//服务注册接口bool registry(const std::string& key,const std::string& val){auto resp = _client->put(key,val,_lease_id).get();if(resp.is_ok() == false){LOG_ERROR("注册数据失效: {}",resp.error_message());return false;}return true;}private:std::shared_ptr<etcd::Client> _client; //客户端std::shared_ptr<etcd::KeepAlive> _keep_alive; //保活对象uint64_t _lease_id; //租约Id
};
2.封装服务发现客户端类
提供两个设置回调函数的接口:服务上线事件接口(数据新增),服务下线事件接口(数据删除)
提供一个设置根目录的接口:用于获取指定目录下的数据以及监控目录下数据的改变
该类成员变量有四个,两个用使用者设置的回调函数,一个client用户连接服务器,一个watcher用于监控数据的改变.
该类不需要提供对外接口,在构造函数中,需要传递四个参数,分别是两个回调函数,一个etcd服务器的地址用于连接服务器,还有一个是x需要监控的服务名称。
构造函数中会先更具参数中的basedir,获取当前服务器该basedir已有的数据.然后通过watcher监控basedir的改变,分别处理新增和删除的事件,事件发生时,调用用户设置的对应的回调函数。
需要注意的是,在构造时会先进行服务获取,调用了ls()这个接口,而不是get()。这个接口的意思是获取服务路径下所有服务。
打个比方,服务注册客户端注册了三个服务,分别是"/service/friend",“/service/user”,“/service/message”。我们使用ls进行服务发现需要提供一个key,我们提供“/service”,就会把这三个服务数据都获取到。
在watcher构造时,传入了四个参数,第一个参数时client对象的实例,第二个参数时监控的服务路径,第三个参数是一个回调函数,当监控的服务数据触发事件,就会调用这个回调函数,第四个参数是是否要递归监控服务路劲下所有服务。我们这是是true。
在这个回调函数中会遍历触发的事件,如果是新增事件则调用用户设置的put_cb,如果是删除事件,则调用del_cb;
/服务发现客户端类
class Discovery
{
public:
using ptr = std::shared_ptr<Discovery>;//服务发现使用者需要提供两个回调函数,当监控的数据发送改变时,内部会自动调用对应的回调函数using NotifyCallback = std::function<void(std::string,std::string)>;Discovery(const std::string& host,const std::string& basedir,const NotifyCallback& put_cb,const NotifyCallback& del_cb):_client(std::make_shared<etcd::Client>(host)),_put_cb(put_cb),_del_cb(del_cb){//1.先进行服务发现,获取到当前已有的数据auto resp = _client->ls(basedir).get();if (resp.is_ok() == false) {LOG_ERROR("获取服务信息数据失败:{}", resp.error_message());}//可能会有多个结果int sz = resp.keys().size();for (int i = 0; i < sz; ++i) {if (_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());}//2.然后进行事件监控,监控数据发生的改变,并调用回调进行处理_watcher = std::make_shared<etcd::Watcher>(*_client.get(),basedir,std::bind(&Discovery::callback,this,std::placeholders::_1),true);// _watcher->Wait();}~Discovery() {//取消监控_watcher->Cancel();}private:void callback(const etcd::Response &resp) {if (resp.is_ok() == false) {LOG_ERROR("收到一个错误的事件通知: {}", resp.error_message());return;}for (auto const& ev : resp.events()) {if (ev.event_type() == etcd::Event::EventType::PUT) {if (_put_cb) _put_cb(ev.kv().key(), ev.kv().as_string());LOG_DEBUG("新增服务:{}-{}", ev.kv().key(), ev.kv().as_string());}else if (ev.event_type() == etcd::Event::EventType::DELETE_) {if (_del_cb) _del_cb(ev.prev_kv().key(), ev.prev_kv().as_string());LOG_DEBUG("下线服务:{}-{}", ev.prev_kv().key(), ev.prev_kv().as_string());}}}private:NotifyCallback _put_cb;NotifyCallback _del_cb;std::shared_ptr<etcd::Client> _client; //客户端std::shared_ptr<etcd::Watcher> _watcher; //服务监控对象
};
三.简单使用
1.服务注册客户端
注册两个服务,“service.friend"和”.service.user"。线程休眠10秒后退出。
#include "../common/etcd.hpp"
#include <thread>int main()
{init_logger(false,"",0);//创建一个服务注册客户端Registry::ptr rClient = std::make_shared<Registry>("127.0.0.1:2379");rClient->registry("/service/friend","192.168.1.1:8080");rClient->registry("/service/user","192.168.1.2:8080");std::this_thread::sleep_for(std::chrono::seconds(10));return 0;
}
2.服务发现客户端
服务发现客户端不需要调用接口,因为在创建服务发现客户端时构造函数中就已经使用watcher监控了basedir服务。
这个basedir就是这里传入的参数“/service”。
#include "../common/etcd.hpp"void putHandle(std::string service_name,std::string host_name)
{LOG_DEBUG("新增服务:{}-{}", service_name, host_name);
}void delHandle(std::string service_name,std::string host_name)
{LOG_DEBUG("下线服务:{}-{}", service_name, host_name);
}int main()
{init_logger(false,"",0);//创建一个服务发现客户端Discovery::ptr dClient = std::make_shared<Discovery>("127.0.0.1:2379","/service",putHandle,delHandle);std::this_thread::sleep_for(std::chrono::seconds(600));return 0;
}