MQ全称Message Queue(消息队列),是在消息的传输过程中保存新消息的容器,多用于分布式系统之间进行通信
生产者 → 中间件(MQ) → 消费者
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者
优势
应用解耦
异步提速
削峰填谷
劣势
系统可用性降低
系统复杂度提高
一致性问题
生产者不需要从消费者处获得反馈
引入消息列队之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成功动作做完了继续
容许短暂的不一致性
确实用了有效果
即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本
目前业界有很多的MQ产品,例如RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMQ等,也有直接使用Redis充当消息队列的案例,这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
AMQP,即Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制,2006年,AMQP规范发布,类比Http
RabbitMQ基础架构

Broker
接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange / queue等
Connection
publisher / consumer 和broker之间的TCP连接
Channel
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connectio的开销将是巨大的,效率也较低,Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker 识别channel,所以 channel之间是完全隔离的,Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的
Exchange
message到达 broker的第一站,根据分发规则,匹配查询表中的RoutingKey,分发消息到queue中去,常用的交换机类型有: 路由direct (point-to-point), 通配符topic(publish-subscribe) and 订阅fanout (multicast)
Queue
消息最终被送到这里等待consumer取走
Binding
exchange和queue之间的虚拟连接,binding 中可以包含 routing key,Binding信息被保存到exchange 中的查询表中,用于message 的分发依据
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
JMS是JavaEE规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ
RabbitMQ官方没有提供JMS的实现包,但是开源社区有
rabbitMQ官网:http://www.rabbitmq.com/
使用docker拉取镜像
1docker pull rabbitmq:3-management
运行rabbitmq
xxxxxxxxxx91docker run \2 -e RABBITMQ_DEFAULT_USER=root \3 -e RABBITMQ_DEFAULT_PASS=123456 \4 --name mq \5 --hostname mq1 \6 -p 15672:15672 \7 -p 5672:5672 \8 -d \9 rabbitmq:3-management
本机访问控制台 虚拟机ip:15672
引入rabbitmq依赖坐标
xxxxxxxxxx61<!--rabbitmq java 客户端-->2<dependency>3 <groupId>com.rabbitmq</groupId>4 <artifactId>amqp-client</artifactId>5 <version>5.4.3</version>6</dependency>
P:生产者,也就是要发送消息的程序
Q:消息队列,类似一个邮箱,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息
C:消费者:消息的接收者,会一直等待消息到来
1、创建连接工厂
xxxxxxxxxx21//创建连接工厂2ConnectionFactory factory = new ConnectionFactory();
2、设置参数
xxxxxxxxxx91//设置参数2//设置主机ip和端口,默认为localhost:56723factory.setHost("192.168.119.88");4factory.setPort(5672);5//设置虚拟机名6factory.setVirtualHost("/");7//设置用户名和密码8factory.setUsername("root");9factory.setPassword("123456");
3、创建连接 Connection
xxxxxxxxxx21//创建连接 Connection2Connection connection = factory.newConnection();
4、创建 Channel
xxxxxxxxxx21//创建channel2Channel channel = connection.createChannel();
5、创建列队Queue
用于消息队列
参数:
xxxxxxxxxx101/*2 参数:3 1、String queue:队列名称4 2、boolean durable:是否持久化,当mq重启之后,还在5 3、boolean exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列6 4、boolean autoDelete:是否自动删除,当没有Consumer时,自动删除掉7 5、Map<String, Object> arguments:参数消息8 */9//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建10channel.queueDeclare("hello world",true,false,false,null);
6、发送消息
用于生产者发送消息给交换机,并且对消息进行一些设置
参数:
x1/*2 参数:3 1、String exchange:交换机的名称,简单模式下交换机会使用默认的4 2、String routingKey:路由名称,没有指定交换机时可以是队列名称5 3、BasicProperties props:配置信息6 4、byte[] body:发送消息数据7 */8//发送消息9
10String body = "Hello rabbitMQ!";11
12channel.basicPublish("","hello world",null,body.getBytes());
7、释放资源
xxxxxxxxxx31//释放资源2channel.close();3connection.close();
1、创建连接工厂,设置参数
xxxxxxxxxx121//创建连接工厂2ConnectionFactory factory = new ConnectionFactory();3
4//设置参数5//设置主机ip和端口,默认为localhost:56726factory.setHost("192.168.119.88");7factory.setPort(5672);8//设置虚拟机名9factory.setVirtualHost("/");10//设置用户名和密码11factory.setUsername("root");12factory.setPassword("123456");
2、创建Connection和Channel
xxxxxxxxxx91//创建连接 Connection2Connection connection = factory.newConnection();3
4//创建channel5Channel channel = connection.createChannel();6
7//创建列队8channel.queueDeclare("hello world",true,false,false,null);9
3、接收消息
xxxxxxxxxx291/*2 参数:3 1、queue:队列名称4 2、autoAck:是否自动确认5 3、callback:回调对象6 */7Consumer consumer = new DefaultConsumer(channel){8 /**9 * 回调方法,当收到消息后,会自动执行该方法10 * @param consumerTag 标识11 * @param envelope 获取一些信息(交换机,路由key...)12 * @param properties 配置信息13 * @param body 数据14 * @throws IOException15 */16 17 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {18
19 System.out.println("consumerTag:"+consumerTag);20 System.out.println("Exchange:"+envelope.getExchange());21 System.out.println("RoutingKey:"+envelope.getRoutingKey());22 System.out.println("properties:"+properties);23 System.out.println("body:"+new String(body));24
25 }26};27
28//接收消息29channel.basicConsume("hello world",true,consumer);

