连接关系着客户端和服务端,所以这里分别介绍这个项目的客户端与服务端之间的连接
请求回应设计
syntax = "proto3";
package mymq;

import "msg.proto";

// Every request carries `rid` (request id) and `cid` (channel id); the server
// copies both into the basicCommonResponse it sends back.

// ---- Channel open / close ----
message openChannelRequest {
    string rid = 1;
    string cid = 2;
}

message closeChannelRequest {
    string rid = 1;
    string cid = 2;
}

// ---- Exchange declare / delete ----
message declareExchangeRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    ExchangeType exchange_type = 4;
    bool durable = 5;
    bool auto_delete = 6;
    map<string, string> args = 7;
}

message deleteExchangeRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
}

// ---- Queue declare / delete ----
message declareQueueRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
    bool exclusive = 4;
    bool durable = 5;
    bool auto_delete = 6;
    map<string, string> args = 7;
}

message deleteQueueRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
}

// ---- Queue bind / unbind ----
message queueBindRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string queue_name = 4;
    string binding_key = 5;
}

message queueUnBindRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string queue_name = 4;
}

// ---- Message publish ----
message basicPublishRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string body = 4;
    BasicProperties properties = 5;
}

// ---- Message acknowledgement ----
message basicAckRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
    string message_id = 4;
}

// ---- Queue subscription ----
message basicConsumeRequest {
    string rid = 1;
    string cid = 2;
    string consumer_tag = 3;
    string queue_name = 4;
    bool auto_ack = 5;
}

// ---- Subscription cancel ----
message basicCancelRequest {
    string rid = 1;
    string cid = 2;
    string consumer_tag = 3;
    string queue_name = 4;
}

// ---- Message push (server -> client); not tied to a request, so no rid ----
message basicConsumeResponse {
    string cid = 1;
    string consumer_tag = 2;
    string body = 3;
    BasicProperties properties = 4;
}

// ---- Generic response to any of the requests above ----
message basicCommonResponse {
    string rid = 1;
    string cid = 2;
    bool ok = 3;
}
服务端
通过上面的请求与响应设计,客户端和服务端在进行消息交换的时候,直接封装和解析相应的报文即可。
信道
信道是网络通信中的一个概念,叫做通信通道
网络通信的时候,必然都是通过网络通信连接来完成的,为了能够更加充分的利用资源,因此对通信连接又进行了进一步的细化,细化出了通信通道。
对于用户来说,一个通信通道就是进行网络通信的载体,而一个真正的通信连接,可以创建出多个通信通道。
每一个信道之间,在用户的眼里是相互独立的,而本质上,他们使用同一个通信连接进行网络通信。
因此,既然信道是用户眼里的通信通道,所有的网络通信服务也就都由信道来提供。
信道提供的服务操作
- 声明 / 删除 交换机
- 声明 / 删除 队列
- 绑定 / 解绑 队列和交换机
- 发布消息/订阅队列消息/取消队列订阅/队列消息确认
信道要管理的数据:
- 信道ID
- 信道关联的虚拟机句柄:交换机、队列、绑定以及消息相关的操作都通过它来完成
- 工作线程句柄:信道将消息发布到指定队列之后,需要从该队列的订阅者中选出一个消费者来消费这条消息,也就是把这条消息推送给某个客户端;这个推送操作交给线程池异步执行。并非每个信道都有一个线程池,而是整个服务器共用一个线程池,所有信道都通过同一个线程池进行异步操作。
信道的管理:
- 创建一个信道
- 关闭一个信道
- 获取指定信道句柄
代码展示
#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__

#include <cstdio>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/threadpool.hpp"
#include "mq_consumer.hpp"
#include "mq_host.hpp"
#include "mq_route.hpp"

namespace mymq
{
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
    using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
    using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
    using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
    using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
    using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
    using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;
    using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;
    using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
    using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
    using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
    using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;
    using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;

    // Server-side channel: executes the requests a client sends on one logical
    // channel and writes the basicCommonResponse back on the shared TCP connection.
    //
    // A Channel MUST be owned by a std::shared_ptr (ChannelManager::openChannel
    // creates it with std::make_shared): basicPulish() and basicConsume() call
    // shared_from_this() so that work handed to the thread pool or stored in the
    // consumer manager never runs against a destroyed channel.
    class Channel : public std::enable_shared_from_this<Channel>
    {
    public:
        using ptr = std::shared_ptr<Channel>;

        Channel(const std::string& id,
                const VirtualHost::ptr& host,
                const ConsumerManager::ptr& cmp,
                const ProtobufCodecPtr& codec,
                const muduo::net::TcpConnectionPtr& conn,
                const threadPool::ptr& pool)
            : _cid(id), _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool)
        {
            DLOG("new Cannel: %p", this);
        }

