原创

RabbitMQ学习

什么是RabbitMQ

RabbitMQ是一个Erlang开发的AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源实现,它最初起源于金融系统

RabbitMQ的特点

  • 可靠性: RabbitMQ使用如持久化、传输确认、发布确认等机制来保证可靠性
  • 扩展性:多个RabbitMQ节点可以组成一个集群,可以动态扩展集群中的节点
  • 高可用性:队列可以在集群中的机器上设置镜像,即使部分节点出现问题队列仍然可用
  • 多种协议:AMQP协议、STOMP、MOTT等多种消息中间件协议
  • 多语言客户端:java、Python、Ruby、PHP、C#、JavaScript、Go、Object-C等
  • 管理界面:查看、监控、管理消息、集群中节点状态的用户界面
  • 第三方插件丰富

RabbitMQ的与其它MQ的对比

参考这个,待整理,还需要再参考一些其他的
https://blog.csdn.net/qq_36236890/article/details/81174504

RabbitMQ的安装

RabbitMQ交互模型图

RabbitMQ的基础概念

参考文章:
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral
https://www.sohu.com/a/158868089_355142

AMQP 协议与规范

AMQP协议是一个高级抽象层消息通信协议。
AMQP规范不仅定义了一种网络协议,同时也定义了服务端的服务与行为——AMQP模型,AMQP模型在逻辑上定义了三种抽象组件用于指定消息的路由行为。
Exchange(交换器): 用于把消息路由到队列的组件
Queue(队列): 用于存储消息的数据结构(内存或者硬盘)
Binding(绑定规则): 一套规则,用于告诉Exchange(交换器)消息应该被存储到哪个Queue(队列)
RabbitMQ是AMQP协议的实现。它主要包括以下组件:

在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层:

  1. Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。
  2. Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。
  3. Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。

Server(broker)

接受客户端连接,实现AMQP消息队列和路由功能的进程。

Virtual Host

每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和绑定规则等等。这保证了你可以在多个不同的Application中使用RabbitMQ。
其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host

Message

由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。

Connection 连接

概述:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

Channel 信道(或者叫通道)

信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

Exchange 交换器(或者叫交换机)

概述:用于把消息路由到队列的组件。
Producer(生产者)会将消息发送给Exchange(交换器),由Exchange(交换器)将消息路由到一个或者多个Queue(队列)(或者丢弃)。因为消息是存储在Queue(队列)中,所以Exchange(交换器)的使用并不真正的消耗服务器的性能,而Queue(队列)会。

Queue 队列(或者叫列队)

概述:用于存储消息的数据结构(内存或者硬盘)

  1. Queue(队列)是RabbitMQ的内部对象,用于存储消息。RabbitMQ中的消息都只能存储在Queue(队列)中。Producer(生产者)发送的消息最终会投递到Queue(队列)中(从上图可以看出Producer并不直接与Queue直接交互),Consumer(消费者)可以从Queue(队列)中获取消息并消费。
  2. 多个Consumer(消费者)可以订阅同一个Queue(队列),此时队列中的消息会被平均分配(Round-Robin,轮询)给多个Consumer(消费者)处理,RabbitMQ不支持队列层面的广播消费。

Binding 绑定规则

概述:一套规则,用于告诉Exchange(交换器)消息应该被存储到哪个Queue(队列)
Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。

Command

AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

ack机制

如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除

  什么是正确收到呢?通过ack。每个Message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:

  RabbitMQ Server会把这个信息发送到下一个Consumer。

