集美阅读大全是一个以文章句子为主题的在线阅读网站。内含有各种经典好文章,爱情美文,诗歌散文,情感句子说说,范文资料等。读好文章,尽在集美阅读大全!!!
当前位置:集美阅读大全 >杂文 > 正文

解决RabbitMQ消息丢失问题和保证消息可靠性 – Java高级架构师n的个人空间


工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,万一消息丢了怎么办?什么情况下消息就不见了呢?下面通过这篇文章,我们就聊聊RabbitMQ 中消息的可靠性如何解决的?

本文分三部分说明

  1. mq消息丢失场景有哪些?
  2. 如何避免消息丢失?
  3. 大厂如何解决这些问题的?

mq消息丢失场景有哪些?

首先我们看下消息周期投递过程:

解决RabbitMQ消息丢失问题和保证消息可靠性 - Java高级架构师n的个人空间

 

我们把该图分三部分,左中右三部分,每部分都会导致消息丢失情况:

1.生产者生产消息到RabbitMQ-Server 消息丢失场景

  1. 外界环境问题导致:发生网络丢包、网络故障等造成消息丢失
  2. 代码层面,配置层面,考虑不全导致消息丢失

发送端使用Confirm模式,方案不够严谨,比如MQ Server接收消息失败发送 nack给发送端后,发送端监听失败或者没做任何事情,消息丢失的情况;

再比如发送消息到exchange后,发下路由和queue没有绑定,消息会存在丢失情况,下面会讲到具体的例子。

2.RabbitMQ-Server中存储的消息丢失

  1. 消息没有持久化导致丢失
  2. 单节点或者集群模式没有镜像模式消息丢失
  3. 个别磁盘意外损害导致消息同步失败
  4. 机房被炸

3.RabbitMQ-Server到消费者消息丢失

  1. 消费者接收到相关消息之后,还没来得及处理就宕机了,消息丢失

如何避免消息丢失?

下面也是从三个方面介绍:

  1. 生产者生产消息到RabbitMQ-Server 可靠性保证
  2. RabbitMQ-Server中存储的消息如何保证
  3. RabbitMQ-Server到消费者消息如何不丢

1. 生产者生产消息到RabbitMQ-Server可靠性保证

这个过程,消息可能会丢,比如发生网络丢包、网络故障等造成消息丢失,一般情况下如果不采取措施,生产者无法感知消息是否已经正确无误的发送到exchange中,如果生产者能感知到的话,它可以进行进一步的处理动作,比如重新投递相关消息以确保消息的可靠性。

1.1 别担心,有一种方案可以解决:就是 AMQP协议提供的一个事务机制

RabbitMQ客户端中Channel 接口提供了几个事务机制相关的方法: channel.txSelect channel.txCommit channel.txRollback 源码截图如下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}接口

解决RabbitMQ消息丢失问题和保证消息可靠性 - Java高级架构师n的个人空间

 

在生产者发送消息之前,通过channel.txSelect开启一个事务,接着发送消息, 如果消息投递server失败,进行事务回滚channel.txRollback,然后重新发送, 如果server收到消息,就提交事务channel.txCommit但是,很少有人这么干,因为这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ-Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

1.2 不过幸运的是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)

首先生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一deliveryTag和multiple参数),这就使得生产者知晓消息已经正确到达了目的地了。

其实Confirm模式有三种方式实现:

  1. 串行confirm模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端confirm,如果服务器端返回false或者超时时间内未返回,客户端进行消息重传。
  2. 批量confirm模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端confirm。
  3. 异步confirm模式:提供一个回调方法,broker confirm了一条或者多条消息后producer端会回调这个方法。 我们分别来看看这三种confirm模式

串行confirm