        // Unregisters the consumer this channel still owns, if any.
        ~Channel()
        {
            if (_consumer.get() != nullptr)
            {
                _cmp->remove(_consumer->tag, _consumer->qname);
            }
            DLOG("del Channel : %p", this);
        }

        // Declare an exchange; the response carries the virtual host's result.
        void declareExchange(const declareExchangeRequestPtr& req)
        {
            bool ret = _host->declareExchange(req->exchange_name(),
                                              req->exchange_type(),
                                              req->durable(),
                                              req->auto_delete(),
                                              req->args());
            return basicResponse(ret, req->rid(), req->cid());
        }

        // Delete an exchange; always answers ok.
        void deleteExchange(const deleteExchangeRequestPtr& req)
        {
            _host->deleteExchange(req->exchange_name());
            return basicResponse(true, req->rid(), req->cid());
        }

        // Declare a queue and, on success, set up its consumer bookkeeping.
        void declareQueue(const declareQueueRequestPtr& req)
        {
            bool ret = _host->declareQueue(req->queue_name(),
                                           req->durable(),
                                           req->exclusive(),
                                           req->auto_delete(),
                                           req->args());
            if (ret == false)
            {
                return basicResponse(false, req->rid(), req->cid());
            }
            _cmp->initQueueConsumer(req->queue_name());
            return basicResponse(true, req->rid(), req->cid());
        }

        // Delete a queue together with its consumer bookkeeping; always answers ok.
        void deleteQueue(const deleteQueueRequestPtr& req)
        {
            _cmp->destroyQueueConsumer(req->queue_name());
            _host->deleteQueue(req->queue_name());
            return basicResponse(true, req->rid(), req->cid());
        }