Work queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息
应用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
例如:短信服务部署多个,只需要有一个节点成功发送即可
默认采用轮询方式分发消息给消费者
生产者发送10次消息
xxxxxxxxxx271/*2 参数:3 1、queue:队列名称4 2、durable:是否持久化,当mq重启之后,还在5 3、exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列6 4、autoDelete:是否自动删除,当没有Consumer时,自动删除掉7 5、arguments:参数消息8 */9//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建10channel.queueDeclare("work_queues",true,false,false,null);11
12/*13 参数:14 1、exchange:交换机的名称,简单模式下交换机会使用默认的15 2、routingKey:路由名称16 3、props:配置信息17 4、body:发送消息数据18 */19//发送消息20for (int i = 0; i < 10; i++) {21
22 String body = i+":Hello rabbitMQ!";23
24 channel.basicPublish("","work_queues",null,body.getBytes());25
26
27}
消费者监听消息列队
xxxxxxxxxx381 /*2 参数:3 1、queue:队列名称4 2、durable:是否持久化,当mq重启之后,还在5 3、exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列6 4、autoDelete:是否自动删除,当没有Consumer时,自动删除掉7 5、arguments:参数消息8 */9 //如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建10 channel.queueDeclare("work_queues",true,false,false,null);11
12 /*13 参数:14 1、queue:队列名称15 2、autoAck:是否自动确认16 3、callback:回调对象17 */18 Consumer consumer = new DefaultConsumer(channel){19 /**20 * 回调方法,当收到消息后,会自动执行该方法21 * @param consumerTag 标识22 * @param envelope 获取一些信息(交换机,路由key...)23 * @param properties 配置信息24 * @param body 数据25 * @throws IOException26 */27 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {29
30 System.out.println("body:"+new String(body));31
32 }33 };34
35 //接收消息36 channel.basicConsume("work_queues",true,consumer);37
38}

