RocketMQ
RocketMQ是阿里研发的消息队列中间件,后捐给apache维护。
MQ: 起到微服务通信之间的解耦
Docker安装RocketMQ
docker pull apache/rocketmq
创建nameserver容器
docker run -d \
--restart=always \
--name rmqnamesrv \
--privileged=true \
-p 9876:9876 \
-v /usr/local/rocketmq/data/namesrv/logs:/root/logs \
-v /usr/local/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
apache/rocketmq \
sh mqnamesrv
-v后面的两个挂载目录可以自己设置
创建broker结点
mkdir -p /usr/local/rocketmq/data/broker/logs /usr/local/rocketmq/data/broker/store /usr/local/rocketmq/conf
编辑配置文件
vi /usr/local/rocketmq/conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 127.0.0.1
diskMaxUsedSpaceRatio=95
运行broker
docker run -d \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
--privileged=true \
-p 10911:10911 \
-p 10912:10912 \
-p 10909:10909 \
-v /usr/local/rocketmq/data/broker/logs:/root/logs \
-v /usr/local/rocketmq/data/broker/store:/root/store \
-v /usr/local/rocketmq/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-e "MAX_POSSIBLE_HEAP=200000000" \
apache/rocketmq \
sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
创建rockermq-console服务
docker pull styletang/rocketmq-console-ng
docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.17.0.2:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8081:8080 \
--ulimit nofile=1024 \
styletang/rocketmq-console-ng
概念介绍
RocketMQ分为四个部分
- NameServer
NameServer是RocketMQ的注册中心,由于Broker服务的注册和发现,类似zookeeper
NameServer之间不会相互通信
- Broker
Broker主要负责消息的存储、投递和查询及服务器高可用保证
- Producer
消息的生产者
- Consumer
消息的消费者
核心概念
Topic
主题,每个消息都只会属于一个主题,消息和主题是一对一关系
生产者生产各种不同消息,每个消息会自动归属于对应主题,消费者的消费是针对于主题的
Broker
Broker是RocketMQ的核心,大部分工作都在Broker中完成,包括接收请求,处理消费,消费持久,消息高可用,以及服务端过滤等都在这里完成
Broker既是物理上的概念(电脑主机),也是逻辑上的概念
多个Broker用套接字区分,多个逻辑上的Broker通过BrokerName区分
NameServer
nameserver类似微服务框架中的注册中心
它提供了路由管理,服务注册,服务发现
maven坐标
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
生产者 Producer
GroupName
和现实世界中一样,RocketMQ中也有组的概念,代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组
每一个生产者组只能有一个实例存在,考虑到提供的生产者在发送消息方面足够强大,每个生产者组和过程只允许一个实例
消费组是一个很大的概念,实现了负载平衡和容错性
producer相关方法
setNamesrvAddr
设置nameserver主机套接字start
初始化生产者send
发送消息到消息队列shutdown
关闭连接
Producer发送消息方法
public class MessageProducer {
public void test(){
//创建消息发送对象,生产者组
DefaultMQProducer producer = new DefaultMQProducer("demo1");
//设置nameserver注册中心地址
producer.setNamesrvAddr("192.168.119.88:9876");
//启动发送服务,初始化生产者
try {
producer.start();
//构建消息,指定topic和body
Message msg = new Message("topic1", "Hello World!".getBytes());
//发送消息
SendResult result = producer.send(msg, 10000);
System.out.println("sendResult = "+result);
//关闭连接
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
}
结果
sendResult = SendResult [sendStatus=SEND_OK, msgId=0A8A3F3A469418B4AAC27ADC1D280000, offsetMsgId=C0A8775800002A9F000000000000060C, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=4]
消费者 Consumer
两种接收模式 被动 / 主动
被动: Push 模型:当 Producer 发出的消息到达后,服务端马上将这条消息投递给 Consumer (适合实时性强场景)
主动: Pull 模型:当服务端收到这条消息后什么也不做,只是等着 Consumer 主动到自己这里来读,即 Consumer 这里有一个“拉取”的动作 (适合高并发场景)
RocketMQ的消费者有两个实现分别为DefaultMQPushConsumer和DefaultMQPullConsumer,它们分别为pull模式和push模式。其中pull模式为消费者主动发送请求,每隔一段时间去消息服务端拉取消息,push模式是采取长轮询的机制,消费者轮询方式主动发送请求到服务端Broker,Broker如果检测到有新的消息,则立即返回,否则暂时不返回任何消息,将请求挂起缓存到本地,Broker有一个线程检测挂起请求,等到有新消息时,对请求进项响应。
两种消费模式 集群/广播
RocketMQ的策略有两种,其中一种是集群模式(平均分配),另外一种是广播模式(即:每个消费者都接收全部消息)
集群模式 consumer.setMessageModel(MessageModel.CLUSTERING)
- 该模式下,一条消息只要被关注该topic和tag的集群只消费一次就行(如果有多个集群(每个集群(group)里有多台机器)都关注了这个topic和tag,则每个集群都挑随机一个机器消费一次)。
消息失败会重投,但不保证重投到同一机器上
消费进度由broker维护
广播模式 consumer.setMessageModel(MessageModel.BROADCASTING)
- 在广播消费模式下,MQ 会将每条消息推送给集群内所有订阅该topic和tag的客户端,保证消息至少被每台机器消费一次(若有多个集群都关注这个topic和tag,则每个集群的每个机器都消费一次)
消息发送失败不会重投
消费进度由consumer维护,broker并不关心一条消息有没有消费成功
//设置消费者的消费模式:也是默认的模式负载均衡
consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer相关方法
setNamesrvAddr
设置nameserver主机套接字地址subscribe
订阅对应主题消息的方法registerMessageListener
注册消息监听方法start
开启消费者接收消息
public class MessageConsumer {
public static void main(String[] args) {
//创建消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo1");
//设置nameserver地址
consumer.setNamesrvAddr("192.168.119.88:9876");
try {
//设置接收消息对应的topic,对应的sub标签为任意*,之前producer没有指定tag。如果producer发送的消息指定了tag,那么也必须指定相应的tag
consumer.subscribe("topic1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//遍历接收到的消息
for (MessageExt msg : msgs) {
System.out.println("msg = " + msg);
System.out.println("消息为:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者接收消息
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
结果
msg = MessageExt [queueId=0, storeSize=173, queueOffset=4, sysFlag=0, bornTimestamp=1713962045736, bornHost=/192.168.119.1:3202, storeTimestamp=1713962045662, storeHost=/192.168.119.88:10911, msgId=C0A8775800002A9F000000000000060C, commitLogOffset=1548, bodyCRC=472456355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=5, CONSUME_START_TIME=1713962045745, UNIQ_KEY=0A8A3F3A469418B4AAC27ADC1D280000, CLUSTER=DefaultCluster}, body=[72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33], transactionId='null'}]
消息为:Hello World!
消息类型
同步消息
即时性强,重要的消息,必须有回执消息,例如短信,通知(转账成功)
异步消息
即时性弱,但需要有回执的消息,例如订单中的某些信息
单向消息
不需要有回执的消息,如日志类消息
public class Producer {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("demo1");
producer.setNamesrvAddr("192.168.119.88:9876");
try {
producer.start();
//同步消息
for (int i = 1; i <= 10; i++) {
//构建消息,指定topic和body
Message msg = new Message("topic1", ("同步消息: hello" + i).getBytes());
//发送消息
SendResult result = producer.send(msg,10000);
System.out.println("sendResult = "+result);
}
//异步消息
for (int i = 1; i <= 10; i++) {
//构建消息,指定topic和body
Message msg = new Message("topic1", ("异步消息: hello" + i).getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("sendResult = "+sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("e = "+ e);
}
},10000);
}
//单向消息
Message msg = new Message("topic1", ("单向消息,hello".getBytes()));
producer.sendOneway(msg);
//休眠10秒,确保异步消息返回后能输出
TimeUnit.SECONDS.sleep(10);
//关闭连接
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
}
特殊消息
延时消息
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推
如何配置:
在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("demo1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.119.88:9876");
//启动发送服务
producer.start();
for (int i = 1; i <= 10; i++) {
//构建消息,指定topic和body
Message msg = new Message("topic1", ("延时消息:" + i).getBytes());
//设置延迟消息等级
msg.setDelayTimeLevel(3);
//发送消息
SendResult sendResult = producer.send(msg,10000);
System.out.println("sendResult = " + sendResult);
}
//关闭连接
producer.shutdown();
}
}
批量消息
传输参数为Message队列
- 消息内容总长度不超过4M
- 消息内容总长度包含如下:
- topic(字符串字节数)
- body (字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
- 日志(固定20字节)
批量消息将一次发送多条消息,但是有发送数据总长度限制
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("demo1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.119.88:9876");
//启动发送服务
producer.start();
List<Message> messageList = new ArrayList<Message>();
//构建消息,指定topic和body
Message msg1 = new Message("topic1", ("批量消息:" + 1).getBytes());
Message msg2 = new Message("topic1", ("批量消息:" + 2).getBytes());
Message msg3 = new Message("topic1", ("批量消息:" + 3).getBytes());
Message msg4 = new Message("topic1", ("批量消息:" + 4).getBytes());
messageList.add(msg1);
messageList.add(msg2);
messageList.add(msg3);
messageList.add(msg4);
//发送消息
SendResult sendResult = producer.send(messageList,10000);
System.out.println("sendResult = " + sendResult);
//关闭连接
producer.shutdown();
}
}
消息过滤
Tag 分类过滤
构建消息,指定topic和body、tag
- Message msg = new Message(“topic1”, “tag1”, “消息过滤tag1消息”.getBytes());
- Message msg2 = new Message(“topic1”, “tag2”, “消息过滤tag2消息”.getBytes());
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("demo1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.119.88:9876");
//启动发送服务
producer.start();
//构建消息,指定topic和body、tag
Message msg = new Message("topic1", "tag1", "消息过滤tag1消息".getBytes());
Message msg2 = new Message("topic1", "tag2", "消息过滤tag2消息".getBytes());
//发送消息
SendResult sendResult = producer.send(msg,10000);
SendResult sendResult2 = producer.send(msg2,10000);
System.out.println("sendResult = " + sendResult);
System.out.println("sendResult2 = " + sendResult2);
//关闭连接
producer.shutdown();
}
}
消费者过滤消息
public class Consumer {
public static void main(String[] args) throws Exception {
//创建一个消息接收对象consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo1");
//设定接收消息的命名服务器地址---获取到消息服务器ip
consumer.setNamesrvAddr("192.168.119.88:9876");
//设置接收消息对应的topic,对应的sub标签为任意*,之前producer没有指定tag。如果producer发送的消息指定了tag,那么也必须指定相应的tag
consumer.subscribe("topic1", "tag1||tag2");
//设置消费者的消费模式:也是默认的模式负载均衡
//consumer.setMessageModel(MessageModel.CLUSTERING);
//设置消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
//开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历接收到的消息
for (MessageExt msg : list) {
System.out.println("消息为:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息接收服务
consumer.start();
}
}
属性过滤
直接使用bySql方法过滤消息属性会报错,需要在broker.conf中配置后重启broker
enablePropertyFilter=true
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("demo1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.119.88:9876");
//启动发送服务
producer.start();
//构建消息,指定topic和body
Message msg = new Message("topic1", "tag1", "消息过滤tag1消息".getBytes());
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
//发送消息
SendResult sendResult = producer.send(msg,10000);
System.out.println("sendResult = " + sendResult);
//关闭连接
producer.shutdown();
}
}
消费者通过属性过滤消息
public class Consumer {
public static void main(String[] args) throws Exception {
//创建一个消息接收对象consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo1");
//设定接收消息的命名服务器地址---获取到消息服务器ip
consumer.setNamesrvAddr("192.168.119.88:9876");
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
//consumer.subscribe("topic1", MessageSelector.bySql("vip=1"));
consumer.subscribe("topic1", MessageSelector.bySql("age>18"));
consumer.setMessageModel(MessageModel.BROADCASTING);
//开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//遍历接收到的消息
for (MessageExt msg : list) {
System.out.println("消息为:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息接收服务
consumer.start();
}
}
顺序消息
某些情况下业务是有一定的顺序性的,没有订单就先生成子单,很容易出问题。如果消息是单个的还好,如果是具有一定业务顺序的,就一定会有问题
消息的发送是多线程随机发送到不同队列内的,无法保证业务消息顺序性,这时候我们就需要指定发送到的队列
同时因为消费者使用的多线程获取消息,因此也存在不是顺序问题,所以我们使用单线程获取顺序消息
- 生产者指定消息列队
- 单线程消费
使用MessageQueueSelector
和 MessageListenerOrderly
来保证顺序消息
public class Producer {
public static void main(String[] args) throws Exception {
//创建发送消息对象
DefaultMQProducer producer = new DefaultMQProducer("demo1");
//设定命名服务器地址---获取到消息服务器ip
producer.setNamesrvAddr("192.168.119.88:9876");
//启动发送服务
producer.start();
producer.setSendMsgTimeout(10000);
List<Msg> msgList = new ArrayList<>();
msgList.add(new Msg("a","消息一"));
msgList.add(new Msg("b","消息二"));
msgList.add(new Msg("c","消息三"));
//设置消息进入到指定的消息队列中
for (Msg msg : msgList) {
//构建消息
Message message = new Message("topic1",msg.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//根据发送的消息的不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值,作为索引
int index = msg.getId().hashCode() % list.size();
//返回一个消息队列MessageQueue
return list.get(index);
}
}, null);
System.out.println("sendResult = " + sendResult);
}
//关闭连接
producer.shutdown();
}
}
class Msg{
private String id;
private String msg;
public Msg(String id, String msg) {
this.id = id;
this.msg = msg;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "Msg{" +
"id='" + id + '\'' +
", msg='" + msg + '\'' +
'}';
}
}
消费者顺序接收
public class Consumer {
public static void main(String[] args) throws Exception {
//创建一个消息接收对象consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo1");
//设定接收消息的命名服务器地址---获取到消息服务器ip
consumer.setNamesrvAddr("192.168.119.88:9876");
//设置接收消息对应的topic,对应的sub标签为任意*,之前producer没有指定tag。如果producer发送的消息指定了tag,那么也必须指定相应的tag
consumer.subscribe("topic1", "*");
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用ConsumeListenerOrderly后,对消息队列的处理由一个消息队列多个线程服务,转换为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
//遍历接收到的消息
for (MessageExt msg : list) {
System.out.println(Thread.currentThread().getName() + "消息为:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动消息接收服务
consumer.start();
}
}
ConsumeMessageThread_3消息为:Msg{id='a', msg='消息一'}
ConsumeMessageThread_4消息为:Msg{id='b', msg='消息二'}
ConsumeMessageThread_5消息为:Msg{id='c', msg='消息三'}
事务消息
三种状态
提交状态:允许进入队列,此消息与非事务消息无区别
回滚状态:不允许进入队列,此消息等同于未发送过
中间状态:完成了half消息的发送,未对MQ进行二次状态确认
注意:事务消息仅与生产者有关,与消费者无关
什么是事务消息
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性
事务消息流程
生产者将消息发送至Apache RocketMQ服务端
Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为”暂不能投递”,这种状态下的消息即为半事务消息
生产者开始执行本地事务逻辑
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
服务端回查的间隔时间和最大回查次数,请参见参数限制
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理
事务消息生命周期
- 初始化:半事务消息创建,生产者待发送
- 事务待提交:半事务消息被发送到服务端,并不会被持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交,此时消息对消费者不可见
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息 消费重试
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见 消息存储和清理机制
如果事务补偿过程返回的状态仍然是 return LocalTransactionState.UNKNOW;那么还会一直进行事务补偿,测试结论为每60秒发起一次事务补偿
事务提交
public class Producer1 {
public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer("demo1");
producer.setNamesrvAddr("192.168.119.88:9876");
producer.setSendMsgTimeout(10000);
try {
producer.start();
producer.setTransactionListener(new TransactionListener() {
//正常提交事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("提交事务消息");
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return null;
}
});
Message message = new Message("topic1","事务消息".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message,null);
System.out.println("sendResult = "+ sendResult );
producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
事务补偿
public class Producer2 {
public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer("demo1");
producer.setNamesrvAddr("192.168.119.88:9876");
producer.setSendMsgTimeout(10000);
try {
producer.start();
producer.setTransactionListener(new TransactionListener() {
long start;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
start = System.currentTimeMillis();
return LocalTransactionState.UNKNOW;
}
//事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
long time = (System.currentTimeMillis() - start) / 1000;
System.out.println("time = " + time);
System.out.println("事务补偿过程执行了");
return LocalTransactionState.COMMIT_MESSAGE;//提交
}
});
Message message = new Message("topic1","事务消息".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message,null);
System.out.println("sendResult = "+ sendResult );
//producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
事务回滚
public class Producer3 {
public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer("demo1");
producer.setNamesrvAddr("192.168.119.88:9876");
producer.setSendMsgTimeout(10000);
try {
producer.start();
producer.setTransactionListener(new TransactionListener() {
//事务回滚
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行事务");
try {
int i = 1/0;
}catch (Exception e) {
System.out.println("回滚事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;//提交
}
});
Message message = new Message("topic1","事务消息".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message,null);
System.out.println("sendResult = "+ sendResult );
//producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
执行事务
回滚事务
sendResult = SendResult [sendStatus=SEND_OK, msgId=0A8A3F3A55B418B4AAC283EFF9B10000, offsetMsgId=null, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=11]
潜在问题
生产者发送给MQ的过程中,消息丢失怎么办?
场景1:
如果生产者将消息发送给RocketMQ时出现网络抖动或通信异常,消息可能会丢失。
解决方案:
使用RocketMQ自带的
事务消息
机制。这样,消息在发送成功之后才会执行本地操作,从而保证零丢失。事务消息的流程如下:
- 生产者发送半消息到RocketMQ。
- 若半消息发送失败,执行回滚逻辑。
- 若半消息发送成功且RocketMQ返回成功响应,执行生产者的核心链路。
- 若生产者核心链路执行失败,回滚并通知RocketMQ删除半消息。
- 若生产者核心链路执行成功,通知RocketMQ提交半消息,允许消费者消费数据。
MQ宕机,消息丢失怎么办?
场景2:
消息需要持久化到磁盘中,但有两种情况可能导致消息丢失:
- RocketMQ为了减少磁盘IO,会先将消息写入OS缓存中,然后异步刷入磁盘。如果在这个过程中Broker宕机,消息会丢失。
- 若消息已刷入磁盘但没有备份,一旦磁盘损坏,消息也会丢失。
解决方案:
- 将OS缓存的异步刷盘策略改为同步刷盘,修改Broker配置文件中的
flushDiskType
为SYNC_FLUSH
。这样可以保证消息持久化到磁盘。 - 采用主从机构,集群部署,确保Leader中的数据在多个Follower中都有备份,防止单点故障。
- 将OS缓存的异步刷盘策略改为同步刷盘,修改Broker配置文件中的
如何保证消息的顺序性?
- RocketMQ支持顺序消息,即按照发送顺序进行消费。生产者发送消息时,可以指定消息的顺序关键字,确保相同关键字的消息被顺序消费。
MQ在发给消费者的时候,消息丢失怎么办?
场景3:
消费者成功获取消息,但在完全消费之前宕机,导致消息丢失。
- 解决方案: RocketMQ在消费者中注册监听器,当消费者获取消息后,回调监听器函数处理消息。只有在消息处理完毕后返回
CONSUME_SUCCESS
,RocketMQ才认为消息已被消费。若消费者宕机,消息不会丢失。
- 解决方案: RocketMQ在消费者中注册监听器,当消费者获取消息后,回调监听器函数处理消息。只有在消息处理完毕后返回
如何保证消息不重复消费?
- 在
consumeMessage
方法中直接写消息消费的业务逻辑,确保消息不会被重复消费。 - 避免在子线程异步处理消息,以防消息还未完全消费就宕机,导致消息丢失。
- 在
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 1300452403@qq.com