        // Bind a queue to an exchange; the response carries the virtual host's result.
        void queueBind(const queueBindRequestPtr& req)
        {
            bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());
            return basicResponse(ret, req->rid(), req->cid());
        }

        // Remove a binding; always answers ok.
        void queueUnBind(const queueUnBindRequestPtr& req)
        {
            _host->unBind(req->exchange_name(), req->queue_name());
            return basicResponse(true, req->rid(), req->cid());
        }

        // Publish a message: route it to every matching bound queue and schedule
        // one asynchronous delivery per queue on the shared thread pool.
        // Answers false only when the exchange does not exist.
        void basicPulish(const basicPublishRequestPtr& req)
        {
            // 1. The exchange must exist.
            auto ep = _host->selectExchange(req->exchange_name());
            if (ep.get() == nullptr)
            {
                return basicResponse(false, req->rid(), req->cid());
            }

            // 2. Work out which bound queues the routing key matches.
            MsgQueueBindingMap mqpm = _host->exchangeBindings(req->exchange_name());
            BasicProperties* properties = nullptr;
            std::string routing_key; // stays empty when the request has no properties
            if (req->has_properties())
            {
                properties = req->mutable_properties();
                routing_key = properties->rounting_key();
            }

            for (auto& binding : mqpm)
            {
                if (!Router::route(ep->type, routing_key, binding.second->binding_key))
                {
                    continue;
                }
                printf("%s:%d 匹配成功 %s - %s\n", __FILE__, __LINE__,
                       routing_key.c_str(), binding.second->binding_key.c_str());
                _host->basicPublish(binding.first, properties, req->body());

                // The task runs later on a pool thread. Binding a shared_ptr
                // (instead of raw `this`) keeps the channel alive until the task
                // has run, even if the client closes the channel in between.
                auto task = std::bind(&Channel::consume, shared_from_this(), binding.first);
                _pool->push(task);
            }
            printf("%s:%d 发布消息成功\n", __FILE__, __LINE__);
            return basicResponse(true, req->rid(), req->cid());
        }

        // Acknowledge a message; always answers ok.
        void basicAck(const basicAckRequestPtr& req)
        {
            _host->basicAck(req->queue_name(), req->message_id());
            return basicResponse(true, req->rid(), req->cid());
        }

        // Subscribe this channel to a queue. Answers false when the queue does
        // not exist or the consumer manager returns no consumer.
        void basicConsume(const basicConsumeRequestPtr& req)
        {
            // 1. The queue must exist.
            bool ret = _host->existsQueue(req->queue_name());
            if (ret == false)
            {
                return basicResponse(false, req->rid(), req->cid());
            }

            // 2. Register the consumer. The callback is stored inside the consumer
            //    manager and may be invoked from another channel's delivery task,
            //    so it holds only a weak reference: a strong one would form a cycle
            //    (channel -> consumer -> callback -> channel), a raw `this` would
            //    dangle once the channel is closed.
            std::weak_ptr<Channel> weak_self = shared_from_this();
            auto cb = [weak_self](const std::string& tag, const BasicProperties* bp, const std::string& body)
            {
                if (Channel::ptr self = weak_self.lock())
                {
                    self->callback(tag, bp, body);
                }
            };
            Consumer::ptr consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);
            if (consumer.get() == nullptr)
            {
                DLOG("创建消费者失败: %s", req->queue_name().c_str());
                return basicResponse(false, req->rid(), req->cid());
            }

            // From here on this channel acts as a consumer.
            _consumer = consumer;
            printf("%s:%d 创建消费者成功: %s\n", __FILE__, __LINE__, _consumer->qname.c_str());
            return basicResponse(true, req->rid(), req->cid());
        }

        // Cancel a subscription; always answers ok.
        void basicCancel(const basicCancelRequestPtr& req)
        {
            _cmp->remove(req->consumer_tag(), req->queue_name());
            // Forget our own consumer once it is cancelled; otherwise the
            // destructor would later remove whatever consumer is registered under
            // the same tag/queue at that time.
            if (_consumer.get() != nullptr &&
                _consumer->tag == req->consumer_tag() &&
                _consumer->qname == req->queue_name())
            {
                _consumer.reset();
            }
            return basicResponse(true, req->rid(), req->cid());
        }

    private:
        // Consumer callback: wrap the message in a basicConsumeResponse and push
        // it to the client of this channel.
        void callback(const std::string& tag, const BasicProperties* bp, const std::string& body)
        {
            basicConsumeResponse resp;
            resp.set_cid(_cid);
            resp.set_consumer_tag(tag);
            resp.set_body(body);
            if (bp)
            {
                resp.mutable_properties()->set_id(bp->id());
                resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                resp.mutable_properties()->set_rounting_key(bp->rounting_key());
            }
            _codec->send(_conn, resp);
        }

        // Delivery task executed on the thread pool: take the head message of
        // `qname`, pick one of its consumers and invoke that consumer's callback.
        void consume(const std::string& qname)
        {
            printf(" %s:%d %s开始消费\n", __FILE__, __LINE__, qname.c_str());

            // 1. Fetch the message at the head of the queue.
            MessagePtr mp = _host->basicConsume(qname);
            if (mp.get() == nullptr)
            {
                DLOG("执行消费任务失败,%s 队列不存在", qname.c_str());
                return;
            }

            // 2. Let the consumer manager choose a subscriber of the queue.
            Consumer::ptr cmp = _cmp->choose(qname);
            if (cmp.get() == nullptr)
            {
                DLOG("执行消费任务失败,%s 队列没有消费者", qname.c_str());
                return;
            }

            // 3. Push the message through the chosen consumer's callback.
            cmp->callback(cmp->tag,
                          mp->mutable_payload()->mutable_properties(),
                          mp->mutable_payload()->body());

            // 4. Auto-ack consumers get the message acknowledged right away.
            if (cmp->auto_ack)
            {
                _host->basicAck(qname, mp->payload().properties().id());
            }
        }

        // Send the generic ok / not-ok response for request `rid` on channel `cid`.
        void basicResponse(bool ok, const std::string& rid, const std::string& cid)
        {
            basicCommonResponse resp;
            resp.set_ok(ok);
            resp.set_rid(rid);
            resp.set_cid(cid);
            _codec->send(_conn, resp);
        }

    private:
        std::string _cid;
        Consumer::ptr _consumer; // set while this channel is subscribed to a queue
        muduo::net::TcpConnectionPtr _conn;
        ProtobufCodecPtr _codec;
        ConsumerManager::ptr _cmp;
        VirtualHost::ptr _host;
        threadPool::ptr _pool;
    };

    // Mutex-protected map from channel id to channel.
    class ChannelManager
    {
    public:
        using ptr = std::shared_ptr<ChannelManager>;

        ChannelManager() {}

        // Create and register a channel. Returns false if `id` is already in use.
        bool openChannel(const std::string& id,
                         const VirtualHost::ptr& host,
                         const ConsumerManager::ptr& cmp,
                         const ProtobufCodecPtr& codec,
                         const muduo::net::TcpConnectionPtr& conn,
                         const threadPool::ptr& pool)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _channels.find(id);
            if (it != _channels.end())
            {
                DLOG("信道:%s 已存在", id.c_str());
                return false;
            }
            auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);
            _channels.insert(std::make_pair(id, channel));
            return true;
        }

        // Drop the manager's reference to the channel (no-op for an unknown id).
        void closeChannel(const std::string& id)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _channels.erase(id);
        }

        // Returns the channel, or an empty pointer when `id` is unknown.
        Channel::ptr getChannel(const std::string& id)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _channels.find(id);
            if (it == _channels.end())
            {
                return Channel::ptr();
            }
            return it->second;
        }

    private:
        std::mutex _mutex;
        std::unordered_map<std::string, Channel::ptr> _channels;
    };
}

