什么是消息队列?
从阻塞队列开始讲起,阻塞队列支持在多线程环境下安全的进行操作,主要特性是当一个线程尝试对一个空的队列执行删除操作时,这个线程将会被阻塞,直到队列中有了可供处理的元素;同样,如果一个线程尝试向一个满的队列添加元素时,该线程也会被阻塞,直到队列中有可用的空间。这种机制简化了生产者-消费者问题的实现。那么所谓的消息队列,就是把阻塞队列这样的数据结构,单独提取成一个程序,独立进行部署。
生产者和消费者模型的作用:
- 解耦合:分布式系统,A服务器调用B服务器,即A给B发送请求,B给A响应,这样的结果导致A和B之间的耦合度较大,引入消息队列后,A把请求发给消息队列,B再从消息队列中获取请求。
- 削峰填谷:若A和B直接通信,A突然收到用户请求的峰值,此时B感受到峰值,B可能会挂掉。引入消息队列后,A将请求发给队列,B从队列中取请求,当A收到很多请求后,队列也受到很多请求,但是B可以按照原有的节奏来取请求。
需求分析
核心概念:
- 生产者(Producer):发送消息的实体,只关心如何将消息发出去。
- 消费者(Consumer):从消息队列中接受并处理信息的实体,根据接收到的消息执行相应的操作,并不需要了解消息是由哪个生产者发送的。
- 中间人(Broker):生产者和消费者之间充当中介的角色,主要职责是存储和转发消息
- 发布(Publish):生产者向中间人投递消息的过程
- 订阅(Subscribe):那些消费者要从这个中间人这里取数据,这个注册过程称为“订阅”
- 消费(Consume):消费者从中间人这里取数据的动作
BrokerServer涉及到的相关概念
- 虚拟主机(Virtual Host):类似于MySQL中的database,算是“逻辑”上的数据集合。逻辑上区分不同种类的数据。
- 交换机(Exchange):生产者把消息投递给Broker Server,实际上是先把Broker Server上的某个交换机,再由交换机把消息转发给对应的队列
- 队列(Queue):真正用来存储处理消息的实体,后续消费者也是从对应的队列中提取数据
- 绑定(Binding):交换机和队列联系起来
- 消息(Message):服务器A个B发的请求(通过MQ转发),服务器B给A返回的响应(通过MQ转发)也是一个消息
注:
- 一个大的消息队列可以有很多个具体小的队列
- 可以把交换机和队列视为数据库多对多的关系,一个交换机可以对应多个队列,一个队列可以被多个交换机对应
- 消息中具体包含什么数据,是由程序员自定义的
关系图如下:
核心API
- 创建队列(queueDeclare):Declare效果,不存在进行创建,存在什么都不做
- 销毁队列(queueDelete)
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchangeDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnbind)
- 发表消息(basicPublish)
- 订阅消息(basicConsume)
- 确认消息(basicAck):可以让消费者显式的告诉broker server,这个消息已经处理完毕,提高整个系统的可靠性,保证消息处理没有遗漏
不进行消费消息的api(让消费者通过这个api从服务器上取走消息),原因如下:
对于MQ和消费者之间有两种工作模式
1.push(推):Broker把收到的数据主动发送给订阅的消费者,例如RabbitMQ
2.pull(拉):消费者主动调用Broker的api取数据,例如Kafka
注:对于RabbitMQ来说,除了肯定的确认,还提供否定的确认
交换机类型
交换机在转发消息的时候,有一套转发规则,RabbitMQ主要实现的4种交换机类型
- Direct直接交换机:生产者发送消息的时候,指定一个“目标队列的名字”,交换机收到后,看看绑定的队列有没有匹配的队列,如果有,转发过去,没有消息直接丢弃
- Fanout扇出交换机:当消息到达扇出交换机时,它会将这条消息广播给所有与之绑定的队列。
- Topic主题交换机:两个概念:bindingKey:队列和交换机绑定的时候指定一个单词;routingKey生产者发送消息的时候也指定一个单词,当两者对上时,此时就可以把消息转发到对应的消息队列中
- Header消息头交换机
持久化
上面的虚拟主机,交换机,队列,绑定和消息都需要BrokerServer组织管理,此时这些概念对应的数据需要存储和管理起来,此时内存和硬盘都会存储一份,内存为主,硬盘为辅.原因如下:
对MQ来说,能够高效的转发和处理数据是非常关键的指标,因此使用内存来组织上述数据得到的效率比方到内存要高很多。硬盘上存储是为了防止内存中的数据随着进程重启/主机重启而丢失
网络通信
其他的服务器通关网络和Broker Server进行交互,那么用什么网络协议进行交互呢?使用TCP+自定义的应用层协议实现生产者/消费者和Broker Server之间交互工作,由于TCP传的是一些字节流,为了让这些字节流能够表述出一定的含义,因此自定义应用层协议辅助完成通信过程。自定义应用层协议要做的的让客户端可以通过网络调用Broker Server提供的编程接口。因此在客户端也需要提供这些方法(即上面的核心api),只不过服务器版本的方法是管理数据进行调整,客户端这边只是发送请求/解释响应.
当响应回来了,客户端的queueDeclare就会获取到这个响应,创建队列成功,此时queueDeclare执行完毕。
客户端除了提供9个核心api和服务器这边方法对应外,还需要提供四个方法支撑其他工作
- 创建Connection
- 关闭Connection
- 创建Channle
- 关闭Channle
注:
- 此处的Connection代表TCP连接
- Channle可看作通道/信道,一个Connection里面可以包含多个Channel,每个Channel上面传输的数据都是互补相干的,TCP建立和断开一个连接成本较高,因此很多时候不希望频繁的建立和断开连接,Channle比TCP连接的建立和断开轻量很多。
消息应答模式
- 自动应答:消费者把消息取走,就算应答了,传入的消息并不关键,效率更高
- 手动应答:basicAck方法属于手动应答(消费者需要主动调用这个api)一般队列消息比较重要,使用手动应答比较合适,提高了可靠性但是降低了效率