for(int i = 0;i<50;i++){   channel.basicPublish(   exchange, routingKey,   mandatory, immediate,   messageProperties,   message.getContent()   );   if (channel.waitForConfirms()) {   System.out.println("发送成功");   } else {   //发送失败这里可进行消息重新投递的逻辑   System.out.println("发送失败");   }  }

批量confirm模式

for(int i = 0;i<50;i++){   channel.basicPublish(   exchange, routingKey,   mandatory, immediate,   messageProperties,   message.getContent()   );  }  if (channel.waitForConfirms()) {   System.out.println("发送成功");  } else {   System.out.println("发送失败");  }

上面代码是简单版本的,生产环境绝对不是循环发送的,而是根据业务情况, 各个客户端程序需要定期(每x秒)或定量(每x条)或者两者结合来pubish消息,然后等待服务器端confirm。相比普通confirm模式,批量可以极大提升confirm效率。

但是有没有发现什么问题?

问题1: 批量发送的逻辑复杂话了。

问题2: 一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。

异步confirm模式

Channel channel = channelManager.getPublisherChannel(namespaceName);  ProxiedConfirmListener confirmListener = new ProxiedConfirmListener();//监听类  confirmListener.setChannelManager(channelManager);  confirmListener.setChannel(channel);  confirmListener.setNamespace(namespaceName);  confirmListener.addSuccessCallbacks(successCallbacks);  channel.addConfirmListener(confirmListener);  channel.confirmSelect();//开启confirm模式  AMQP.BasicProperties messageProperties = null;  if (message.getProperty() instanceof AMQP.BasicProperties) {   messageProperties = (AMQP.BasicProperties) message.getProperty();  }  confirmListener.toConfirm(channel.getNextPublishSeqNo(), rawMsg);  for(int i = 0;i<50;i++){   channel.basicPublish(   exchange, routingKey,   mandatory, immediate,   messageProperties,   message.getContent()   );  }

异步模式需要自己多写一部分复杂的代码实现,异步监听类,监听server端的通知消息,异步的好处性能会大幅度提升,发送完毕之后,可以继续发送其他消息。 MQServer通知生产端ConfirmListener监听类:用户可以继承接口实现自己的实现类,处理消息确认机制,此处继承类代码省略,就是上面 ProxiedConfirmListener 类: 下面贴下要实现的接口:

package com.rabbitmq.client;  import java.io.IOException;  /**   * Implement this interface in order to be notified of Confirm events.   * Acks represent messages handled successfully; Nacks represent   * messages lost by the broker. Note, the lost messages could still   * have been delivered to consumers, but the broker cannot guarantee   * this.   */  public interface ConfirmListener {   /**   ** handleAck RabbitMQ消息接收成功的方法,成功后业务可以做的事情   ** 发送端投递消息前,需要把消息先存起来,比如用KV存储,接收到ack后删除   **/   void handleAck(long deliveryTag, boolean multiple)   throws IOException;   //handleNack RabbitMQ消息接收失败的通知方法,用户可以在这里重新投递消息   void handleNack(long deliveryTag, boolean multiple)   throws IOException;  }

上面的接口很有意思,如果是你的话,怎么实现? 消息投递前如何存储消息,ack 和 nack 如何处理消息?

下面看下异步confirm的消息投递流程:

解决RabbitMQ消息丢失问题和保证消息可靠性 - Java高级架构师n的个人空间

 

解释下这张图片:

channerl1 连续发类1,2,3条消息到RabbitMQ-Server,RabbitMQ-Server通知返回一条通知,里面包含回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外还有一个参数multiple=true,表示到这个序号之前的所有消息都已经得到了处理。这样客户端和服务端通知的次数就减少类,提升类性能。

加点消息存储和删除逻辑

事务机制和publisher confirm机制确保的是消息能够正确的发送至RabbitMQ,这里的“发送至RabbitMQ”的含义是指消息被正确的发往至RabbitMQ的交换器,如果此交换器没有匹配的队列的话,那么消息也将会丢失,怎么办?

这里有两个解决方案,

1. 使用mandatory 设置true

2. 利用备份交换机(alternate-exchange):实现没有路由到队列的消息

我们看下RabbitMQ客户端代码方法

Channel 类中 发布消息方法

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)   throws IOException;

解释下:basicPublish 方法中的,mandatory和immediate

/**   * 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue, 那么会调用basic.return方法将消息返回给生产者<br>   * 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。   */   @Setter(AccessLevel.PACKAGE)   private boolean mandatory = false;   /**   * 当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上没有消费者, 那么这条消息不会放入队列中。   当immediate标志位设置为false时,exchange路由的队列没有消费者时,该消息会通过basic.return方法返还给生产者。   * RabbitMQ 3.0版本开始去掉了对于immediate参数的支持,对此RabbitMQ官方解释是:这个关键字违背了生产者和消费者之间解耦的特性,因为生产者不关心消息是否被消费者消费掉   */   @Setter(AccessLevel.PACKAGE)   private boolean immediate;

所以为了保证消息的可靠性,需要设置发送消息代码逻辑。如果不单独形式设置mandatory=false

使用mandatory 设置true的时候有个关键点要调整,生产者如何获取到没有被正确路由到合适队列的消息呢?通过调用channel.addReturnListener来添加ReturnListener监听器实现,只要发送的消息,没有路由到具体的队列,ReturnListener就会收到监听消息。

channel.addReturnListener(new ReturnListener() {   public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP   .BasicProperties basicProperties, byte[] body) throws IOException {   String message = new String(body);   //进入该方法表示,没路由到具体的队列   //监听到消息,可以重新投递或者其它方案来提高消息的可靠性。   System.out.println("Basic.Return返回的结果是:" + message);   }   });

此时有人问了,不想复杂化生产者的编程逻辑,又不想消息丢失,那么怎么办? 还好RabbitMQ提供了一个叫做alternate-exchange东西,翻译下就是备份交换器,这个干什么用呢?很简单,它可以将未被路由的消息存储在另一个exchange队列中,再在需要的时候去处理这些消息。

那如何实现呢?

简单一点可以通过webui管理后台设置,当你新建一个exchange业务的时候,可以给它设置Arguments,这个参数就是 alternate-exchange,其实alternate-exchange就是一个普通的exchange,类型最好是fanout 方便管理

解决RabbitMQ消息丢失问题和保证消息可靠性 - Java高级架构师n的个人空间

 

当你发送消息到你自己的exchange时候,对应key没有路由到queue,就会自动转移到alternate-exchange对应的queue,起码消息不会丢失。

下面一张图看下投递过程:

解决RabbitMQ消息丢失问题和保证消息可靠性 - Java高级架构师n的个人空间

 

那么有人有个疑问,上面介绍了,两种方式处理,发送的消息无法路由到队列的方案, 如果备份交换器和mandatory参数一起使用,会有什么效果?

答案是:mandatory参数无效

总结下上面内容,主要如何保证消息从生产者到RabbitMQ Server 端可靠性

1. Transaction: 消息落盘,只能同步开启、提交及回滚。

2. Confirm:消息进入缓冲区,支持同步、异步、批量确认。

3. Transaction和publisher confirm机制两者是互斥的

4. 一般在生产者这块避免数据丢失,都是用 Confirm 机制的。

2.RabbitMQ-Server中存储的消息如何保证

一般消息都是存内存中的,如果消息没有持久化硬盘,一天机器需要重启,获取意外停电,重启机器后,消息全丢了,所以消息持久化是必备。

您可能感兴趣的文章

  • Chrome测试新功能:强制任何网站进入暗黑模式
  • Chrome测试新功能:强制任何网站进入暗黑模式
  • 朋友圈广告推出限时推广模式
  • 2019下半年,产业互联网的八大趋势
  • 这些年来的互联网风口,你追了吗?
  • 免费试用后,百度网盘出现3元/5分钟延长加速下载
  • 百度网盘更改现有会员模式,推单次付费服务
  • WordPress生成HTML静态化网站

未经允许不得转载:杂烩网 » 解决RabbitMQ消息丢失问题和保证消息可靠性 – Java高级架构师n的个人空间

课后答案张九龄《望月怀远》阅读答案及全诗翻译赏析

望月怀远张九龄海上生明月,天涯共此时。情人怨遥夜,竟夕起相思。灭烛怜光满,披衣觉露滋。不堪盈手赠,还寝梦佳期。注释⑴怀远:怀念远方的亲人。⑵最前面两句:辽阔无边的大海上升起一轮明月,使人想起了远在天涯……
2023-11-22 04:53暂无评论阅读详情

课后答案王安石《次韵唐公三首其三旅思》阅读答案

次韵唐公三首其三旅思王安石此身南北老,愁见问征途。地大蟠三楚,天低入五湖。看云心共远,步月影同孤。慷慨秋风起,悲歌不为鲈②。注:①张壤,字唐公,北宋嘉佑六年契丹国母生辰使,王安石友人。②《晋书&mid……
2023-11-22 04:52暂无评论阅读详情

笔记心得各级干部学习执法为民心得体会

  &ldquo;各级干部都要牢固树立全心全意为人民服务的思想和真心实意对人民负责的精神,做到心里装着群众,凡事想着群众,工作依靠群众,一切为了群众。要坚持权为民所用,情为民所系,利为民所谋,为群众诚……
2023-11-22 04:12暂无评论阅读详情

笔记心得寒假大学生社会实践心得体会

  自从走进了大学,就业问题就似乎总是围绕在我们的身边,成了说不完的话题。在现今社会,招聘会上的大字报都总写着&ldquo;有经验者优先&rdquo;,可还在校园里面的我们这班学子社会经验又会拥有多少……
2023-11-22 04:08暂无评论阅读详情

协议书济南市某美容院转让协议第2篇

&nbsp;&nbsp;__________美容院根据中华人民共和国国务院劳动法规和________市私营企业劳动管理实施办法,结合本美容院经营的具体所需今制订此劳动合同书。&nbsp;&nbsp;双……
2023-11-22 02:36暂无评论阅读详情

剧本劳模宣传短剧剧本《阿咪也想当劳模》

  1、机械厂门卫处,日,外。  清早,机械厂班长李玉伟开着别克赛欧小汽车驶进厂区,门卫室内的保安一边按开电动门,一边朝李玉伟摆手。  李玉伟:(摇下车窗,笑着打招呼)小秦,早。  保安小秦:(笑着)……
2023-11-22 02:11暂无评论阅读详情

教程灰雀说课稿

灰雀说课稿  灰雀说课稿(一):  《灰雀》说课稿  一、说教材  《灰雀》是义务教育课程标准实验教科书,小学语文第五册第二单元的一篇讲读课文。这篇课文记叙了列宁在莫斯科郊外养病期间爱护灰雀的故事。列……
2023-11-22 00:41暂无评论阅读详情

课件“吴隐之字处默,濮阳鄄城人”阅读答案及原文

吴隐之字处默,濮阳鄄城人。美姿容,善谈论,博涉文史,以儒雅标名。弱冠而介立,有清操,虽儋石无储,不取非其道。事母孝谨,及其执丧,哀毁过礼。与太常韩康伯邻居,康伯母,贤明妇人也,每闻隐之哭声,辍餐投箸,……
2023-11-22 00:38暂无评论阅读详情

标签