#endif
连接
在网络通信模块中,我们使用muduo库来实现底层通信,muduo库中本身就有Connection连接的概念和对象类,但是我们的连接中,还有一个上层通信信道的概念,所以需要对这个Connection进行二次封装,形成我们自己的连接类,来适配上面的信道
管理的数据
- muduo库的通信连接
- 当前连接关联的信道管理句柄
连接提供的操作
- 创建信道
- 关闭信道
- 获得信道
- 报文回复
管理的操作
- 新增连接
- 关闭连接
- 获取指定连接信息
代码展示
#include "mq_channel.hpp"namespace mymq
{class Connection{public:using ptr = std::shared_ptr<Connection>;Connection(const VirtualHost::ptr& host, const ConsumerManager::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr& conn, const threadPool::ptr& pool):_host(host),_cmp(cmp),_codec(codec),_conn(conn),_pool(pool),_channels(std::make_shared<ChannelManager>()){}void openChannel(const openChannelRequestPtr& req){// 1. 判断信道ID是否重复,创建信道bool ret = _channels->openChannel(req->cid(), _host, _cmp, _codec, _conn, _pool);if(ret == false){DLOG("创建信道失败,信道 %s 已存在", req->cid().c_str());return basicResponse(false, req->rid(), req->cid());}DLOG("创建信道 %s 成功", req->cid().c_str());return basicResponse(true, req->rid(), req->cid());}void closeChannel(const closeChannelRequestPtr& req){_channels->closeChannel(req->cid());return basicResponse(true, req->rid(), req->cid());}Channel::ptr getChannel(const std::string& id){return _channels->getChannel(id);}private:void basicResponse(bool ok, const std::string& rid, const std::string& cid){basicCommonResponse br;br.set_ok(ok);br.set_rid(rid);br.set_cid(cid);_codec->send(_conn, br);}private:muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;ConsumerManager::ptr _cmp;VirtualHost::ptr _host;threadPool::ptr _pool;ChannelManager::ptr _channels;};class ConnectionManager{public:using ptr = std::shared_ptr<ConnectionManager>;ConnectionManager(){}void newConnection(const VirtualHost::ptr& host, const ConsumerManager::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr& conn, const threadPool::ptr& pool){std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if(it != _conns.end()){DLOG("连接已存在");return ;}Connection::ptr self_conn = std::make_shared<Connection>(host, cmp, codec, conn, pool);_conns.insert(make_pair(conn, self_conn));}void delConnection(const muduo::net::TcpConnectionPtr& conn){std::unique_lock<std::mutex> lock(_mutex);_conns.erase(conn);}Connection::ptr getConnection(const muduo::net::TcpConnectionPtr& 
conn){std::unique_lock<std::mutex> lock(_mutex);auto it = _conns.find(conn);if(it == _conns.end()){DLOG("未找到连接");return Connection::ptr();}return it->second;}private:std::mutex _mutex;std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns;};
}
服务器(Broker)
一切都写好了,自然要给Connection外部加一层封装来实现对各个请求的响应。参考muduo库实现网络通信。
代码展示
#ifndef __M_BROKER_H__
#define __M_BROKER_H__

#include <cstdio>
#include <functional>
#include <memory>
#include <string>

#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include "../mqcommon/threadpool.hpp"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/msg.pb.h"
#include "mq_connection.hpp"
#include "mq_channel.hpp"
#include "mq_host.hpp"

namespace mymq
{
#define DBFILE "/meta.db"
// NOTE(review): the name starts with a space; kept byte-for-byte because it is
// runtime data -- confirm whether the leading space is intentional.
#define HOSTNAME " MyVirtualHost"

    // Broker server: accepts TCP connections, decodes protobuf requests and
    // dispatches each one to the Connection / Channel it addresses.
    class Server
    {
    public:
        using MessagePtr = std::shared_ptr<google::protobuf::Message>;

