RabbitMQ

MQ简介

MQ全称Message Queue(消息队列),是在消息的传输过程中保存新消息的容器,多用于分布式系统之间进行通信

 

生产者 → 中间件(MQ) → 消费者

 

分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信

发送方称为生产者,接收方称为消费者

 

MQ的优势和劣势

优势

应用解耦

 

异步提速

 

削峰填谷

 

劣势

系统可用性降低

 

系统复杂度提高

 

一致性问题

 

使用MQ的前提

生产者不需要从消费者处获得反馈

引入消息列队之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成功动作做完了继续

容许短暂的不一致性

 

确实用了有效果

即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本

 

 

常见的MQ产品

目前业界有很多的MQ产品,例如RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMQ等,也有直接使用Redis充当消息队列的案例,这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑

 

 

 RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

 

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制,2006年,AMQP规范发布,类比Http

 

RabbitMQ基础架构

 

Broker

接收和分发消息的应用,RabbitMQ Server就是 Message Broker

 

Virtual host

出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange / queue等

 

Connection

publisher / consumer 和broker之间的TCP连接

 

Channel

如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connectio的开销将是巨大的,效率也较低,Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker 识别channel,所以 channel之间是完全隔离的,Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销

在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的

 

Exchange

message到达 broker的第一站,根据分发规则,匹配查询表中的RoutingKey,分发消息到queue中去,常用的交换机类型有: 路由direct (point-to-point), 通配符topic(publish-subscribe) and 订阅fanout (multicast)

 

Queue

消息最终被送到这里等待consumer取走

 

Binding

exchange和queue之间的虚拟连接,binding 中可以包含 routing key,Binding信息被保存到exchange 中的查询表中,用于message 的分发依据

 

 

JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API

JMS是JavaEE规范中的一种,类比JDBC

很多消息中间件都实现了JMS规范,例如:ActiveMQ

RabbitMQ官方没有提供JMS的实现包,但是开源社区有

 

rabbitMQ官网:http://www.rabbitmq.com/

 

 

部署rabbitmq

使用docker拉取镜像

 

运行rabbitmq

 

本机访问控制台 虚拟机ip:15672

 

引入rabbitmq依赖坐标

 

简单PQC模式

P:生产者,也就是要发送消息的程序

Q:消息队列,类似一个邮箱,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息

C:消费者:消息的接收者,会一直等待消息到来

 

生产者代码实现

1、创建连接工厂

 

2、设置参数

 

3、创建连接 Connection

 

4、创建 Channel

 

5、创建列队Queue

queueDeclare函数

用于消息队列

参数:

 

6、发送消息

basicPublish函数

用于生产者发送消息给交换机,并且对消息进行一些设置

参数:

 

7、释放资源

 

 

消费者代码实现

1、创建连接工厂,设置参数

 

2、创建Connection和Channel

 

3、接收消息

 

RabbitMQ的工作模式

工作队列模式

 

img

Work queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息

应用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

 

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系

Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

例如:短信服务部署多个,只需要有一个节点成功发送即可

默认采用轮询方式分发消息给消费者

 

生产者发送10次消息

 

消费者监听消息列队

 

 

Pub/Sub订阅模式

 

 

在订阅模型中,多了一个Exchange角色,而且过程略有变化:

Exchange有常见以下3种类型:

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失

 

exchangeDeclare函数

用于声明一个交换机

参数:

 

生产者代码实现

 

routingKey的作用

交换机根据消息携带的路由键,来决定该消息递交给哪个队列,交换机根据路由键提供的绑定规则来递交消息后,消费者就可以连接队列获取消息

首先路由键需要用在在交换机和队列创建之后的相互绑定

其次发布数据的时候需要在某个交换机里面填写路由键,然后写上要发送的消息内容

最后路由键就可以找到被绑定的相应的队列来接收消息

 

消费者代码实现

 

 

Routing路由模式

 

 

 

生产者代码实现

 

消费者代码实现

 

 

 

 

Topics通配符模式

 

通配符:

*号代表恰好匹配1个单词 能匹配topic.hello

#号代表匹配0个或多个单词 能匹配topic.hello.world

 

例如:routingKey为 系统的名称.日志的级别

需求:将所有error级别的日志存入数据库,所有的order系统的日志存入数据库

 

spring整合rabbitMQ

 

生产者

 

消费者

 

 

生产者实现

 

引入maven坐标

 

 

创建配置信息文件

 

 

配置spring核心配置文件(生产者)

 

 

在单元测试中测试

 

测试topic类型消息

 

 

消费者实现

导入maven坐标

 

编辑消费者消息监听器

 

 

配置spring核心配置文件(消费者)

 

 

springboot整合rabbitMQ

生产者实现

引入坐标

 

配置启动类配置文件

 

配置核心配置类

1、需要配置一个交换机

2、需要声明一个队列

3、配置交换机和队列绑定关系

 

测试生产者发送消息

 

消费者实现

创建springboot工程导入maven坐标

 

配置启动类配置文件

 

创建静态类

使用@Component将类添加为spring组件

配置一个监听器

使用@RabbitListener注解

指定消费者监听器监听的队列名称

 

总结