在订阅模型中,多了一个Exchange角色,而且过程略有变化:
Exchange有常见以下3种类型:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
用于声明一个交换机
参数:
String exchange 交换机名称
BuiltinExchangeType type 交换机类型
boolean durable 是否持久化
boolean autoDelete 自动删除
boolean internal 内部使用,一般为false
Map<String,Object> arguments 参数
生产者代码实现
xxxxxxxxxx581//创建连接工厂2ConnectionFactory factory = new ConnectionFactory();3
4//设置参数5//设置主机ip和端口,默认为localhost:56726factory.setHost("192.168.119.88");7factory.setPort(5672);8//设置虚拟机名9factory.setVirtualHost("/");10//设置用户名和密码11factory.setUsername("root");12factory.setPassword("123456");13
14
15
16//创建连接 Connection17Connection connection = factory.newConnection();18
19//创建channel20Channel channel = connection.createChannel();21
22/*23 参数:24 1、String exchange 交换机名称25 2、BuiltinExchangeType type 交换机类型26 DIRECT("direct"):定向27 FANOUT("fanout"):扇形(广播),发送消息到每个与之绑定的队列28 TOPIC("topic"):通配符的方式29 HEADERS("headers"):参数匹配30
31 3、boolean durable 是否持久化32 4、boolean autoDelete 自动删除33 5、boolean internal 内部使用,一般为false34 6、Map<String,Object> arguments 参数35 */36
37String exchangeName = "test_fanout";38
39//创建交换机40channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);41
42//创建队列43String queue1Name = "test_fanout_queue1";44String queue2Name = "test_fanout_queue2";45channel.queueDeclare(queue1Name,true,false,false,null);46channel.queueDeclare(queue2Name,true,false,false,null);47
48
49//绑定队列和交换机50/*51 参数:52 1、String queue 队列名称53 2、String exchange 交换机名称54 3、String routingKey 路由键,绑定规则55 如果交换机的类型为fanout,routingKey设置为""56 */57channel.queueBind(queue1Name,exchangeName,"");58channel.queueBind(queue2Name,exchangeName,"");
routingKey的作用
交换机根据消息携带的路由键,来决定该消息递交给哪个队列,交换机根据路由键提供的绑定规则来递交消息后,消费者就可以连接队列获取消息
首先路由键需要用在在交换机和队列创建之后的相互绑定
其次发布数据的时候需要在某个交换机里面填写路由键,然后写上要发送的消息内容
最后路由键就可以找到被绑定的相应的队列来接收消息
消费者代码实现
xxxxxxxxxx471//创建连接工厂2ConnectionFactory factory = new ConnectionFactory();3
4//设置参数5//设置主机ip和端口,默认为localhost:56726factory.setHost("192.168.119.88");7factory.setPort(5672);8//设置虚拟机名9factory.setVirtualHost("/");10//设置用户名和密码11factory.setUsername("root");12factory.setPassword("123456");13
14
15
16//创建连接 Connection17Connection connection = factory.newConnection();18
19//创建channel20Channel channel = connection.createChannel();21
22
23/*24 参数:25 1、queue:队列名称26 2、autoAck:是否自动确认27 3、callback:回调对象28 */29Consumer consumer = new DefaultConsumer(channel){30 /**31 * 回调方法,当收到消息后,会自动执行该方法32 * @param consumerTag 标识33 * @param envelope 获取一些信息(交换机,路由key...)34 * @param properties 配置信息35 * @param body 数据36 * @throws IOException37 */38 39 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {40
41 System.out.println("body:"+new String(body));42 System.out.println("将日志信息打印到控制台...");43
44 }45};46//接收消息,绑定一条列队47channel.basicConsume("test_fanout_queue1",true,consumer);

生产者代码实现
xxxxxxxxxx741//创建连接工厂2ConnectionFactory factory = new ConnectionFactory();3
4//设置参数5//设置主机ip和端口,默认为localhost:56726factory.setHost("192.168.119.88");7factory.setPort(5672);8//设置虚拟机名9factory.setVirtualHost("/");10//设置用户名和密码11factory.setUsername("root");12factory.setPassword("123456");13
14
15
16//创建连接 Connection17Connection connection = factory.newConnection();18
19//创建channel20Channel channel = connection.createChannel();21
22/*23 参数:24 1、String exchange 交换机名称25 2、BuiltinExchangeType type 交换机类型26 DIRECT("direct"):定向27 FANOUT("fanout"):扇形(广播),发送消息到每个与之绑定的队列28 TOPIC("topic"):通配符的方式29 HEADERS("headers"):参数匹配30
31 3、boolean durable 是否持久化32 4、boolean autoDelete 自动删除33 5、boolean internal 内部使用,一般为false34 6、Map<String,Object> arguments 参数35 */36
37String exchangeName = "test_direct";38
39//创建交换机40channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);41
42//创建队列43String queue1Name = "test_direct_queue1";44String queue2Name = "test_direct_queue2";45
46channel.queueDeclare(queue1Name,true,false,false,null);47channel.queueDeclare(queue2Name,true,false,false,null);48
49
50//绑定队列和交换机51/*52 参数:53 1、String queue 队列名称54 2、String exchange 交换机名称55 3、String routingKey 路由键,绑定规则56 */57//队列1的绑定58channel.queueBind(queue1Name,exchangeName,"error");59//队列2的绑定60channel.queueBind(queue2Name,exchangeName,"error");61channel.queueBind(queue2Name,exchangeName,"warning");62channel.queueBind(queue2Name,exchangeName,"info");63
64String body = "日志信息,张三调用了findAll方法...日志级别:info...";65String body2 = "错误信息,数据出现错误...级别:error...";66
67//发送消息info68channel.basicPublish(exchangeName,"info",null,body.getBytes());69channel.basicPublish(exchangeName,"error",null,body2.getBytes());70
71
72//释放资源73channel.close();74connection.close();
消费者代码实现
xxxxxxxxxx481//创建连接工厂2ConnectionFactory factory = new ConnectionFactory();3
4//设置参数5//设置主机ip和端口,默认为localhost:56726factory.setHost("192.168.119.88");7factory.setPort(5672);8//设置虚拟机名9factory.setVirtualHost("/");10//设置用户名和密码11factory.setUsername("root");12factory.setPassword("123456");13
14
15
16//创建连接 Connection17Connection connection = factory.newConnection();18
19//创建channel20Channel channel = connection.createChannel();21
22
23/*24 参数:25 1、queue:队列名称26 2、autoAck:是否自动确认27 3、callback:回调对象28 */29Consumer consumer = new DefaultConsumer(channel){30 /**31 * 回调方法,当收到消息后,会自动执行该方法32 * @param consumerTag 标识33 * @param envelope 获取一些信息(交换机,路由key...)34 * @param properties 配置信息35 * @param body 数据36 * @throws IOException37 */38 39 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {40
41 System.out.println("body:"+new String(body));42 System.out.println("将error信息保存到数据库...");43
44 }45};46
47//接收消息48channel.basicConsume("test_direct_queue1",true,consumer);