        // port:    TCP port to listen on (all interfaces).
        // basedir: passed to the virtual host; the database file is basedir + DBFILE.
        Server(int port, const std::string &basedir)
            : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
                      "Server", muduo::net::TcpServer::kReusePort),
              _dispatcher(std::bind(&Server::onUnknownMessage, this,
                                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
              _codec(std::make_shared<ProtobufCodec>(
                  std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                            std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
              _virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),
              _consumer_manager(std::make_shared<ConsumerManager>()),
              _connection_manager(std::make_shared<ConnectionManager>()),
              _pool(std::make_shared<threadPool>())
        {
            // Queues already known to the virtual host need their consumer
            // bookkeeping before any client can subscribe to them.
            QueueMap qm = _virtual_host->allqueue();
            for (auto &q : qm)
            {
                _consumer_manager->initQueueConsumer(q.first);
            }

            // Register one handler per request type.
            _dispatcher.registerMessageCallback<mymq::openChannelRequest>(
                std::bind(&Server::onOpenChannel, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::closeChannelRequest>(
                std::bind(&Server::onCloseChannel, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::declareExchangeRequest>(
                std::bind(&Server::onDeclareExchange, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::deleteExchangeRequest>(
                std::bind(&Server::onDeleteExchange, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::declareQueueRequest>(
                std::bind(&Server::onDeclareQueue, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::deleteQueueRequest>(
                std::bind(&Server::onDeleteQueue, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::queueBindRequest>(
                std::bind(&Server::onQueueBind, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::queueUnBindRequest>(
                std::bind(&Server::onQueueUnBind, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::basicPublishRequest>(
                std::bind(&Server::onBasicPublish, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::basicAckRequest>(
                std::bind(&Server::onBasicAck, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::basicConsumeRequest>(
                std::bind(&Server::onBasicConsume, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<mymq::basicCancelRequest>(
                std::bind(&Server::onBasicCancel, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _server.setMessageCallback(
                std::bind(&ProtobufCodec::onMessage, _codec.get(),
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _server.setConnectionCallback(
                std::bind(&Server::onConnection, this, std::placeholders::_1));
        }

        // Start listening and run the event loop on the calling thread.
        void start()
        {
            _server.start();
            _baseloop.loop();
        }

    private:
        // Look up the Connection wrapper of `conn`. When it is missing, logs
        // "<op>时,没有找到对应的Connection对象", shuts the TCP connection down and
        // returns an empty pointer.
        Connection::ptr findConnection(const muduo::net::TcpConnectionPtr &conn, const char *op)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr)
            {
                DLOG("%s时,没有找到对应的Connection对象", op);
                conn->shutdown();
            }
            return mconn;
        }

        // Look up channel `cid` on `conn`. When the connection or the channel is
        // missing, logs the reason, shuts the TCP connection down and returns an
        // empty pointer.
        Channel::ptr findChannel(const muduo::net::TcpConnectionPtr &conn, const std::string &cid, const char *op)
        {
            Connection::ptr mconn = findConnection(conn, op);
            if (mconn == nullptr)
            {
                return Channel::ptr();
            }
            Channel::ptr cnp = mconn->getChannel(cid);
            if (cnp == nullptr)
            {
                DLOG("%s的时候,没有找到相关信道", op);
                conn->shutdown();
            }
            return cnp;
        }

        // Open a channel.
        void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr &message, muduo::Timestamp)
        {
            if (Connection::ptr mconn = findConnection(conn, "打开信道"))
            {
                mconn->openChannel(message);
            }
        }

        // Close a channel. (The failure log used to say "打开信道", copied from
        // the open handler.)
        void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr &message, muduo::Timestamp)
        {
            if (Connection::ptr mconn = findConnection(conn, "关闭信道"))
            {
                mconn->closeChannel(message);
            }
        }

        // Declare an exchange.
        void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "声明交换机"))
            {
                cnp->declareExchange(message);
            }
        }

        // Delete an exchange.
        void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "删除交换机"))
            {
                cnp->deleteExchange(message);
            }
        }

        // Declare a queue.
        void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "声明队列"))
            {
                cnp->declareQueue(message);
            }
        }

        // Delete a queue.
        void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "删除队列"))
            {
                cnp->deleteQueue(message);
            }
        }

        // Bind a queue to an exchange.
        void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "队列绑定"))
            {
                cnp->queueBind(message);
            }
        }

        // Unbind a queue from an exchange.
        void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "队列解绑"))
            {
                cnp->queueUnBind(message);
            }
        }

        // Publish a message.
        void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "消息发布"))
            {
                cnp->basicPulish(message);
            }
        }

        // Acknowledge a message.
        void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "消息确认"))
            {
                cnp->basicAck(message);
            }
        }

        // Subscribe to a queue.
        void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr &message, muduo::Timestamp)
        {
            printf("%s:%d onBasicConsume begin\n", __FILE__, __LINE__);
            Channel::ptr cnp = findChannel(conn, message->cid(), "队列消息订阅");
            if (cnp == nullptr)
            {
                return;
            }
            printf("%s:%d onBasicConsume begin\n", __FILE__, __LINE__);
            cnp->basicConsume(message);
        }

