MQ全称Message Queue(消息队列),是在消息的传输过程中保存新消息的容器,多用于分布式系统之间进行通信
生产者 → 中间件(MQ) → 消费者
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者
优势
应用解耦
异步提速
削峰填谷
劣势
系统可用性降低
系统复杂度提高
一致性问题
生产者不需要从消费者处获得反馈
引入消息列队之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成功动作做完了继续
容许短暂的不一致性
确实用了有效果
即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本
目前业界有很多的MQ产品,例如RabbitMQ,RocketMQ,ActiveMQ,Kafka,ZeroMQ,MetaMQ等,也有直接使用Redis充当消息队列的案例,这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ产品特征,综合考虑
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
AMQP,即Advanced Message Queuing Protocol (高级消息队列协议),是一个网络协议,是应用层的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制,2006年,AMQP规范发布,类比Http
RabbitMQ基础架构
Broker
接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建exchange / queue等
Connection
publisher / consumer 和broker之间的TCP连接
Channel
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connectio的开销将是巨大的,效率也较低,Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker 识别channel,所以 channel之间是完全隔离的,Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的
Exchange
message到达 broker的第一站,根据分发规则,匹配查询表中的RoutingKey,分发消息到queue中去,常用的交换机类型有: 路由direct (point-to-point), 通配符topic(publish-subscribe) and 订阅fanout (multicast)
Queue
消息最终被送到这里等待consumer取走
Binding
exchange和queue之间的虚拟连接,binding 中可以包含 routing key,Binding信息被保存到exchange 中的查询表中,用于message 的分发依据
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API
JMS是JavaEE规范中的一种,类比JDBC
很多消息中间件都实现了JMS规范,例如:ActiveMQ
RabbitMQ官方没有提供JMS的实现包,但是开源社区有
rabbitMQ官网:http://www.rabbitmq.com/
使用docker拉取镜像
1docker pull rabbitmq:3-management
运行rabbitmq
xxxxxxxxxx
91docker run \
2 -e RABBITMQ_DEFAULT_USER=root \
3 -e RABBITMQ_DEFAULT_PASS=123456 \
4 --name mq \
5 --hostname mq1 \
6 -p 15672:15672 \
7 -p 5672:5672 \
8 -d \
9 rabbitmq:3-management
本机访问控制台 虚拟机ip:15672
引入rabbitmq依赖坐标
xxxxxxxxxx
61<!--rabbitmq java 客户端-->
2<dependency>
3 <groupId>com.rabbitmq</groupId>
4 <artifactId>amqp-client</artifactId>
5 <version>5.4.3</version>
6</dependency>
P:生产者,也就是要发送消息的程序
Q:消息队列,类似一个邮箱,可以缓存消息,生产者向其中投递消息,消费者从其中取出消息
C:消费者:消息的接收者,会一直等待消息到来
1、创建连接工厂
xxxxxxxxxx
21//创建连接工厂
2ConnectionFactory factory = new ConnectionFactory();
2、设置参数
xxxxxxxxxx
91//设置参数
2//设置主机ip和端口,默认为localhost:5672
3factory.setHost("192.168.119.88");
4factory.setPort(5672);
5//设置虚拟机名
6factory.setVirtualHost("/");
7//设置用户名和密码
8factory.setUsername("root");
9factory.setPassword("123456");
3、创建连接 Connection
xxxxxxxxxx
21//创建连接 Connection
2Connection connection = factory.newConnection();
4、创建 Channel
xxxxxxxxxx
21//创建channel
2Channel channel = connection.createChannel();
5、创建列队Queue
用于消息队列
参数:
xxxxxxxxxx
101/*
2 参数:
3 1、String queue:队列名称
4 2、boolean durable:是否持久化,当mq重启之后,还在
5 3、boolean exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
6 4、boolean autoDelete:是否自动删除,当没有Consumer时,自动删除掉
7 5、Map<String, Object> arguments:参数消息
8 */
9//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建
10channel.queueDeclare("hello world",true,false,false,null);
6、发送消息
用于生产者发送消息给交换机,并且对消息进行一些设置
参数:
x1/*
2 参数:
3 1、String exchange:交换机的名称,简单模式下交换机会使用默认的
4 2、String routingKey:路由名称,没有指定交换机时可以是队列名称
5 3、BasicProperties props:配置信息
6 4、byte[] body:发送消息数据
7 */
8//发送消息
9
10String body = "Hello rabbitMQ!";
11
12channel.basicPublish("","hello world",null,body.getBytes());
7、释放资源
xxxxxxxxxx
31//释放资源
2channel.close();
3connection.close();
1、创建连接工厂,设置参数
xxxxxxxxxx
121//创建连接工厂
2ConnectionFactory factory = new ConnectionFactory();
3
4//设置参数
5//设置主机ip和端口,默认为localhost:5672
6factory.setHost("192.168.119.88");
7factory.setPort(5672);
8//设置虚拟机名
9factory.setVirtualHost("/");
10//设置用户名和密码
11factory.setUsername("root");
12factory.setPassword("123456");
2、创建Connection和Channel
xxxxxxxxxx
91//创建连接 Connection
2Connection connection = factory.newConnection();
3
4//创建channel
5Channel channel = connection.createChannel();
6
7//创建列队
8channel.queueDeclare("hello world",true,false,false,null);
9
3、接收消息
xxxxxxxxxx
291/*
2 参数:
3 1、queue:队列名称
4 2、autoAck:是否自动确认
5 3、callback:回调对象
6 */
7Consumer consumer = new DefaultConsumer(channel){
8 /**
9 * 回调方法,当收到消息后,会自动执行该方法
10 * @param consumerTag 标识
11 * @param envelope 获取一些信息(交换机,路由key...)
12 * @param properties 配置信息
13 * @param body 数据
14 * @throws IOException
15 */
16
17 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
18
19 System.out.println("consumerTag:"+consumerTag);
20 System.out.println("Exchange:"+envelope.getExchange());
21 System.out.println("RoutingKey:"+envelope.getRoutingKey());
22 System.out.println("properties:"+properties);
23 System.out.println("body:"+new String(body));
24
25 }
26};
27
28//接收消息
29channel.basicConsume("hello world",true,consumer);
Work queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息
应用场景: 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
例如:短信服务部署多个,只需要有一个节点成功发送即可
默认采用轮询方式分发消息给消费者
生产者发送10次消息
xxxxxxxxxx
271/*
2 参数:
3 1、queue:队列名称
4 2、durable:是否持久化,当mq重启之后,还在
5 3、exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
6 4、autoDelete:是否自动删除,当没有Consumer时,自动删除掉
7 5、arguments:参数消息
8 */
9//如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建
10channel.queueDeclare("work_queues",true,false,false,null);
11
12/*
13 参数:
14 1、exchange:交换机的名称,简单模式下交换机会使用默认的
15 2、routingKey:路由名称
16 3、props:配置信息
17 4、body:发送消息数据
18 */
19//发送消息
20for (int i = 0; i < 10; i++) {
21
22 String body = i+":Hello rabbitMQ!";
23
24 channel.basicPublish("","work_queues",null,body.getBytes());
25
26
27}
消费者监听消息列队
xxxxxxxxxx
381 /*
2 参数:
3 1、queue:队列名称
4 2、durable:是否持久化,当mq重启之后,还在
5 3、exclusive:是否独占,只能有一个消费者监听这队列,当Connection关闭时,是否删除队列
6 4、autoDelete:是否自动删除,当没有Consumer时,自动删除掉
7 5、arguments:参数消息
8 */
9 //如果没有一个名字叫hello world的队列将会创建该队列,如果有则不会创建
10 channel.queueDeclare("work_queues",true,false,false,null);
11
12 /*
13 参数:
14 1、queue:队列名称
15 2、autoAck:是否自动确认
16 3、callback:回调对象
17 */
18 Consumer consumer = new DefaultConsumer(channel){
19 /**
20 * 回调方法,当收到消息后,会自动执行该方法
21 * @param consumerTag 标识
22 * @param envelope 获取一些信息(交换机,路由key...)
23 * @param properties 配置信息
24 * @param body 数据
25 * @throws IOException
26 */
27
28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29
30 System.out.println("body:"+new String(body));
31
32 }
33 };
34
35 //接收消息
36 channel.basicConsume("work_queues",true,consumer);
37
38}
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
Exchange有常见以下3种类型:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
用于声明一个交换机
参数:
String exchange 交换机名称
BuiltinExchangeType type 交换机类型
boolean durable 是否持久化
boolean autoDelete 自动删除
boolean internal 内部使用,一般为false
Map<String,Object> arguments 参数
生产者代码实现
xxxxxxxxxx
581//创建连接工厂
2ConnectionFactory factory = new ConnectionFactory();
3
4//设置参数
5//设置主机ip和端口,默认为localhost:5672
6factory.setHost("192.168.119.88");
7factory.setPort(5672);
8//设置虚拟机名
9factory.setVirtualHost("/");
10//设置用户名和密码
11factory.setUsername("root");
12factory.setPassword("123456");
13
14
15
16//创建连接 Connection
17Connection connection = factory.newConnection();
18
19//创建channel
20Channel channel = connection.createChannel();
21
22/*
23 参数:
24 1、String exchange 交换机名称
25 2、BuiltinExchangeType type 交换机类型
26 DIRECT("direct"):定向
27 FANOUT("fanout"):扇形(广播),发送消息到每个与之绑定的队列
28 TOPIC("topic"):通配符的方式
29 HEADERS("headers"):参数匹配
30
31 3、boolean durable 是否持久化
32 4、boolean autoDelete 自动删除
33 5、boolean internal 内部使用,一般为false
34 6、Map<String,Object> arguments 参数
35 */
36
37String exchangeName = "test_fanout";
38
39//创建交换机
40channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
41
42//创建队列
43String queue1Name = "test_fanout_queue1";
44String queue2Name = "test_fanout_queue2";
45channel.queueDeclare(queue1Name,true,false,false,null);
46channel.queueDeclare(queue2Name,true,false,false,null);
47
48
49//绑定队列和交换机
50/*
51 参数:
52 1、String queue 队列名称
53 2、String exchange 交换机名称
54 3、String routingKey 路由键,绑定规则
55 如果交换机的类型为fanout,routingKey设置为""
56 */
57channel.queueBind(queue1Name,exchangeName,"");
58channel.queueBind(queue2Name,exchangeName,"");
routingKey的作用
交换机根据消息携带的路由键,来决定该消息递交给哪个队列,交换机根据路由键提供的绑定规则来递交消息后,消费者就可以连接队列获取消息
首先路由键需要用在在交换机和队列创建之后的相互绑定
其次发布数据的时候需要在某个交换机里面填写路由键,然后写上要发送的消息内容
最后路由键就可以找到被绑定的相应的队列来接收消息
消费者代码实现
xxxxxxxxxx
471//创建连接工厂
2ConnectionFactory factory = new ConnectionFactory();
3
4//设置参数
5//设置主机ip和端口,默认为localhost:5672
6factory.setHost("192.168.119.88");
7factory.setPort(5672);
8//设置虚拟机名
9factory.setVirtualHost("/");
10//设置用户名和密码
11factory.setUsername("root");
12factory.setPassword("123456");
13
14
15
16//创建连接 Connection
17Connection connection = factory.newConnection();
18
19//创建channel
20Channel channel = connection.createChannel();
21
22
23/*
24 参数:
25 1、queue:队列名称
26 2、autoAck:是否自动确认
27 3、callback:回调对象
28 */
29Consumer consumer = new DefaultConsumer(channel){
30 /**
31 * 回调方法,当收到消息后,会自动执行该方法
32 * @param consumerTag 标识
33 * @param envelope 获取一些信息(交换机,路由key...)
34 * @param properties 配置信息
35 * @param body 数据
36 * @throws IOException
37 */
38
39 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
40
41 System.out.println("body:"+new String(body));
42 System.out.println("将日志信息打印到控制台...");
43
44 }
45};
46//接收消息,绑定一条列队
47channel.basicConsume("test_fanout_queue1",true,consumer);
生产者代码实现
xxxxxxxxxx
741//创建连接工厂
2ConnectionFactory factory = new ConnectionFactory();
3
4//设置参数
5//设置主机ip和端口,默认为localhost:5672
6factory.setHost("192.168.119.88");
7factory.setPort(5672);
8//设置虚拟机名
9factory.setVirtualHost("/");
10//设置用户名和密码
11factory.setUsername("root");
12factory.setPassword("123456");
13
14
15
16//创建连接 Connection
17Connection connection = factory.newConnection();
18
19//创建channel
20Channel channel = connection.createChannel();
21
22/*
23 参数:
24 1、String exchange 交换机名称
25 2、BuiltinExchangeType type 交换机类型
26 DIRECT("direct"):定向
27 FANOUT("fanout"):扇形(广播),发送消息到每个与之绑定的队列
28 TOPIC("topic"):通配符的方式
29 HEADERS("headers"):参数匹配
30
31 3、boolean durable 是否持久化
32 4、boolean autoDelete 自动删除
33 5、boolean internal 内部使用,一般为false
34 6、Map<String,Object> arguments 参数
35 */
36
37String exchangeName = "test_direct";
38
39//创建交换机
40channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
41
42//创建队列
43String queue1Name = "test_direct_queue1";
44String queue2Name = "test_direct_queue2";
45
46channel.queueDeclare(queue1Name,true,false,false,null);
47channel.queueDeclare(queue2Name,true,false,false,null);
48
49
50//绑定队列和交换机
51/*
52 参数:
53 1、String queue 队列名称
54 2、String exchange 交换机名称
55 3、String routingKey 路由键,绑定规则
56 */
57//队列1的绑定
58channel.queueBind(queue1Name,exchangeName,"error");
59//队列2的绑定
60channel.queueBind(queue2Name,exchangeName,"error");
61channel.queueBind(queue2Name,exchangeName,"warning");
62channel.queueBind(queue2Name,exchangeName,"info");
63
64String body = "日志信息,张三调用了findAll方法...日志级别:info...";
65String body2 = "错误信息,数据出现错误...级别:error...";
66
67//发送消息info
68channel.basicPublish(exchangeName,"info",null,body.getBytes());
69channel.basicPublish(exchangeName,"error",null,body2.getBytes());
70
71
72//释放资源
73channel.close();
74connection.close();
消费者代码实现
xxxxxxxxxx
481//创建连接工厂
2ConnectionFactory factory = new ConnectionFactory();
3
4//设置参数
5//设置主机ip和端口,默认为localhost:5672
6factory.setHost("192.168.119.88");
7factory.setPort(5672);
8//设置虚拟机名
9factory.setVirtualHost("/");
10//设置用户名和密码
11factory.setUsername("root");
12factory.setPassword("123456");
13
14
15
16//创建连接 Connection
17Connection connection = factory.newConnection();
18
19//创建channel
20Channel channel = connection.createChannel();
21
22
23/*
24 参数:
25 1、queue:队列名称
26 2、autoAck:是否自动确认
27 3、callback:回调对象
28 */
29Consumer consumer = new DefaultConsumer(channel){
30 /**
31 * 回调方法,当收到消息后,会自动执行该方法
32 * @param consumerTag 标识
33 * @param envelope 获取一些信息(交换机,路由key...)
34 * @param properties 配置信息
35 * @param body 数据
36 * @throws IOException
37 */
38
39 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
40
41 System.out.println("body:"+new String(body));
42 System.out.println("将error信息保存到数据库...");
43
44 }
45};
46
47//接收消息
48channel.basicConsume("test_direct_queue1",true,consumer);
通配符:
*号代表恰好匹配1个单词 能匹配topic.hello
#号代表匹配0个或多个单词 能匹配topic.hello.world
例如:routingKey为 系统的名称.日志的级别
需求:将所有error级别的日志存入数据库,所有的order系统的日志存入数据库
生产者
消费者
引入maven坐标
xxxxxxxxxx
251<dependencies>
2 <dependency>
3 <groupId>org.springframework</groupId>
4 <artifactId>spring-context</artifactId>
5 <version>5.2.1.RELEASE</version>
6 </dependency>
7
8 <dependency>
9 <groupId>org.springframework.amqp</groupId>
10 <artifactId>spring-rabbit</artifactId>
11 <version>2.2.6.RELEASE</version>
12 </dependency>
13
14 <dependency>
15 <groupId>junit</groupId>
16 <artifactId>junit</artifactId>
17 <version>4.12</version>
18 </dependency>
19
20 <dependency>
21 <groupId>org.springframework</groupId>
22 <artifactId>spring-test</artifactId>
23 <version>5.2.1.RELEASE</version>
24 </dependency>
25</dependencies>
创建配置信息文件
xxxxxxxxxx
51rabbitmq.host=192.168.119.88
2rabbitmq.port=5672
3rabbitmq.username=root
4rabbitmq.password=123456
5rabbitmq.virtual-host=/
配置spring核心配置文件(生产者)
xxxxxxxxxx
681
2<beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:context="http://www.springframework.org/schema/context"
5 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
6 xsi:schemaLocation="http://www.springframework.org/schema/beans
7 https://www.springframework.org/schema/beans/spring-beans.xsd
8 http://www.springframework.org/schema/context
9 https://www.springframework.org/schema/context/spring-context.xsd
10 http://www.springframework.org/schema/rabbit
11 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
12
13 <!--引入配置信息-->
14 <context:property-placeholder location="classpath:rabbitmq.properties"></context:property-placeholder>
15
16 <!--定义rabbitmq connectionFactory-->
17 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
18 port="${rabbitmq.port}"
19 username="${rabbitmq.username}"
20 password="${rabbitmq.password}"
21 virtual-host="${rabbitmq.virtual-host}"/>
22
23 <!--定义管理交换机、队列-->
24 <rabbit:admin connection-factory="connectionFactory"/>
25
26 <!--定义持久化队列,不存在则自动创建,不绑定到交换机则绑定到默认交换机
27 默认交换机类型为direct,名字为"",路由键为队列的名称
28 id:bean的名称
29 name:queue的名称
30 auto-declare:自动创建
31 auto-delete:自动删除,最后一个消费者和该队列断开连接后,自动删除队列
32 durable:是否持久化true/false
33 exclusive:是否独占连接
34 -->
35 <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
36
37 <!--广播类型队列,所有队列都能收到消息,不存在自动创建-->
38 <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
39 <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
40
41 <!--定义广播类型交换机,并绑定上述两个队列-->
42 <rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
43
44 <rabbit:bindings>
45 <rabbit:binding queue="spring_fanout_queue_1"></rabbit:binding>
46 <rabbit:binding queue="spring_fanout_queue_2"></rabbit:binding>
47 </rabbit:bindings>
48
49 </rabbit:fanout-exchange>
50
51 <!--通配符类型队列,*匹配一个单词,#匹配多个单词,不存在则自动创建-->
52 <rabbit:queue id="spring_topic_queue_1" name="spring_topic_queue_1" auto-declare="true"/>
53 <rabbit:queue id="spring_topic_queue_2" name="spring_topic_queue_2" auto-declare="true"/>
54 <rabbit:queue id="spring_topic_queue_3" name="spring_topic_queue_3" auto-declare="true"/>
55
56 <!--创建Topic类型交换机,如果不存在则自动创建,并且将上面三个队列与该交换机绑定-->
57 <rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
58 <rabbit:bindings>
59 <rabbit:binding pattern="os467.*" queue="spring_topic_queue_1"/>
60 <rabbit:binding pattern="os467.#" queue="spring_topic_queue_2"/>
61 <rabbit:binding pattern="com.os467.#" queue="spring_topic_queue_3"/>
62 </rabbit:bindings>
63 </rabbit:topic-exchange>
64
65 <!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
66 <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"></rabbit:template>
67
68</beans>
在单元测试中测试
xxxxxxxxxx
261package com.os467.rabbitmq_pro_test;
2
3import org.junit.Test;
4import org.junit.runner.RunWith;
5import org.springframework.amqp.rabbit.core.RabbitTemplate;
6import org.springframework.beans.factory.annotation.Autowired;
7import org.springframework.test.context.ContextConfiguration;
8import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
9
10SpringJUnit4ClassRunner.class) (
11locations = "classpath:spring_rabbitmq_pro.xml") (
12public class MqProTest {
13
14 //注入rabbitmq模板对象
15
16 private RabbitTemplate rabbitTemplate;
17
18
19 public void testHelloWorld(){
20
21 //发送消息
22 rabbitTemplate.convertAndSend("spring_queue","testing spring-rabbitmq...");
23
24 }
25
26}
测试topic类型消息
xxxxxxxxxx
61
2public void testFanoutQueue(){
3
4 rabbitTemplate.convertAndSend("spring_topic_exchange","os467.test.msg","this is topic mode!");
5
6}
导入maven坐标
xxxxxxxxxx
271<dependencies>
2
3 <dependency>
4 <groupId>junit</groupId>
5 <artifactId>junit</artifactId>
6 <version>4.12</version>
7 </dependency>
8
9 <dependency>
10 <groupId>org.springframework</groupId>
11 <artifactId>spring-test</artifactId>
12 <version>5.2.1.RELEASE</version>
13 </dependency>
14
15 <dependency>
16 <groupId>org.springframework</groupId>
17 <artifactId>spring-context</artifactId>
18 <version>5.2.1.RELEASE</version>
19 </dependency>
20
21 <dependency>
22 <groupId>org.springframework.amqp</groupId>
23 <artifactId>spring-rabbit</artifactId>
24 <version>2.1.8.RELEASE</version>
25 </dependency>
26
27</dependencies>
编辑消费者消息监听器
onMessage()
方法xxxxxxxxxx
201package com.os467.rabbitmq_con;
2
3import org.springframework.amqp.core.Message;
4import org.springframework.amqp.core.MessageListener;
5
6public class SpringQueueListener implements MessageListener {
7
8
9 /**
10 * 监听器,回调方法
11 * @param message
12 */
13
14 public void onMessage(Message message) {
15
16 //打印消息
17 System.out.println(new String(message.getBody()));
18
19 }
20}
配置spring核心配置文件(消费者)
xxxxxxxxxx
321
2<beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:context="http://www.springframework.org/schema/context"
5 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
6 xsi:schemaLocation="http://www.springframework.org/schema/beans
7 http://www.springframework.org/schema/beans/spring-beans.xsd
8 http://www.springframework.org/schema/context
9 https://www.springframework.org/schema/context/spring-context.xsd
10 http://www.springframework.org/schema/rabbit
11 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
12 <!--加载配置文件-->
13 <context:property-placeholder location="classpath:rabbitmq.properties"/>
14
15 <!-- 定义rabbitmq connectionFactory -->
16 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
17 port="${rabbitmq.port}"
18 username="${rabbitmq.username}"
19 password="${rabbitmq.password}"
20 virtual-host="${rabbitmq.virtual-host}"/>
21
22 <bean id="springQueueListener" class="com.os467.rabbitmq_con.SpringQueueListener"></bean>
23
24 <!--配置消息监听器-->
25 <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
26
27 <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
28
29 </rabbit:listener-container>
30
31
32</beans>
引入坐标
xxxxxxxxxx
181<dependencies>
2 <dependency>
3 <groupId>org.springframework.boot</groupId>
4 <artifactId>spring-boot-starter</artifactId>
5 </dependency>
6
7 <dependency>
8 <groupId>org.springframework.boot</groupId>
9 <artifactId>spring-boot-starter-test</artifactId>
10 </dependency>
11
12 <!--springboot整合rabbitmq-->
13 <dependency>
14 <groupId>org.springframework.boot</groupId>
15 <artifactId>spring-boot-starter-amqp</artifactId>
16 </dependency>
17
18</dependencies>
配置启动类配置文件
xxxxxxxxxx
51spring.rabbitmq.host=192.168.119.88
2spring.rabbitmq.port=5672
3spring.rabbitmq.virtual-host=/
4spring.rabbitmq.username=root
5spring.rabbitmq.password=123456
配置核心配置类
1、需要配置一个交换机
2、需要声明一个队列
3、配置交换机和队列绑定关系
xxxxxxxxxx
461package com.os467.demo.rabbitmq.config;
2
3import com.rabbitmq.client.AMQP;
4import org.springframework.amqp.core.*;
5import org.springframework.context.annotation.Bean;
6import org.springframework.context.annotation.Configuration;
7
8
9public class RabbitMQConfig {
10
11 public static final String TOPIC_EXCHANGE_NAME = "boot_topic_exchange";
12
13 public static final String TOPIC_QUEUE_NAME = "boot_topic_queue";
14
15
16 //1、交换机
17 "bootExchange") (
18 public Exchange bootExchange(){
19
20 //ExchangeBuilder创建交换机的类,durable是否持久化,build()创建交换机
21 return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).durable(true).build();
22
23 }
24
25
26 //2、Queue队列
27 "bootQueue") (
28 public Queue bootQueue(){
29
30 //队列声明
31 return QueueBuilder.durable(TOPIC_QUEUE_NAME).build();
32 }
33
34 //3、队列和交换机绑定关系 Binding
35 /*
36 1、知道哪个队列
37 2、知道哪个交换机
38 3、routing key
39 */
40
41 public Binding bindingQueueExchange(Exchange bootExchange,Queue bootQueue){
42
43 return BindingBuilder.bind(bootQueue).to(bootExchange).with("os467.#").noargs();
44 }
45
46}
测试生产者发送消息
xxxxxxxxxx
241package com.os467.demo.rabbitmq;
2
3import com.os467.demo.rabbitmq.config.RabbitMQConfig;
4import org.junit.jupiter.api.Test;
5import org.springframework.amqp.rabbit.core.RabbitTemplate;
6import org.springframework.beans.factory.annotation.Autowired;
7import org.springframework.boot.test.context.SpringBootTest;
8
9
10public class ProducerTest {
11
12 //注入RabbitTemplate
13
14 private RabbitTemplate rabbitTemplate;
15
16
17 public void testSend(){
18
19 //测试生产者发送消息
20 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_NAME,"os467.testTopic","boot mq testing...");
21
22 }
23
24}
创建springboot工程导入maven坐标
xxxxxxxxxx
41<dependency>
2 <groupId>org.springframework.boot</groupId>
3 <artifactId>spring-boot-starter-amqp</artifactId>
4</dependency>
配置启动类配置文件
xxxxxxxxxx
51spring.rabbitmq.host=192.168.119.88
2spring.rabbitmq.port=5672
3spring.rabbitmq.virtual-host=/
4spring.rabbitmq.username=root
5spring.rabbitmq.password=123456
创建静态类
使用@Component将类添加为spring组件
配置一个监听器
使用@RabbitListener注解
指定消费者监听器监听的队列名称
xxxxxxxxxx
171package com.os467.demo.rabbitmq_listener;
2
3import org.springframework.amqp.core.Message;
4import org.springframework.amqp.rabbit.annotation.RabbitListener;
5import org.springframework.stereotype.Component;
6
7
8public class RabbitMQListener {
9
10 queues = "boot_topic_queue") (
11 public void ListenerQueue(Message message){
12
13 System.out.println(message);
14
15 }
16
17}
总结