博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ的消息安全机制
阅读量:2069 次
发布时间:2019-04-29

本文共 2270 字,大约阅读时间需要 7 分钟。

序言

       关于JMS有一些共性的问题,每个框架给出的解决方案各不相同.这里仅针对RabbitMQ.

 

消息的持久化

      这个问题主要用于解决,当服务器宕机的时候,队列中的信息都没了的问题.

      如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。

     Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候,将delivery mode设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。

 

生产者的提交事务

      对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

 

 

生产者的Confirm机制

        使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后(这里只要推送到了指定的queue就会返回confirm),服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。

 

 

消费者确认机制(默认开启)

RabbitMQ把消息推送给Consumer,RabbitMQ就会把这个消息进行锁定,在锁定状态的消息不会被重复推送也就是二次消费。

其他consumer可以继续消费下一个消息,当消息的consumer确认消费完成之后发送一个ack给RabbitMQ,RabbitMQ会将这个消息删除。
如果超过一定时间RabbitMQ没有收到consumer的ack,就会把这个消息进行解锁,重新放入队列头,保证消息的顺序。(这个有不同的说法)

消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。(莫非新版本有里延迟的设置)

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…

 

 

关于消息的顺序

这个可以从逻辑上进行控制,也可以在队列上进行处理,比如只有一个消费者.没消费完一次才能消费下一个消息.

 

关于重复消费

这个可以从逻辑上进行判断,但是这是万不得以的情况.另后面想到了问题在补充,欢迎骚扰:cuiyaonan2000@163.com

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

转载地址:http://nfcmf.baihongyu.com/

你可能感兴趣的文章
多线程和CPU的关系
查看>>
认识cpu、核与线程
查看>>
关于Java健壮性的一些思考与实践!
查看>>
如何避免自己写的代码成为别人眼中的一坨屎!
查看>>
Postman 安装及使用入门教程
查看>>
获取指定包下所有自定义注解并提取注解信息
查看>>
Windows 环境下 Git clone pull fetch 慢 解决之道
查看>>
Redis (error) NOAUTH Authentication required.解决方法
查看>>
plsql窗口中文显示的是横版的 问题解决办法
查看>>
使用notePad修改将文件格式保存后不起作用
查看>>
如何查询oracle会话及锁 如何查锁了哪张表?如何杀掉会话
查看>>
Git常用命令速查手册
查看>>
Redis运维利器 -- RedisManager
查看>>
分布式之REDIS复习精讲
查看>>
分布式之数据库和缓存双写一致性方案解析
查看>>
Redis集群
查看>>
Oracle 查看和扩展表空间
查看>>
记一次线上Java程序导致服务器CPU占用率过高的问题排除过程
查看>>
Java 内存溢出(java.lang.OutOfMemoryError)的常见情况和处理方式总结
查看>>
从cpu和内存来理解为什么数组比链表查询快
查看>>