        // Cancel a queue subscription.
        void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr &message, muduo::Timestamp)
        {
            if (Channel::ptr cnp = findChannel(conn, message->cid(), "队列消息取消订阅"))
            {
                cnp->basicCancel(message);
            }
        }

        // Fallback for message types without a registered handler: log the type
        // name and shut the connection down.
        void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
        {
            LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
            conn->shutdown();
        }

        // Connection state change: register the wrapper on connect, drop it on
        // disconnect.
        void onConnection(const muduo::net::TcpConnectionPtr &conn)
        {
            if (conn->connected())
            {
                _connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _pool);
            }
            else
            {
                _connection_manager->delConnection(conn);
            }
        }

    private:
        // Declaration order matters: _server is built from &_baseloop and _codec
        // from &_dispatcher.
        muduo::net::EventLoop _baseloop;
        muduo::net::TcpServer _server;   // TCP server
        ProtobufDispatcher _dispatcher;  // request dispatcher holding the registered handlers
        ProtobufCodecPtr _codec;         // protobuf framing / parsing of received data
        VirtualHost::ptr _virtual_host;
        ConsumerManager::ptr _consumer_manager;
        ConnectionManager::ptr _connection_manager;
        threadPool::ptr _pool;
    };
}

#endif
客户端
和服务端一样,客户端也是将具体的报文发送功能交给 Channel,Connection 只负责信道的打开、关闭和获取等相关操作。
信道
#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__

#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/logger.hpp"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "mq_consumer.hpp"
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

namespace mymq
{
    using MessagePtr = std::shared_ptr<google::protobuf::Message>;
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;
    using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;

    // Client-side channel: builds one request per operation, sends it on the
    // shared TCP connection and blocks until the response with the same request
    // id has been handed in through putBasicResponse().
    //
    // Every operation that waits for a response blocks without a timeout.
    class Channel
    {
    public:
        using ptr = std::shared_ptr<Channel>;

        // The channel id is a freshly generated UUID.
        Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
            : _conn(conn), _cid(UUIDHelper::uuid()), _codec(codec)
        {
        }

        // Cancels the active subscription, if any (this sends a request and
        // waits for its response).
        ~Channel()
        {
            basicCancel();
        }

        std::string cid()
        {
            return _cid;
        }

        // Ask the server to open this channel. Returns the server's verdict.
        bool openChannel()
        {
            std::string rid = UUIDHelper::uuid();
            openChannelRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            _codec->send(_conn, req);
            basicCommonResponsePtr resp = waitResponse(rid);
            return resp->ok();
        }

        // Ask the server to close this channel; the response is awaited but ignored.
        void closeChannel()
        {
            std::string rid = UUIDHelper::uuid();
            closeChannelRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            _codec->send(_conn, req);
            waitResponse(rid);
        }

        // Declare an exchange. Returns the server's verdict.
        // NOTE: `args` is swapped into the request, so the caller's map holds the
        // request's previous (empty) contents afterwards.
        bool declareExchange(const std::string &ename, ExchangeType type,
                             bool durable, bool auto_delete,
                             google::protobuf::Map<std::string, std::string> &args)
        {
            std::string rid = UUIDHelper::uuid();
            declareExchangeRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_exchange_name(ename);
            req.set_exchange_type(type);
            req.set_durable(durable);
            req.set_auto_delete(auto_delete);
            req.mutable_args()->swap(args);
            _codec->send(_conn, req);
            basicCommonResponsePtr resp = waitResponse(rid);
            return resp->ok();
        }

        // Delete an exchange; the response is awaited but ignored.
        void deleteExchange(const std::string &ename)
        {
            std::string rid = UUIDHelper::uuid();
            deleteExchangeRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_exchange_name(ename);
            _codec->send(_conn, req);
            waitResponse(rid);
        }

        // Declare a queue. Returns the server's verdict.
        // NOTE: `args` is swapped into the request (see declareExchange).
        bool declareQueue(const std::string &qname, bool exclusive, bool durable,
                          bool auto_delete,
                          google::protobuf::Map<std::string, std::string> &args)
        {
            std::string rid = UUIDHelper::uuid();
            declareQueueRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_queue_name(qname);
            req.set_durable(durable);
            req.set_auto_delete(auto_delete);
            req.set_exclusive(exclusive);
            req.mutable_args()->swap(args);
            _codec->send(_conn, req);
            basicCommonResponsePtr resp = waitResponse(rid);
            return resp->ok();
        }

        // Delete a queue; the response is awaited but ignored.
        void deleteQueue(const std::string &qname)
        {
            std::string rid = UUIDHelper::uuid();
            deleteQueueRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_queue_name(qname);
            _codec->send(_conn, req);
            waitResponse(rid);
        }

        // Bind a queue to an exchange. Returns the server's verdict.
        bool queueBind(const std::string &ename, const std::string &qname, const std::string &key)
        {
            std::string rid = UUIDHelper::uuid();
            queueBindRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_exchange_name(ename);
            req.set_queue_name(qname);
            req.set_binding_key(key);
            _codec->send(_conn, req);
            basicCommonResponsePtr resp = waitResponse(rid);
            return resp->ok();
        }