通配符:
*号代表恰好匹配1个单词 能匹配topic.hello
#号代表匹配0个或多个单词 能匹配topic.hello.world
例如:routingKey为 系统的名称.日志的级别
需求:将所有error级别的日志存入数据库,所有的order系统的日志存入数据库
生产者
消费者
引入maven坐标
xxxxxxxxxx251<dependencies>2 <dependency>3 <groupId>org.springframework</groupId>4 <artifactId>spring-context</artifactId>5 <version>5.2.1.RELEASE</version>6 </dependency>7
8 <dependency>9 <groupId>org.springframework.amqp</groupId>10 <artifactId>spring-rabbit</artifactId>11 <version>2.2.6.RELEASE</version>12 </dependency>13
14 <dependency>15 <groupId>junit</groupId>16 <artifactId>junit</artifactId>17 <version>4.12</version>18 </dependency>19
20 <dependency>21 <groupId>org.springframework</groupId>22 <artifactId>spring-test</artifactId>23 <version>5.2.1.RELEASE</version>24 </dependency>25</dependencies>
创建配置信息文件
xxxxxxxxxx51rabbitmq.host=192.168.119.882rabbitmq.port=56723rabbitmq.username=root4rabbitmq.password=1234565rabbitmq.virtual-host=/
配置spring核心配置文件(生产者)
xxxxxxxxxx681 2<beans xmlns="http://www.springframework.org/schema/beans"3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"4 xmlns:context="http://www.springframework.org/schema/context"5 xmlns:rabbit="http://www.springframework.org/schema/rabbit"6 xsi:schemaLocation="http://www.springframework.org/schema/beans7 https://www.springframework.org/schema/beans/spring-beans.xsd8 http://www.springframework.org/schema/context9 https://www.springframework.org/schema/context/spring-context.xsd10 http://www.springframework.org/schema/rabbit11 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">12
13 <!--引入配置信息-->14 <context:property-placeholder location="classpath:rabbitmq.properties"></context:property-placeholder>15
16 <!--定义rabbitmq connectionFactory-->17 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"18 port="${rabbitmq.port}"19 username="${rabbitmq.username}"20 password="${rabbitmq.password}"21 virtual-host="${rabbitmq.virtual-host}"/>22
23 <!--定义管理交换机、队列-->24 <rabbit:admin connection-factory="connectionFactory"/>25
26 <!--定义持久化队列,不存在则自动创建,不绑定到交换机则绑定到默认交换机27 默认交换机类型为direct,名字为"",路由键为队列的名称28 id:bean的名称29 name:queue的名称30 auto-declare:自动创建31 auto-delete:自动删除,最后一个消费者和该队列断开连接后,自动删除队列32 durable:是否持久化true/false33 exclusive:是否独占连接34 -->35 <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>36
37 <!--广播类型队列,所有队列都能收到消息,不存在自动创建-->38 <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>39 <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>40
41 <!--定义广播类型交换机,并绑定上述两个队列-->42 <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">43
44 <rabbit:bindings>45 <rabbit:binding queue="spring_fanout_queue_1"></rabbit:binding>46 <rabbit:binding queue="spring_fanout_queue_2"></rabbit:binding>47 </rabbit:bindings>48
49 </rabbit:fanout-exchange>50
51 <!--通配符类型队列,*匹配一个单词,#匹配多个单词,不存在则自动创建-->52 <rabbit:queue id="spring_topic_queue_1" name="spring_topic_queue_1" auto-declare="true"/>53 <rabbit:queue id="spring_topic_queue_2" name="spring_topic_queue_2" auto-declare="true"/>54 <rabbit:queue id="spring_topic_queue_3" name="spring_topic_queue_3" auto-declare="true"/>55
56 <!--创建Topic类型交换机,如果不存在则自动创建,并且将上面三个队列与该交换机绑定-->57 <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">58 <rabbit:bindings>59 <rabbit:binding pattern="os467.*" queue="spring_topic_queue_1"/>60 <rabbit:binding pattern="os467.#" queue="spring_topic_queue_2"/>61 <rabbit:binding pattern="com.os467.#" queue="spring_topic_queue_3"/>62 </rabbit:bindings>63 </rabbit:topic-exchange>64
65 <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->66 <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"></rabbit:template>67
68</beans>
在单元测试中测试
xxxxxxxxxx261package com.os467.rabbitmq_pro_test;2
3import org.junit.Test;4import org.junit.runner.RunWith;5import org.springframework.amqp.rabbit.core.RabbitTemplate;6import org.springframework.beans.factory.annotation.Autowired;7import org.springframework.test.context.ContextConfiguration;8import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;9
10(SpringJUnit4ClassRunner.class)11(locations = "classpath:spring_rabbitmq_pro.xml")12public class MqProTest {13
14 //注入rabbitmq模板对象15 16 private RabbitTemplate rabbitTemplate;17
18 19 public void testHelloWorld(){20
21 //发送消息22 rabbitTemplate.convertAndSend("spring_queue","testing spring-rabbitmq...");23
24 }25
26}
测试topic类型消息
xxxxxxxxxx612public void testFanoutQueue(){3
4 rabbitTemplate.convertAndSend("spring_topic_exchange","os467.test.msg","this is topic mode!");5
6}
导入maven坐标
xxxxxxxxxx271<dependencies>2 3 <dependency>4 <groupId>junit</groupId>5 <artifactId>junit</artifactId>6 <version>4.12</version>7 </dependency>8
9 <dependency>10 <groupId>org.springframework</groupId>11 <artifactId>spring-test</artifactId>12 <version>5.2.1.RELEASE</version>13 </dependency>14
15 <dependency>16 <groupId>org.springframework</groupId>17 <artifactId>spring-context</artifactId>18 <version>5.2.1.RELEASE</version>19 </dependency>20
21 <dependency>22 <groupId>org.springframework.amqp</groupId>23 <artifactId>spring-rabbit</artifactId>24 <version>2.1.8.RELEASE</version>25 </dependency>26
27</dependencies>
编辑消费者消息监听器
onMessage()方法xxxxxxxxxx201package com.os467.rabbitmq_con;2
3import org.springframework.amqp.core.Message;4import org.springframework.amqp.core.MessageListener;5
6public class SpringQueueListener implements MessageListener {7
8
9 /**10 * 监听器,回调方法11 * @param message12 */13 14 public void onMessage(Message message) {15
16 //打印消息17 System.out.println(new String(message.getBody()));18
19 }20}
配置spring核心配置文件(消费者)
xxxxxxxxxx321 2<beans xmlns="http://www.springframework.org/schema/beans"3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"4 xmlns:context="http://www.springframework.org/schema/context"5 xmlns:rabbit="http://www.springframework.org/schema/rabbit"6 xsi:schemaLocation="http://www.springframework.org/schema/beans7 http://www.springframework.org/schema/beans/spring-beans.xsd8 http://www.springframework.org/schema/context9 https://www.springframework.org/schema/context/spring-context.xsd10 http://www.springframework.org/schema/rabbit11 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">12 <!--加载配置文件-->13 <context:property-placeholder location="classpath:rabbitmq.properties"/>14
15 <!-- 定义rabbitmq connectionFactory -->16 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"17 port="${rabbitmq.port}"18 username="${rabbitmq.username}"19 password="${rabbitmq.password}"20 virtual-host="${rabbitmq.virtual-host}"/>21
22 <bean id="springQueueListener" class="com.os467.rabbitmq_con.SpringQueueListener"></bean>23
24 <!--配置消息监听器-->25 <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">26
27 <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>28
29 </rabbit:listener-container>30
31
32</beans>
引入坐标
xxxxxxxxxx181<dependencies>2 <dependency>3 <groupId>org.springframework.boot</groupId>4 <artifactId>spring-boot-starter</artifactId>5 </dependency>6
7 <dependency>8 <groupId>org.springframework.boot</groupId>9 <artifactId>spring-boot-starter-test</artifactId>10 </dependency>11
12 <!--springboot整合rabbitmq-->13 <dependency>14 <groupId>org.springframework.boot</groupId>15 <artifactId>spring-boot-starter-amqp</artifactId>16 </dependency>17
18</dependencies>
配置启动类配置文件
xxxxxxxxxx51spring.rabbitmq.host=192.168.119.882spring.rabbitmq.port=56723spring.rabbitmq.virtual-host=/4spring.rabbitmq.username=root5spring.rabbitmq.password=123456
配置核心配置类
1、需要配置一个交换机
2、需要声明一个队列
3、配置交换机和队列绑定关系
xxxxxxxxxx461package com.os467.demo.rabbitmq.config;2
3import com.rabbitmq.client.AMQP;4import org.springframework.amqp.core.*;5import org.springframework.context.annotation.Bean;6import org.springframework.context.annotation.Configuration;7
89public class RabbitMQConfig {10
11 public static final String TOPIC_EXCHANGE_NAME = "boot_topic_exchange";12
13 public static final String TOPIC_QUEUE_NAME = "boot_topic_queue";14
15
16 //1、交换机17 ("bootExchange")18 public Exchange bootExchange(){19
20 //ExchangeBuilder创建交换机的类,durable是否持久化,build()创建交换机21 return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();22
23 }24
25
26 //2、Queue队列27 ("bootQueue")28 public Queue bootQueue(){29
30 //队列声明31 return QueueBuilder.durable(TOPIC_QUEUE_NAME).build();32 }33
34 //3、队列和交换机绑定关系 Binding35 /*36 1、知道哪个队列37 2、知道哪个交换机38 3、routing key39 */40 41 public Binding bindingQueueExchange(Exchange bootExchange,Queue bootQueue){42
43 return BindingBuilder.bind(bootQueue).to(bootExchange).with("os467.#").noargs();44 }45
46}
测试生产者发送消息
xxxxxxxxxx241package com.os467.demo.rabbitmq;2
3import com.os467.demo.rabbitmq.config.RabbitMQConfig;4import org.junit.jupiter.api.Test;5import org.springframework.amqp.rabbit.core.RabbitTemplate;6import org.springframework.beans.factory.annotation.Autowired;7import org.springframework.boot.test.context.SpringBootTest;8
910public class ProducerTest {11
12 //注入RabbitTemplate13 14 private RabbitTemplate rabbitTemplate;15
16 17 public void testSend(){18
19 //测试生产者发送消息20 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,"os467.testTopic","boot mq testing...");21
22 }23
24}
创建springboot工程导入maven坐标
xxxxxxxxxx41<dependency>2 <groupId>org.springframework.boot</groupId>3 <artifactId>spring-boot-starter-amqp</artifactId>4</dependency>
配置启动类配置文件
xxxxxxxxxx51spring.rabbitmq.host=192.168.119.882spring.rabbitmq.port=56723spring.rabbitmq.virtual-host=/4spring.rabbitmq.username=root5spring.rabbitmq.password=123456
创建静态类
使用@Component将类添加为spring组件
配置一个监听器
使用@RabbitListener注解
指定消费者监听器监听的队列名称
xxxxxxxxxx171package com.os467.demo.rabbitmq_listener;2
3import org.springframework.amqp.core.Message;4import org.springframework.amqp.rabbit.annotation.RabbitListener;5import org.springframework.stereotype.Component;6
78public class RabbitMQListener {9
10 (queues = "boot_topic_queue")11 public void ListenerQueue(Message message){12
13 System.out.println(message);14
15 }16
17}
总结