RabbitMQ
MQ简介
MQ全称Message Queue(消息队列),是在消息的传输过程中保存新消息的容器,多用于分布式系统之间进行通信
生产者 → 中间件(MQ) → 消费者
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者
同步调用存在的问题
- 耦合度高,加入新需求需要修改原来的代码,违背了开闭原则
- 性能下降,调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
- 资源浪费,调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
- 级联失败,如果服务提供者出现问题,所有调用方也会跟着出现问题,会迅速导致整个微服务群的故障
异步调用方案
异步调用常见实现就是事件驱动模式
MQ的优势和劣势
优势
应用解耦
- 系统的耦合性越高,容错性就越低,可维护性就越低,使用MQ使得应用间解耦,提升容错性和可维护性
- MQ存放生产者提供的信息,传递给需要的消费者,降低了生产者和消费者之间直接远程调用产生的耦合
异步提速
- 系统只需要向MQ发送本次调用所需要的信息而无需等待返回结果
- 提升了用户体验和系统吞吐量(单位时间内处理请求的数目)
削峰填谷
- 削峰指的是在一段时间内出现的用户访问量激增,将请求消息积压在MQ中来缓解服务的压力
- 填谷指的是将高峰期后的一段时间内对MQ内积压的请求消息进行稳定处理
- 提高了系统的稳定性
劣势
系统可用性降低
- 系统引入的外部依赖越多,系统稳定性越差,一旦MQ宕机,就会对业务造成影响
系统复杂度提高
- MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用
一致性问题
- A系统处理完业务,通过MQ给B、C、D三个系统发送消息数据,如果B系统,C系统处理成功,D系统处理失败,如何保证消息数据处理的一致性?
使用MQ的前提
生产者不需要从消费者处获得反馈
引入消息列队之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成功动作做完了继续
容许短暂的不一致性
确实用了有效果
即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本
常见的MQ产品
目前业界有很多的MQ产品,例如RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMQ等,也有直接使用Redis充当消息队列的案例,这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制,2006年,AMQP规范发布,类比Http
RabbitMQ基础架构
Broker
接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange / queue等
Connection
publisher / consumer 和broker之间的TCP连接
Channel
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connectio的开销将是巨大的,效率也较低,Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker 识别channel,所以 channel之间是完全隔离的,Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的
Exchange
message到达 broker的第一站,根据分发规则,匹配查询表中的RoutingKey,分发消息到queue中去,常用的交换机类型有: 路由direct (point-to-point), 通配符topic(publish-subscribe) and 订阅fanout (multicast)
Queue
消息最终被送到这里等待consumer取走
Binding
exchange和queue之间的虚拟连接,binding 中可以包含 routing key,Binding信息被保存到exchange 中的查询表中,用于message 的分发依据
JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
JMS是JavaEE规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ
RabbitMQ官方没有提供JMS的实现包,但是开源社区有
rabbitMQ官网:http://www.rabbitmq.com/
部署rabbitmq
使用docker拉取镜像
docker pull rabbitmq:3-management
运行rabbitmq
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
本机访问控制台 虚拟机ip:15672
引入rabbitmq依赖坐标
<!--rabbitmq java 客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
简单PQC模式
P:生产者,也就是要发送消息的程序
Q:消息队列,类似一个邮箱,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息
C:消费者:消息的接收者,会一直等待消息到来
生产者代码实现
1、创建连接工厂
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
2、设置参数
//设置参数
//设置主机ip和端口,默认为localhost:5672
factory.setHost("192.168.119.88");
factory.setPort(5672);
//设置虚拟机名
factory.setVirtualHost("/");
//设置用户名和密码
factory.setUsername("root");
factory.setPassword("123456");
3、创建连接 Connection
//创建连接 Connection
Connection connection = factory.newConnection();
4、创建 Channel
//创建channel
Channel channel = connection.createChannel();
5、创建列队Queue
queueDeclare函数
用于消息队列
参数:
- String queue:队列名称
- boolean durable:是否持久化,当mq重启之后是否存在
- boolean exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
- boolean autoDelete:是否自动删除,当没有Consumer时,自动删除掉
- Map<String, Object> arguments:参数消息
/*
参数:
1、String queue:队列名称
2、boolean durable:是否持久化,当mq重启之后,还在
3、boolean exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
4、boolean autoDelete:是否自动删除,当没有Consumer时,自动删除掉
5、Map<String, Object> arguments:参数消息
*/
//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建
channel.queueDeclare("hello world",true,false,false,null);
6、发送消息
basicPublish函数
用于生产者发送消息给交换机,并且对消息进行一些设置
参数:
- String exchange:交换机的名称,简单模式下交换机会使用默认的
- String routingKey:路由名称,没有指定交换机时可以是队列名称
- BasicProperties props:配置信息
- byte[] body:发送消息数据
/*
参数:
1、String exchange:交换机的名称,简单模式下交换机会使用默认的
2、String routingKey:路由名称,没有指定交换机时可以是队列名称
3、BasicProperties props:配置信息
4、byte[] body:发送消息数据
*/
//发送消息
String body = "Hello rabbitMQ!";
channel.basicPublish("","hello world",null,body.getBytes());
7、释放资源
//释放资源
channel.close();
connection.close();
消费者代码实现
1、创建连接工厂,设置参数
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
//设置主机ip和端口,默认为localhost:5672
factory.setHost("192.168.119.88");
factory.setPort(5672);
//设置虚拟机名
factory.setVirtualHost("/");
//设置用户名和密码
factory.setUsername("root");
factory.setPassword("123456");
2、创建Connection和Channel
//创建连接 Connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建列队
channel.queueDeclare("hello world",true,false,false,null);
3、接收消息
/*
参数:
1、queue:队列名称
2、autoAck:是否自动确认
3、callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(交换机,路由key...)
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
//接收消息
channel.basicConsume("hello world",true,consumer);
RabbitMQ的工作模式
工作队列模式
Work queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息
应用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
例如:短信服务部署多个,只需要有一个节点成功发送即可
默认采用轮询方式分发消息给消费者
生产者发送10次消息
/*
参数:
1、queue:队列名称
2、durable:是否持久化,当mq重启之后,还在
3、exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
4、autoDelete:是否自动删除,当没有Consumer时,自动删除掉
5、arguments:参数消息
*/
//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
参数:
1、exchange:交换机的名称,简单模式下交换机会使用默认的
2、routingKey:路由名称
3、props:配置信息
4、body:发送消息数据
*/
//发送消息
for (int i = 0; i < 10; i++) {
String body = i+":Hello rabbitMQ!";
channel.basicPublish("","work_queues",null,body.getBytes());
}
消费者监听消息列队
/*
参数:
1、queue:队列名称
2、durable:是否持久化,当mq重启之后,还在
3、exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
4、autoDelete:是否自动删除,当没有Consumer时,自动删除掉
5、arguments:参数消息
*/
//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建
channel.queueDeclare("work_queues",true,false,false,null);
/*
参数:
1、queue:队列名称
2、autoAck:是否自动确认
3、callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(交换机,路由key...)
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
//接收消息
channel.basicConsume("work_queues",true,consumer);
}
Pub/Sub订阅模式
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机 (X),一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃,到底如何操作,取决于Exchange的类型
Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
exchangeDeclare函数
用于声明一个交换机
参数:
String exchange 交换机名称
BuiltinExchangeType type 交换机类型
- DIRECT(“direct”):定向
- FANOUT(“fanout”):扇形(广播),发送消息到每个与之绑定的队列
- TOPIC(“topic”):通配符的方式
- HEADERS(“headers”):参数匹配
boolean durable 是否持久化
boolean autoDelete 自动删除
boolean internal 内部使用,一般为false
Map<String,Object> arguments 参数
生产者代码实现
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
//设置主机ip和端口,默认为localhost:5672
factory.setHost("192.168.119.88");
factory.setPort(5672);
//设置虚拟机名
factory.setVirtualHost("/");
//设置用户名和密码
factory.setUsername("root");
factory.setPassword("123456");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/*
参数:
1、String exchange 交换机名称
2、BuiltinExchangeType type 交换机类型
DIRECT("direct"):定向
FANOUT("fanout"):扇形(广播),发送消息到每个与之绑定的队列
TOPIC("topic"):通配符的方式
HEADERS("headers"):参数匹配
3、boolean durable 是否持久化
4、boolean autoDelete 自动删除
5、boolean internal 内部使用,一般为false
6、Map<String,Object> arguments 参数
*/
String exchangeName = "test_fanout";
//创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/*
参数:
1、String queue 队列名称
2、String exchange 交换机名称
3、String routingKey 路由键,绑定规则
如果交换机的类型为fanout,routingKey设置为""
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
routingKey的作用
交换机根据消息携带的路由键,来决定该消息递交给哪个队列,交换机根据路由键提供的绑定规则来递交消息后,消费者就可以连接队列获取消息
- 单播(PQC)模式根据路由键名称指定消息队列
- 发布订阅模式无需路由键,将消息发送给对应绑定了交换器的所有消息队列
- topic模式是根据路由键的规则匹配,有选择性的进行广播
首先路由键需要用在在交换机和队列创建之后的相互绑定
其次发布数据的时候需要在某个交换机里面填写路由键,然后写上要发送的消息内容
最后路由键就可以找到被绑定的相应的队列来接收消息
消费者代码实现
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
//设置主机ip和端口,默认为localhost:5672
factory.setHost("192.168.119.88");
factory.setPort(5672);
//设置虚拟机名
factory.setVirtualHost("/");
//设置用户名和密码
factory.setUsername("root");
factory.setPassword("123456");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/*
参数:
1、queue:队列名称
2、autoAck:是否自动确认
3、callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(交换机,路由key...)
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("将日志信息打印到控制台...");
}
};
//接收消息,绑定一条列队
channel.basicConsume("test_fanout_queue1",true,consumer);
Routing路由模式
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的Routingkey完全一致,才会接收到消息,此时交换机的类型应该是DIRECT
生产者代码实现
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
//设置主机ip和端口,默认为localhost:5672
factory.setHost("192.168.119.88");
factory.setPort(5672);
//设置虚拟机名
factory.setVirtualHost("/");
//设置用户名和密码
factory.setUsername("root");
factory.setPassword("123456");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/*
参数:
1、String exchange 交换机名称
2、BuiltinExchangeType type 交换机类型
DIRECT("direct"):定向
FANOUT("fanout"):扇形(广播),发送消息到每个与之绑定的队列
TOPIC("topic"):通配符的方式
HEADERS("headers"):参数匹配
3、boolean durable 是否持久化
4、boolean autoDelete 自动删除
5、boolean internal 内部使用,一般为false
6、Map<String,Object> arguments 参数
*/
String exchangeName = "test_direct";
//创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
/*
参数:
1、String queue 队列名称
2、String exchange 交换机名称
3、String routingKey 路由键,绑定规则
*/
//队列1的绑定
channel.queueBind(queue1Name,exchangeName,"error");
//队列2的绑定
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
channel.queueBind(queue2Name,exchangeName,"info");
String body = "日志信息,张三调用了findAll方法...日志级别:info...";
String body2 = "错误信息,数据出现错误...级别:error...";
//发送消息info
channel.basicPublish(exchangeName,"info",null,body.getBytes());
channel.basicPublish(exchangeName,"error",null,body2.getBytes());
//释放资源
channel.close();
connection.close();
消费者代码实现
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
//设置主机ip和端口,默认为localhost:5672
factory.setHost("192.168.119.88");
factory.setPort(5672);
//设置虚拟机名
factory.setVirtualHost("/");
//设置用户名和密码
factory.setUsername("root");
factory.setPassword("123456");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/*
参数:
1、queue:队列名称
2、autoAck:是否自动确认
3、callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会自动执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息(交换机,路由key...)
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("将error信息保存到数据库...");
}
};
//接收消息
channel.basicConsume("test_direct_queue1",true,consumer);
Topics通配符模式
通配符:
*****号代表恰好匹配1个单词 能匹配topic.hello
**#**号代表匹配0个或多个单词 能匹配topic.hello.world
例如:routingKey为 系统的名称.日志的级别
需求:将所有error级别的日志存入数据库,所有的order系统的日志存入数据库
什么是SpringAMQP
AMQP (Advanced Message Queuing Protocol),是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息
包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
springAMQP
特点
- 监听器容器,用于异步处理入站消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列,交换和绑定
spring整合rabbitMQ
生产者
- 创建生产者工程
- 添加依赖
- 配置整合
- 编写代码发送消息
消费者
- 创建生产者工程
- 添加依赖
- 配置整合
- 编写消息监听器
生产者实现
引入maven坐标
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
</dependencies>
创建配置信息文件
rabbitmq.host=192.168.119.88
rabbitmq.port=5672
rabbitmq.username=root
rabbitmq.password=123456
rabbitmq.virtual-host=/
配置spring核心配置文件(生产者)
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--引入配置信息-->
<context:property-placeholder location="classpath:rabbitmq.properties"></context:property-placeholder>
<!--定义rabbitmq connectionFactory-->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定义持久化队列,不存在则自动创建,不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为"",路由键为队列的名称
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除,最后一个消费者和该队列断开连接后,自动删除队列
durable:是否持久化true/false
exclusive:是否独占连接
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!--广播类型队列,所有队列都能收到消息,不存在自动创建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定义广播类型交换机,并绑定上述两个队列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"></rabbit:binding>
<rabbit:binding queue="spring_fanout_queue_2"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--通配符类型队列,*匹配一个单词,#匹配多个单词,不存在则自动创建-->
<rabbit:queue id="spring_topic_queue_1" name="spring_topic_queue_1" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_2" name="spring_topic_queue_2" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_3" name="spring_topic_queue_3" auto-declare="true"/>
<!--创建Topic类型交换机,如果不存在则自动创建,并且将上面三个队列与该交换机绑定-->
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="os467.*" queue="spring_topic_queue_1"/>
<rabbit:binding pattern="os467.#" queue="spring_topic_queue_2"/>
<rabbit:binding pattern="com.os467.#" queue="spring_topic_queue_3"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"></rabbit:template>
</beans>
在单元测试中测试
package com.os467.rabbitmq_pro_test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring_rabbitmq_pro.xml")
public class MqProTest {
//注入rabbitmq模板对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHelloWorld(){
//发送消息
rabbitTemplate.convertAndSend("spring_queue","testing spring-rabbitmq...");
}
}
测试topic类型消息
@Test
public void testFanoutQueue(){
rabbitTemplate.convertAndSend("spring_topic_exchange","os467.test.msg","this is topic mode!");
}
消费者实现
导入maven坐标
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
</dependencies>
编辑消费者消息监听器
需要继承MessageListener接口
重写
onMessage()
方法
package com.os467.rabbitmq_con;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
/**
* 监听器,回调方法
* @param message
*/
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
配置spring核心配置文件(消费者)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<bean id="springQueueListener" class="com.os467.rabbitmq_con.SpringQueueListener"></bean>
<!--配置消息监听器-->
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
</rabbit:listener-container>
</beans>
springboot整合rabbitMQ
生产者实现
引入坐标
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--springboot整合rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
配置启动类配置文件
spring.rabbitmq.host=192.168.119.88
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
配置核心配置类
1、需要配置一个交换机
2、需要声明一个队列
3、配置交换机和队列绑定关系
package com.os467.demo.rabbitmq.config;
import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_EXCHANGE_NAME = "boot_topic_exchange";
public static final String TOPIC_QUEUE_NAME = "boot_topic_queue";
//1、交换机
@Bean("bootExchange")
public Exchange bootExchange(){
//ExchangeBuilder创建交换机的类,durable是否持久化,build()创建交换机
return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
}
//2、Queue队列
@Bean("bootQueue")
public Queue bootQueue(){
//队列声明
return QueueBuilder.durable(TOPIC_QUEUE_NAME).build();
}
//3、队列和交换机绑定关系 Binding
/*
1、知道哪个队列
2、知道哪个交换机
3、routing key
*/
@Bean
public Binding bindingQueueExchange(Exchange bootExchange,Queue bootQueue){
return BindingBuilder.bind(bootQueue).to(bootExchange).with("os467.#").noargs();
}
}
测试生产者发送消息
package com.os467.demo.rabbitmq;
import com.os467.demo.rabbitmq.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class ProducerTest {
//注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
//测试生产者发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,"os467.testTopic","boot mq testing...");
}
}
消费者实现
创建springboot工程导入maven坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置启动类配置文件
spring.rabbitmq.host=192.168.119.88
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
创建静态类
使用**@Component**将类添加为spring组件
配置一个监听器
使用**@RabbitListener注解**
指定消费者监听器监听的队列名称
package com.os467.demo.rabbitmq_listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_topic_queue")
public void ListenerQueue(Message message){
System.out.println(message);
}
}
总结
- Springboot提供了快速整合RabbitMQ的方式
- 基本信息只需要在properties中配置即可,交换机队列以及绑定关系在核心配置类中用**@Bean**配置
- 生产者注入RabbitTemplate完成消息发送
- 消费者使用RabbitListener完成消息接收
测试发送Object类型数据
创建队列
@Bean
public Queue objectQueue(){
return new Queue("object.queue");
}
测试发送消息
@Test
public void testSendObjectQueue(){
Map<String,Object> msg = new HashMap<>();
msg.put("name","tom");
msg.put("age",21);
rabbitTemplate.convertAndSend("object.queue",msg);
}
- Spring对消息对象的处理是由
org.springframework.amqp.support.converter.MessageConverter
来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化 - 而这种序列化生成的数据较长而且容易发生注入问题,因此不推荐
- 如果需要修改需要定义一个MessageConverter类型的Bean,推荐用JSON方式序列化
消息转化器
需要在消费者和生产者服务中配置
在服务中引入jackson核心依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
在服务中声明MessageConverter(amqp包下的)
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
SpringAMQP中消息的序列化和反序列化如何实现?
- 利用MessageConverter实现的,默认是JDK的序列化
- 注意发送方与接收方必须使用相同的MessageConverter
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 1300452403@qq.com