        // Remove a binding; the response is awaited but ignored.
        void queueUnBind(const std::string &ename, const std::string &qname)
        {
            std::string rid = UUIDHelper::uuid();
            queueUnBindRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_exchange_name(ename);
            req.set_queue_name(qname);
            _codec->send(_conn, req);
            waitResponse(rid);
        }

        // Publish `body` to exchange `ename`. `bp` may be null, in which case
        // the request carries no properties. The response is awaited but ignored.
        void basicPublish(const std::string &ename, const BasicProperties *bp, const std::string &body)
        {
            std::string rid = UUIDHelper::uuid();
            basicPublishRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_body(body);
            req.set_exchange_name(ename);
            if (bp != nullptr)
            {
                req.mutable_properties()->set_id(bp->id());
                req.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                req.mutable_properties()->set_rounting_key(bp->rounting_key());
            }
            _codec->send(_conn, req);
            waitResponse(rid);
        }

        // Acknowledge message `msgid` on the queue this channel is subscribed
        // to. Does nothing (besides logging) when there is no subscription.
        void basicAck(const std::string &msgid)
        {
            if (_consumer.get() == nullptr)
            {
                DLOG("消息确认时,找不到消费者信息!");
                return;
            }
            std::string rid = UUIDHelper::uuid();
            basicAckRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_queue_name(_consumer->qname);
            req.set_message_id(msgid);
            _codec->send(_conn, req);
            waitResponse(rid);
        }

        // Cancel the active subscription; no-op when there is none.
        void basicCancel()
        {
            if (_consumer.get() == nullptr)
            {
                return;
            }
            std::string rid = UUIDHelper::uuid();
            basicCancelRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_consumer_tag(_consumer->tag);
            req.set_queue_name(_consumer->qname);
            _codec->send(_conn, req);
            waitResponse(rid);
            _consumer.reset();
        }

        // Subscribe to `queue_name`; `cb` is invoked for every pushed message.
        // Returns false when the channel already has a subscription or the
        // server rejects the request.
        bool basicConsume(const std::string &consumer_tag, const std::string &queue_name,
                          bool auto_ack, const ConsumerCallback &cb)
        {
            if (_consumer.get() != nullptr)
            {
                DLOG("当前信道已订阅了其他消费者!");
                return false;
            }
            std::string rid = UUIDHelper::uuid();
            basicConsumeRequest req;
            req.set_rid(rid);
            req.set_cid(_cid);
            req.set_queue_name(queue_name);
            req.set_consumer_tag(consumer_tag);
            req.set_auto_ack(auto_ack);

            // Install the consumer BEFORE the request goes out: the server may
            // push a message right after accepting the subscription, and
            // consume() can run for that push before this thread wakes up from
            // waitResponse(). Installing it afterwards would drop such a message.
            _consumer = std::make_shared<Consumer>(consumer_tag, queue_name, auto_ack, cb);

            _codec->send(_conn, req);
            basicCommonResponsePtr resp = waitResponse(rid);
            if (resp->ok() == false)
            {
                DLOG("添加订阅失败!");
                _consumer.reset();
                return false;
            }
            return true;
        }

        // Store a received generic response (keyed by request id) and wake up
        // the waiting requester(s).
        void putBasicResponse(const basicCommonResponsePtr &resp)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _basic_resp.insert(std::make_pair(resp->rid(), resp));
            _cv.notify_all();
        }

        // Handle a message pushed by the server: hand it to the subscription's
        // callback. Dropped (with a log line) when there is no subscription or
        // the consumer tag does not match.
        void consume(const basicConsumeResponsePtr &resp)
        {
            if (_consumer.get() == nullptr)
            {
                DLOG("消息处理时,未处理订阅者信息");
                return;
            }
            if (_consumer->tag != resp->consumer_tag())
            {
                DLOG("收到的推送消息中的消费者标识,与当前信道消费者标识不一致");
                return;
            }
            _consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());
        }

    private:
        // Block until the response for `rid` has been stored by
        // putBasicResponse(), then remove it from the table and return it.
        basicCommonResponsePtr waitResponse(const std::string &rid)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _cv.wait(lock, [&rid, this]()
                     { return _basic_resp.find(rid) != _basic_resp.end(); });
            auto it = _basic_resp.find(rid); // guaranteed present by the predicate above
            basicCommonResponsePtr basic_ptr = it->second;
            _basic_resp.erase(it);
            return basic_ptr;
        }

    private:
        muduo::net::TcpConnectionPtr _conn;
        std::string _cid;
        Consumer::ptr _consumer; // set while the channel has an active subscription
        ProtobufCodecPtr _codec;
        std::mutex _mutex;            // guards _basic_resp
        std::condition_variable _cv;  // signalled whenever a response is stored
        std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;
    };

    // Mutex-protected map from channel id to channel.
    class ChannelManager
    {
    public:
        using ptr = std::shared_ptr<ChannelManager>;

        ChannelManager() {}

        // Create a channel on `conn`, register it under its id and return it.
        Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto channel = std::make_shared<Channel>(conn, codec);
            _channels.insert(std::make_pair(channel->cid(), channel));
            return channel;
        }

        // Drop the manager's reference to channel `name` (no-op when unknown).
        void remove(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _channels.erase(name);
        }

        // Returns the channel, or an empty pointer when `name` is unknown.
        Channel::ptr get(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _channels.find(name);
            if (it == _channels.end())
            {
                DLOG("未找到对应的信道");
                return Channel::ptr();
            }
            return it->second;
        }

    private:
        std::mutex _mutex;
        std::unordered_map<std::string, Channel::ptr> _channels;
    };
}