如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。

  ack的机制还可以起到限流的作用(Benefitto throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load

  比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到

RabbitMQ的Exchange(交换机)类型

主要参考的文章:
https://www.sohu.com/a/158868089_355142

fanout

fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

上图中,生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费。

direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。

topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

routing key为一个句点号“.”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串

binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

headers

headers类型的Exchange(交换器)不依赖于RoutingKey(路由键)的匹配规则来路由消息,而是根据发送的消息内容中中headers属性进行匹配。Queue与Exchange绑定时指定的key-value键值对,如果完全匹配消息发送到Exchange时消息内容中的headers属性(key-value键值对),则消息会路由到该队列。headers类型的Exchange(交换器)性能很差,一般情况下不会使用。

RabbitMQ的消息发送/接收模型

自我理解,RabbitMQ 通过交换机()和路由键()的不同组合搭配,从而有了以下六种消息发送/接收模型。
详细理解如下:
由Exchange,Queue,RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。

Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
Fanout exchange: 会向响应的queue广播。
Topic exchange:对key进行模式匹配,比如ab可以传递到所有ab的queue。

以下总结主要参考自这两篇文章,文章里面也有相应的demo代码:
https://www.cnblogs.com/dongkuo/p/6001791.html#rpc
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral
还有参考了官网:
https://www.rabbitmq.com/tutorials/tutorial-six-java.html

简单的模型(The simplest thing that does something)- 单发送单接收

概述:简单的发送与接收,没有特别的处理。
实现:使用的是默认交换机(Default Exchange),忽略了路由键(Rounting Key)
解析:
如下图,在图中,P代表producer,它是消息的生产者;C代表consumer,它是消息的消费者;而红色的矩形正是我们所谓的消息队列,它位于RabbitMQ中(RabbitMQ中可以有很多这样的队列,并且每个队列都有一个唯一的名字)。生产者(们)可以将消息发送到消息队列中,消费者(们)可以从消息队列中取出消息。
图示:

代码或demo
https://www.cnblogs.com/dongkuo/p/6001791.html
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

Work queues 模型(Distributing tasks among workers (the competing consumers pattern)) - 单发送多接收

概述:这种模型描述的是一个生产者(Boss)向队列发消息(任务),多个消费者(worker)从队列接受消息(任务)。
一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。

实现:使用的是默认交换机(Default Exchange),忽略了路由键(Rounting Key)
图示:

代码或demo
https://www.cnblogs.com/dongkuo/p/6001791.html
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

Publish/Subscribe 模型 (Sending messages to many consumers at once)- 发布、订阅

概述:发布、订阅模式,发送端发送广播消息,多个接收端接收。
生产者把消息发送到了交换机(exchange)中,然后交换机负责(决定)将消息发送到(哪一个)消息队列中。
Publish/Subscribe模型是要让所有的消费者都能够接收到每一条消息。

实现:
使用的是fanout类型的交换机(fanout类型的交换机会路由每一条消息到所有和它绑定的队列,忽略路由键)。

图示:

代码或demo
https://www.cnblogs.com/dongkuo/p/6001791.html
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

Routing 模型(Receiving messages selectively) - 按路线发送接收

概述:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。

实现:
使用的是direct类型的交换机(direct类型的交换机要求和它绑定的队列带有一个路由键K,若有一个带有路由键R的消息到达了交换机,交换机会将此消息路由到路由键K = R的队列。默认交换机便是该类型)。自定义的路由键(routing key)。但是该路由键是确定的非规则匹配的。

区分
和Publish/Subscribe模型的区别:
发送端和Publish/Subscribe模型的区别:

  1. exchange的type为direct
  2. 发送消息的时候加入了routing key
    接收端和Publish/Subscribe模型的区别:
    在绑定queue和exchange的时候使用了routing key,即从该exchange上只接收routing key指定的消息。

图示:

代码或demo
https://www.cnblogs.com/dongkuo/p/6001791.html
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

Topics 模型(Receiving messages selectively) - 按主题发送接收

概述:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。
在Topics模型中,我们“升级”了routing key,它可以由多个关键词组成,词与词之间由点号(.)隔开。特别地,规定*表示任意的一个词;#号表示任意的0个或多个词。

实现:
使用的是 topic类型的交换机,路由键是规则匹配的路由键。

区分
和Routing模型的区别:
发送端和Routing模型的区别:

  1. exchange的type为topic
  2. 发送消息的routing key不是固定的单词,而是匹配字符串,如".lu.#",匹配一个单词,#匹配0个或多个单词。
    接收端和Routing模型的区别:
  3. exchange的type为topic
  4. 接收消息的routing key不是固定的单词,而是匹配字符串。

图示:

代码或demo
https://www.cnblogs.com/dongkuo/p/6001791.html
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

RPC模型 (Request/Reply pattern) - 请求回复方式

这种模型是用来做RPC(Remote procedure call, 远程程序调用)的。
想要了解更多细节,可参考官网的说明:https://www.rabbitmq.com/tutorials/tutorial-six-java.html

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。
在RabbitMQ中也支持RPC。
图示:

RabbitMQ中实现RPC的机制是:

客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)

服务器端收到消息并处理

服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性

客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

代码或demo
https://www.cnblogs.com/dongkuo/p/6001791.html
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

RabbitMQ的消息属性和参数

可参考这个,待整理,还要再参考一些其它的
https://blog.51cto.com/hmtk520/2050847
https://www.cnblogs.com/qixuejia/p/5997054.html
https://blog.csdn.net/vbirdbest/article/details/78670550

RabbitMQ的管理界面怎么使用