#endif
连接
#ifndef __M_CONNECTION_H__
#define __M_CONNECTION_H__

#include <functional>
#include <memory>
#include <string>

#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"

#include "mq_channel.hpp"
#include "mq_worker.hpp"

namespace mymq
{
    // Client-side connection: owns the TCP client, routes received responses to
    // the channel they address, and opens / closes channels.
    class Connection
    {
    public:
        using ptr = std::shared_ptr<Connection>;

        // Connects to sip:sport on the worker's event-loop thread and blocks
        // until the connection has been established.
        Connection(const std::string &sip, int sport, const AsyncWorker::ptr &worker)
            : _latch(1),
              _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),
              _dispatcher(std::bind(&Connection::onUnknownMessage, this,
                                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
              _codec(std::make_shared<ProtobufCodec>(
                  std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                            std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
              _worker(worker),
              _channel_manager(std::make_shared<ChannelManager>())
        {
            _dispatcher.registerMessageCallback<basicCommonResponse>(
                std::bind(&Connection::basicResponse, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<basicConsumeResponse>(
                std::bind(&Connection::ConsumeResponse, this,
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _client.setMessageCallback(
                std::bind(&ProtobufCodec::onMessage, _codec.get(),
                          std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _client.setConnectionCallback(
                std::bind(&Connection::onConnection, this, std::placeholders::_1));

            _client.connect();
            _latch.wait(); // released by onConnection() once the connection is up
        }

        // Create a channel and open it on the server. Returns an empty pointer
        // when the server refuses; the failed channel is removed from the
        // manager again so it does not stay registered.
        Channel::ptr openChannel()
        {
            Channel::ptr channel = _channel_manager->create(_conn, _codec);
            bool ret = channel->openChannel();
            if (ret == false)
            {
                DLOG("打开信道失败!");
                _channel_manager->remove(channel->cid());
                return Channel::ptr();
            }
            return channel;
        }

        // Close `channel` on the server and forget it. An empty handle (e.g. the
        // result of a failed openChannel()) is ignored.
        void closeChannel(const Channel::ptr &channel)
        {
            if (channel == nullptr)
            {
                return;
            }
            channel->closeChannel();
            _channel_manager->remove(channel->cid());
        }

    private:
        // Generic response received: hand it to the channel it addresses, which
        // wakes up the requester waiting for it.
        void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp)
        {
            Channel::ptr channel = _channel_manager->get(message->cid());
            if (channel == nullptr)
            {
                DLOG("未找到信道消息");
                return;
            }
            channel->putBasicResponse(message);
        }

        // Pushed message received: process it on the worker's thread pool so the
        // user callback does not run on the I/O thread.
        void ConsumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp)
        {
            Channel::ptr channel = _channel_manager->get(message->cid());
            if (channel == nullptr)
            {
                DLOG("未找到信道消息");
                return;
            }
            _worker->pool.push([channel, message]()
                               { channel->consume(message); });
        }

        // Fallback for message types without a registered handler: log the type
        // name and shut the connection down.
        void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
        {
            LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
            conn->shutdown();
        }

        // Connection state change: remember the connection and release the
        // constructor on connect; drop the handle on disconnect.
        void onConnection(const muduo::net::TcpConnectionPtr &conn)
        {
            if (conn->connected())
            {
                _conn = conn;
                _latch.countDown();
            }
            else
            {
                _conn.reset();
            }
        }

    private:
        muduo::CountDownLatch _latch;        // lets the constructor wait for the connection
        muduo::net::TcpConnectionPtr _conn;  // the established connection
        muduo::net::TcpClient _client;       // TCP client
        ProtobufDispatcher _dispatcher;      // response dispatcher
        ProtobufCodecPtr _codec;             // protobuf framing / parsing
        AsyncWorker::ptr _worker;
        ChannelManager::ptr _channel_manager;
    };
}

#endif