主要参考文章:https://blog.csdn.net/t610654893/article/details/82982392

如何登录 RabbitMQ 的管理界面

首先我们来访问下本机(windows)安装后,查看管理界面http://localhost:15672 账户/密码:guest/guest (15672是默认web界面访问接口,guest/guest账户密码亦是默认的账户密码.)
登录之后我们可以看到如下的界面

界面解析

首页进去是一个TOP横幅,上面是RabbitMQ的版本和Erlang版本信息。

overview(概述)

如图:

解析:记录一些本地rabbit的磁盘信息和内存占用,而后是一个监听端口,意思是协议的端口.
这里我们需要记下amqp协议的端口5672.因为rabbitmq就是遵从amqp协议的erlang语言实现的消息列队框架技术.也就是我们需要在rabbitmq开发和实现时,必须要用此端口来进行连接。

Connections (连接)

如图:

解析:
连接界面此时是空的,看似什么也没有,但是通过词意大概可以猜想到,这个界面记录应该是展示客户端或者某个服务连接到此rabbitmq的信息展示,那么通过什么连接呢?
IP?用户名?密码?还有端口,前3个都有了,那么后面的端口呢,是不是概述里面的监听协议端口.显然是的.
刚入门之初,我一直以为是15672,事实证明是错的,5672才是正确的协议连接端口.
为了查看方便,我后台写了简单的写了一个连接程序,打开看看,代码之后会乘上,现在带着好奇心去往下看和理解。
有了连接信息之后,界面显示如下:

Channels 通道(或者叫信道)

如图:

解析:
信道(或者叫通道)抽象理解一下,就是在创建连接的时候,我们还需要给一个信道,来承载信息的传输.好比一根电线,当它通电后,其实是其内的铜线来传输电力一样。
有内容的 Channels显示如下:

Exchanges 交换机(或者叫交换器)

如图:

解析:
交换器的理解就是在第三步创建信道后,信道上需要设置一个交换器.那么用交换器去做什么呢,往下看.
界面一个列表,Name列就是名称,Type是交换器类型,既然类型那么交换器肯定通过类型会做不同的逻辑处理.还有一个Features特性列,证明在我们实现代码的时候也需要如此定位类型.
D表示(durable(持久化的)),I表示(internal(内在的))
词面理解交换器可以是持久化的也可以定义为内部内在的,外部无法访问,只能通过交换器之间来访问
接下来点开某个交换器看下。

由图可以发现,交换器内工作的原理:
4.1 绑定queue(列队)Routing key(路由) Arguments(参数)
4.2 发布信息Routing key(路由) ... 证明这个Routing key(路由key)也是至关重要的.
交换器绑定通过路由key绑定列队进行消息发布。

Queues 队列

如图:

解析:
队列用于存储消息.等待消息被接收。
在理论层面上来看,以上都是生产者需要做的工作。

总结

  1. 生产者通过amqp协议连接RabbitMQ,建立连接。
  2. 连接上创建通道。
  3. 通道内创建不同类型的交换器和消息队列,交换器通过Routing key绑定消息队列,并且发布消息存储在队列内。
  4. 消费者获取连接和通道,在通道内通过消息队列标识(QueueName)启动一个消费者,消费者消费消息。

体验 RabbitMQ 功能的一些代码和demo

几种模型的收发的例子代码,非常精品和清晰:
https://www.cnblogs.com/dongkuo/p/6001791.html#rpc
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

生产者和消费者的简单代码,但是好像跑不起来,仅供参考
https://blog.csdn.net/t610654893/article/details/82982392

参考文章

Rabbitmq的相关概念参考的最多的一篇。模型图,基础概念如通道、队列。Exchange(交换机)类型 啥的那些的参考
https://blog.csdn.net/usagoole/article/details/82831812#RabbitMQ_7

这篇也是参考的比较多,比如AMQP协议模型,还有很多基础的概念,message,body headers这些。还有收发应用的代码,还有RabbitMQ的使用场景。
https://www.cnblogs.com/luxiaoxun/p/3918054.html?utm_source=tuicool&utm_medium=referral

RabbitMQ 的管理界面的内容参考文章
https://blog.csdn.net/t610654893/article/details/82982392

生产者和消费者的简单代码,但是好像跑不起来,仅供参考
https://blog.csdn.net/t610654893/article/details/82982392

其它参考文章:
基础概念,ack机制
https://www.cnblogs.com/rcjtom/articles/5545884.html

Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
还有RPC的意思的参考。
https://www.sohu.com/a/158868089_355142

正文到此结束
相关文章
本文目录