本文介绍RabbitMQ。
1. 概述 MQ(Message Queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是Message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。消息处理下游也只需要依赖MQ获取消息(任务)即可。
1.1 MQ的作用
流量削峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制 ,把一秒内下的订单分散成一段时间来处理,这是有些用户可能在下单十几秒后才能收到下单成功的操作,虽然时间响应变慢了,但是比不能下单的体验更好。
应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修改。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。本质上就是消息队列作为缓存,作为一个代理,存储了中间结果,降低了流程处理过程。
异步处理
有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询API查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅,使用消息队列,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务 。这样A服务既不用循环调用B的查询API,也不会提供callback api。同样B服务也不用做这些操作。A服务还能及时地得到异步处理成功的消息。
1.2 MQ分类
ActiveMQ
比较久远的,单击吞吐量较高,时效性是毫秒级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据(丢失概率较低)。目前维护较少了。
Kafka
大数据的杀手锏,数据量较大适合使用。缺点是Kafka单击超过64个时,发送消息响应时间变长。会丢失消息。
RocketMQ
是阿里的开源产品,采用Java语言实现,在设计时参考了Kafka,单击吞吐量较高。消息可以做到0丢失。但是支持的客户端语言不多,仅Java和C++。
RabbitMQ
是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。健壮稳定跨平台。基于Erlang语言开发。
1.3 MQ的选择
Kafka
适合产生大量数据的业务,大型公司,如果有日志采集功能,首选Kafka。
RocketMQ
适合金融互联网领域,稳定性较好,适合双十一这种情况。
RabbitMQ
时效性微秒级,如果数据量不太大,且中小型公司,适合使用。
1.4 RabbitMQ介绍 RabbitMQ是一个消息中间件:它接受并转发消息。可以把它当做一个快递站点,当你要发送一个包裹时,就需要将包裹放到快递站,快递员最终会把快递送到收件人那里。按照这种逻辑RabbitMQ是一个快递站,帮我们处理快递。RabbitMQ与快递站的主要区别在于,它不处理快件而是接受、存储和转发消息任务数据。
RabbitMQ有如下四个核心概念:
生产者
产生数据发送消息的程序是生产者
交换机
交换机是RabbitMQ非常重要的一个部件,一方面他接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理他接收到的消息,是将这些消息推送到特定队列还是推动到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定。
队列
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但他们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
消费者
消费者与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者、消费者和消息中间件很多时候并不在同一个机器上。同一个应用程序既可以是生产者又可以是消费者。
1.5 RabbitMQ的核心模式
Hello World
Work Queues
Publish/Subscribe
Routing
Topics
Publisher Confirms
Broker:消息实体,是接收和分发消息的应用,RabbitMQ Server 就是Message Broker。Exchange是交换机,Queue是队列。
Virtual host:一个Broker中可以存在多个Virtual host,每个vhost可以包括多个excahnge。
Connection:生产者/消费者 和 Broker 之间的TCP连接。
Channel:信道,每一个访问RabbirMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯 ,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP Connection的开销。
Exchange:交换机,message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point)、topic(publish-subscribe)、fanout(multicast)。
Queue:消息最终被送到这里等待消费者取走
Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据。
1.6 安装 目前在本地安装,简单学习一下。
由于RabbitMQ基于Erlang语言开发,所以必须要有Erlang环境。为了使用方便,可安装RabbitMQ的一个web管理插件。
【Windows安装RabbitMQ详细教程】_慕之寒的博客-CSDN博客_rabbitmq安装windows
注意,如果用户名是中文的话,其日志是有问题的,所以启动也会失败,因此需要修改日志存放位置,不能出现中文。RabbitMQ启动失败!?原因竟是…… - Sitr-金融摸鱼哥 - 博客园 (cnblogs.com)
RabbitMQ Management打不开的问题 - 爱码网 (likecs.com)
2. 核心部分(模式) 在安装好RabbitMQ之后,下面对主流的各种模式进行代码演示。创建普通maven工程即可。注意需要添加jdk的编译版本插件以及rabbitmq的依赖和操作文件流的依赖。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <configuration > <source > 8</source > <target > 8</target > </configuration > </plugin > </plugins > </build > <dependencies > <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.8.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.6</version > </dependency > </dependencies >
2.1 Hello World简单队列模式 这种模式其实就是生产者消费者模式。需要有生产者给队列(hello),然后队列将消息转发给消费者(消费者接收消息)。
2.1.1 生产者代码 其实这里的生产者需要编写的代码和1.5中的原理图类似,
需要有逻辑连接Connection;
然后需要有具体的信道Channel;
当然,创建逻辑连接,除了生产者自身之外,还要有消息队列,即连接工厂,指明具体的消息队列IP以及用户名和密码;
在消息队列中需要有Broker(交换机和队列),由于是简单模式,因此这里省略交换机,只有一个队列,因此Channel发送消息前需要设定一些规则,比如交换机,队列。这里声明队列的名称QUEUE_NAME,并指明一些其他参数。
运行程序即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class Producer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1" ); factory.setUsername("admin" ); factory.setPassword("admin" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String message = "hello world" ; channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); System.out.println("消息发送完毕" ); } }
运行完程序后,可从后端RabbitMQ管理页面中发现,消息队列中确实有一个消息准备好了,也就是说生产者确实将消息发送到了消息队列中。
2.1.2 消费者代码 其实消费者和生产者类似,同样需要创建连接和信道,最终消费消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Consumer { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1" ); factory.setUsername("admin" ); factory.setPassword("admin" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME, true , deliverCallback, cancelCallback); } }
可以看到,确认消息队列中的消息被消费了。
2.2 Work Queues工作队列模式 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程(消费者)将弹出任务(消费)并最终执行任务。当有多个工作线程(多个消费者)时,这些工作线程将一起处理这些任务。
简单地说,工作队列模式就是一个生产者多个消费者。但是任务队列中的任务只能被某个消费者消费一次,不能被处理多次,否则业务逻辑出现问题。 因此,队列在分发任务的时候,实际上是轮询分发。
2.2.1 案例 代码演示,一个生产者,两个消费者。 为了简单起见,将创建逻辑连接和信道的代码抽取成工具类。代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class RabbitMQUtils { public static Channel getChannel () throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1" ); factory.setUsername("admin" ); factory.setPassword("admin" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
消费者代码和上面的类似,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class Worker01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到的消息:" + new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; System.out.println("W1等待接收消息..." ); channel.basicConsume(QUEUE_NAME, true , deliverCallback, cancelCallback); } }
多个消费者,其实代码是一样的,为了开启多个消费者(多个进程),这里采用IDEA提供的设置,先运行本程序,然后点开运行配置,勾选右上角允许并行运行即可再次运行。
生产者采用循环输入的方式发送消息。代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class Task01 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String message; Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { message = scanner.next(); channel.basicPublish("" , QUEUE_NAME, null , message.getBytes()); System.out.println(message + " 发送完成" ); } } }
运行生产者,在控制台输入以下内容。
可以看到两个消费者确实是轮询消费了消息。
2.2.2 消息应答问题
上面只是最基本的情况。但是这里存在一个问题。
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个厂的任务并且仅只完成了部分,突然就挂掉了,会发生什么情况呢?
RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,如果某个消费者挂掉了,我们将丢失正在处理的消息。因此,为了保证消息已经被处理完,必须在处理完之后由消费者给予应答回复。
为了保证消息在发送过程中不丢失,RabbitMQ引入消息应答机制:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ就可以把该消息删除了。
消息应答机制分为自动应答和手动应答。
2.2.3 自动应答 消息发送后立即被认为已经传送成功【显然这其实就是没应答,是不是那么靠谱的】,这种模式需要在高吞吐量和数据传输安全性方面做权衡。
其实这并没有应答,因为如果消费者Connection或者Channel如谈关闭,那么消息就丢失了;或者生产者产生的消息太多,导致消费者来不及处理消息,即消息积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
自动应答就是消息发送后就是成功了。
2.2.4 手动应答 手动应答是由消费者主动通知消息队列 。有如下几种应答方式:
Channel.basicAck()
表示消息肯定处理成功了。
Channel.basicNack()
表示消息不确定处理成功了。
Channel.basicReject()
表示消息不确定处理成功了。
2.2.5 消息自动重新入队 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
注意,因为存在回调函数,即获取到消息之后的消费者的操作。因此可在消费者中进行消息应答。
生产者代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class Task03 { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); Scanner s = new Scanner(System.in); String message; while (s.hasNext()) { message = s.next(); channel.basicPublish("" , QUEUE_NAME, null , message.getBytes("UTF-8" ))); System.out.println("生产者发出消息:" + message); } } }
消费者代码,这里要测试不同消费者场景,因此这里写两个消费者,一个消费者处理较快,一个处理较慢。这里仅展示一个,另一个只需要修改sleep时间即可。注意,应答是在回调函数中写的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Worker03 { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("W1等待接收消息,处理时间较短" ); DeliverCallback deliverCallback = (consumerTag, message) -> { try { Thread.sleep(1000 * 1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("接收到的消息:" + new String(message.getBody(), "UTF-8" )); channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME, false , deliverCallback, cancelCallback); } }
注意,如果队列不存在,那么必须先让生产者运行(声明队列),否则先开启消费者会报错,因为消费者仅仅是从队列中取消息,如果队列不存在,当然会报错。
因为work2的工作时间较长,可在工作过程中手动停止程序(模拟挂掉),可以看到消息又重新转发到work1中。即手动确认实现了消息不丢失。
2.2.6 RabbitMQ持久化 上面仅仅保证了消费者宕机后消息可以不丢失。但是如何保证RabbitMQ服务挂掉后消息不丢失呢?默认情况下,RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息。为了确保消息不会丢失需要做两件事:将队列和消息都标记为持久化 。
注意,队列是RabbitMQ中的组件,消息是消息,二者不是一体的,需要分别持久化。
队列实现持久化
直接在声明队列时,将第二个参数改为true。
1 channel.queueDeclare(QUEUE_NAME, true , false , false , null );
但是,如果直接将已经存在的队列修改为true,会报错,此时必须先将该队列删除,再重新创建。持久化的队列会被标记为D。
1 Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
消息持久化
在发布消息的第三个参数中设置MessageProperties.PERSISTENT_TEXT_PLAIN
。这种方式其实并不能完全保证不会丢失消息。尽管他告诉了RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单队列任务而言,已经绰绰有余了。如果需要更强有力的持久化策略,见后续的发布确认模式 。
1 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
2.2.7 不公平分发 上面只是最基本的轮询分发,但是在测试中可以发现,对于处理速度比较慢的消费者,显然也会分配指定数量的消息,这时候,处理速度比较快的消费者会处于空闲状态。其实为了综合考虑效率,应该采用不公平分发,速度快的分发多一点 。
可在消费者消费语句之前采用信道channel设置,0表示轮询,1表示不公平分发 。其实就是表明本消费者可以采用不公平分发。
2.2.8 预取值(prefetch) 不再是轮询或者不公平分发,而是指定每个消费者被分发的数量。同样和不公平分发一样设置,非0非1表示具体的预取值的数量。(或者说,这是消费者消息堆积的最大值)
2.3 Publish/Subscribe发布订阅模式 上面的两种模式中的消息都是只能发送给一个消费者。有时候需要将消息发送给多个消费者 ,这就是发布订阅模式 。发布订阅模式需要用到交换机,参见2.7节。即由交换机将消息转发到不同的队列中,队列中的消息只能被消费一次,这样多个队列达到了多次消费 。
发布订阅模式就是交换机采用fanout方式,即交换机将消息无差别的传送到多个队列中。本质上就是广播,所以生产者以及所有消费者,在绑定以及发布消息时的routingkey是任意的。
注意,交换机在生产者还是在消费者中声明都是可以的,可以声明两次,也可以声明一次。但是声明的名称一定要一致,并且类型也要一致。如果名称不一致,显然不会正确接收;如果名称一致,类型不一致,则会报错。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class EmitLog { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); Scanner scanner = new Scanner(System.in); String message; while (scanner.hasNext()) { message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "hj" , null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" + message); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ReceiveLogs01 { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println("ReceiveLogs01等待接收消息,把接收到的消息打印在屏幕上。。。" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(queueName, true , deliverCallback, cancelCallback); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ReceiveLogs02 { public static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println("ReceiveLogs02等待接收消息,把接收到的消息打印在屏幕上。。。" ); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(queueName, true , deliverCallback, cancelCallback); } }
2.4 路由模式 路由模式就是根据规则来路由到指定的消费者队列中,不再像广播一样无差别发送。如果有多个队列绑定规则一样,也是可以接收到的 ,路由模式和广播模式的区别是按照一定的绑定规则发送和接收。这里代码不再赘述,和上面类似。
1 channel.exchangeDeclare(EXCHANGE_NAME, "direct" );
2.5 主题模式 发布订阅模式,是所有的消费者都可以收到。路由模式,是指定的消费者可以收到。但是,这两种都是固定死的,不能动态的指定消费者。
此时,有这么一种需求,对于某些信息,发送给某几个消费者;对于另外其余信息,发送给某个消费者。即消费者不是固定的,此时就需要主题模式了,通过正则表达式模糊匹配绑定规则来发送到指定的队列中。 如下图所示,如果仅想要Q1收到,那么中间单词必须是orange,且第一个单词不能是lazy,最后一个单词不能是rabbit。如果想要Q1、Q2同时收到,后两个单词只要是orange.rabbit即可,或者前两个单词是lazy.orange。
#
代表任意个单词,*
表示1个单词。当一个队列的绑定键(routingkey)是#
,那么就类似fanout模式了;如果队列绑定建中没有#
和*
,那么就类似direct模式了。
2.6 发布确认 2.2.6中的持久化提到过,当时的方式并不能完全保证持久化,因为生产者到消息队列传输过程中可能丢失。因此需要发布确认机制 。也就是当消息真正的持久化到磁盘中时,消息队列会主动向生产者发送确认消息。因此总体上有三种机制保证队列和消息持久化:
设置要求队列必须持久化
声明队列时进行设置
设置要求队列中的消息必须持久化
发送消息时进行设置
发布确认
在获取信道后,可以开启发布确认:channel.confirmSelect()
。
那么究竟是发送一条就确认一条、还是发送若干条之后再统一确认呢?有单个、批量和异步三种。
2.6.1 单个确认发布 这是一种简单的确认方式,它是一种同步确认发布 的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定的时间范围内这个消息没有被确认,那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说,这可能已经足够了。
1000条消息大概是658毫秒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static void publishMessageIndividually () throws Exception { Channel channel = RabbitMQUtils.getChannel(); String queueName = "Single" ; channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" , queueName, null , message.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功" ); } } long end = System.currentTimeMillis(); System.out.println("单个确认总用时: " + (end - begin) + "毫秒" ); }
2.6.2 批量确认发布 通过实验发现,上面那种方式非常慢,与单个等待确认消息相比,先发送一批消息然后一起确认可以极大地提高吞吐量。这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。这种方案仍然是同步的,也一样阻塞消息的发布 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public static void publishMessageBatch () throws Exception { Channel channel = RabbitMQUtils.getChannel(); String queueName = "Single" ; channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); int batchSize = 100 ; for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" , queueName, null , message.getBytes()); if (i % batchSize == 0 ) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功" ); } } } long end = System.currentTimeMillis(); System.out.println("批量确认总用时: " + (end - begin) + "毫秒" ); }
批量确认总用时约59毫秒。时间确实变短了,但是不是很安全。可以看出批量和单个的区别就是waitForConfirms方法的调用次数。
2.6.3 异步确认发布 异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没的说,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调保证是否投递成功。这里其实就是将确认控制权交给了消息队列,生产者无需等待消息队列发送确认后再次发送,而是一直发送。即下一次发送和上一次确认是异步的。
而下面开启的监听器就是一个子线程,主线程就是不停地发送消息,子线程用于监听消息队列发送过来的消息,比如发送失败还是发送成功。
代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public static void publishMessageAsync () throws Exception { Channel channel = RabbitMQUtils.getChannel(); String queueName = "Single" ; channel.queueDeclare(queueName, true , false , false , null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); ConfirmCallback ackCallback = (deliverTag, multiple) -> { System.out.println("确认的消息:" + deliverTag); }; ConfirmCallback nackCallback = (deliverTag, multiple) -> { System.out.println("未确认的消息:" + deliverTag); }; channel.addConfirmListener(ackCallback, nackCallback); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" , queueName, null , message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("批量确认总用时: " + (end - begin) + "毫秒" ); }
上面有一个问题就是发送失败的消息没有存储,后续不能重新发送。但是这是两个线程,换句话说,即使监听器存储了消息,主线程还是无法获取到监听器存储的数据的,最好的解决办法就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如ConcurrentSkipListMap。
首先在发送的时候,记录已经发送的消息
监听器的确认回调函数中,删除确认成功的消息【因为一定是先发送再确认的,所以必定确认的是已经存在的】
最后剩余的就是没有发送成功的。
性能最好,在出现错误的情况下可以很好地控制。
2.7 交换机 上面的例子中,都没有使用交换机(采用默认交换机),交换机就是将消息路由到指定的队列中。
2.7.1 概述 RabbitMQ消息传递模型的核心思想是:生产者生产的消息不会直接发送到队列中。实际上,通常生产者甚至都不知道这些消息传递到了哪些队列中。
生产者只能将消息发送到交换机(Exchange),交换机工作的内容非常简单,一方面他接收来自生产者的消息,另一方面将他们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列呢?还是说放到许多队列?又或者是直接丢失呢?这就由交换机的类型来决定。交换机有如下几种类型(分别是) :
直连(direct) ,路由 ,对应2.4模式
主题(topic) ,主题 ,对应2.5模式
标题(headers),使用较少
扇出(fanout) ,也就是发布订阅 ,对应2.3模式
无名,就是2.1、2.2中的,使用空串表示。本质上交换机是通过routingkey(bindingkey)来绑定队列的,即basicPublish()方法的第二个参数,如果交换机名称是空串,那么routingkey默认就是队列名称。
2.7.2 临时队列 没有被持久化的队列称为临时队列。
2.7.3 绑定 绑定指的是交换机和队列之间的捆绑关系。
3. 高级部分 3.1 死信队列 3.1.1 概述 死信,顾名思义就是无法被消费的消息。一般来说,生产者将消息投递到Broker后,消费者从队列中取出消息进行消费。但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,进而形成了死信队列。
注意,死信其实不能被丢失,而是被传送到死信交换机(dead_exchange)上,并进入死信队列(dead_queue)保存。
3.1.2 应用场景 应用场景:为了保定订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中。还有比如说:用户在商城下单成功后并点击去支付后在指定时间未支付时自动失效。
3.1.3 死信的来源 死信有如下三个来源可造成:
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中),超出的消息成为死信。
消息被拒绝(basic.reject或basic.nack)并且requeue=false;
3.1.4 死信案例 流程如下所示:
声明普通交换机,声明普通队列,二者进行绑定 ,C1从该队列中获取消息
声明死信交换机,声明死信队列,二者进行绑定 ,C2从该队列中获取消息
死信交换机和普通队列具有一定的绑定关系,即当普通队列的消息称为死信时,需要将其转发到死信交换机 。【可在普通队列声明时,通过最后一个参数进行设置转发到死信交换机】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct" ); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE_NAME); arguments.put("x-dead-letter-routing-key" , "lisi" ); channel.queueDeclare(NORMAL_QUEUE_NAME, false , false , false , arguments); channel.queueBind(NORMAL_QUEUE_NAME, EXCHANGE_NAME, "zhangsan" ); channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct" ); channel.queueDeclare(DEAD_QUEUE_NAME, false , false , false , null ); channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi" );
在生产者代码中设置消息的过期时间,第三个参数设置过期时间即可。当消息在TTL内没有被消费的话,自动转为死信,之后根据上面绑定的关系,会转发到死信交换机,并传输到死信队列中,最终有C2消费者获取到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class Producer { public static final String EXCHANGE_NAME = "normal_exchange" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000" ).build(); for (int i = 1 ; i < 11 ; i++) { String message = "info" + 1 ; channel.basicPublish(EXCHANGE_NAME, "zhangsan" , properties, message.getBytes()); } } }
那么该怎么测试呢?可先开启消费者1,创建死信队列、死信交换机、普通队列、普通交换机等信息。 然后关闭该消费者1,开启消费者2,并开启生产者发送消息,这样,生产者产生的消息肯定不会被普通消费者C1消费,等待TTL后,会成为死信,被消费者2消费。
下面是没开启消费者2的时候的情况转变,开启生产者后,普通队列中的10条数据在10秒后均成为死信,放在了死信队列中。
消费者2其实就是普通消费者,只不过消费的是死信队列,这里代码不再赘述。
上面是超时称为死信,下面测试一下队列满了之后多余的信息成为死信,即超过队列长度之后,设置队列长度和设置死信队列类似,也是在第三个参数设置。【注意,此时在生产者发送消息时,无需设置TTL了】
1 2 arguments.put("x-max-length" , 6 );
同样先打开消费者C1,创建队列和交换机,然后关闭该消费者,打开生产者,生产10个消息。打开后台可以发现,多余的4条信息在死信队列中。【经过测试,发现先产生的消息成为了死信,而不是后产生的消息 】
第三种情况是消息被拒绝,此时如果要模拟这种情况,在消费者处必须是手动应答,不能是自动应答。那么手动应答和手动拒绝,就是在回调函数中,判断是否拒绝,并给出拒绝和应答。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class Consumer01 { public static final String EXCHANGE_NAME = "normal_exchange" ; public static final String NORMAL_QUEUE_NAME = "normal_queue" ; public static final String DEAD_EXCHANGE_NAME = "dead_exchange" ; public static final String DEAD_QUEUE_NAME = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct" ); Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE_NAME); arguments.put("x-dead-letter-routing-key" , "lisi" ); channel.queueDeclare(NORMAL_QUEUE_NAME, false , false , false , arguments); channel.queueBind(NORMAL_QUEUE_NAME, EXCHANGE_NAME, "zhangsan" ); channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct" ); channel.queueDeclare(DEAD_QUEUE_NAME, false , false , false , null ); channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi" ); DeliverCallback deliverCallback = (consumerTag, message) -> { if (message.getBody().equals("info5" )) { System.out.println("本消息是拒绝的:" + new String(message.getBody())); channel.basicReject(message.getEnvelope().getDeliveryTag(), false ); } else { System.out.println(new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); } }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(NORMAL_QUEUE_NAME, false , deliverCallback, cancelCallback); } }
可以看到,死信队列中只有info5。
3.2 延迟队列 3.2.1 概述 延迟队列其实就是死信队列三种中的一种,即TTL造成的死信。此时死信队列消费者消费到消息,与生产者生产消息后就会产生一定的延迟。
延迟队列,队列内部是有序的,最重要的特定就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理。简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
3.2.2 应用场景
订单在十分中之内未支付则自动取消
新创建的店铺,如果在十天之内都没有上传过商品,则自动发送消息提醒。
用户注册成功后,如果三天内没有登录则进行短信提醒。
用户发起退款后,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
初步看这些应用场景,其实都有一个特点,就是需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;这看起来似乎使用定时任务,一直轮询数据,每秒查询一次,取出需要被处理的数据,然后处理不就完了吗?
如果数据量比较少,确实可以这样做,比如对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的订单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如“订单十分钟内未支付则关闭”,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显示是不可取的,很可在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
一个买火车票的流程
用户抢购车票并前往付款界面,生成订单,付款状态为:未支付
根据订单id等信息生成消息到RabbitMQ队列(设置延迟TTL时间为30分钟)
当到达30分钟后,会进入到延迟队列,延迟队列消费者会获取到消息,根据消息去订单数据库查询对应的订单,判断是否付款,如果未付款,那么更新订单为失效状态【即到期自动取消】,并车票数量加1。如果已付款,那么则无需操作。
如果用户在30分钟之内已付款,那么就根据付款操作修改数据库订单状态即可,【方便第3步匹配】
3.3.3 案例 为了演示延迟队列案例,需要先见第5节,基于SpringBoot整合RabbitMQ。案例结构如下所示:
一个生产者,一个普通交换机,两个普通队列,一个延迟10秒,一个延迟40秒。一个延迟交换机,一个延迟队列。一个消费者。
3.3 发布确认高级 3.4 幂等性 3.5 优先级队列 3.6 惰性队列 4. 集群部分 5. SpringBoot整合RabbitMQ 创建普通SpringBoot项目即可,添加依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency > </dependencies >
设置配置文件application.properties,设置连接RabbitMQ的参数。
1 2 3 4 5 6 7 8 9 10 11 spring.rabbitmq.host =127.0.0.1 spring.rabbitmq.port =5672 spring.rabbitmq.username =admin spring.rabbitmq.password =admin Spring.mvc.pathmatch.matching-strategy =ant_path_matcher
创建config文件,用于swagger页面显示API。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig () { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi" ) .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo () { return new ApiInfoBuilder() .title("rabbitmq接口文档" ) .description("本文档描述了rabbitmq微服务接口定义" ) .version("1.0" ) .contact(new Contact("hianian" , "http://www.hianian.xyz" , "hianian@gmail.com" )) .build(); } }
利用SpringBoot,可直接用配置文件将交换机和队列声明出来,这样无需在消费者代码中再声明了,逻辑更加清晰。
5.1 配置文件声明交换机队列 代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 @Configuration public class TTLQueueConfig { public static final String NORMAL_EXCHANGE = "X" ; public static final String DEAD_EXCHANGE = "Y" ; public static final String NORMAL_QUEUE_A = "QA" ; public static final String NORMAL_QUEUE_B = "QB" ; public static final String DEAD_QUEUE_D = "QD" ; @Bean("xExchange") public DirectExchange xExchange () { return new DirectExchange(NORMAL_EXCHANGE); } @Bean("queueA") public Queue queueA () { Map<String, Object> arguments = new HashMap<>(3 ); arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" , "YD" ); arguments.put("x-message-ttl" , 10000 ); return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB () { Map<String, Object> arguments = new HashMap<>(3 ); arguments.put("x-dead-letter-exchange" , DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" , "YD" ); arguments.put("x-message-ttl" , 40000 ); return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build(); } @Bean("yExchange") public DirectExchange yExchange () { return new DirectExchange(DEAD_EXCHANGE); } @Bean("queueD") public Queue queueD () { return QueueBuilder.durable(DEAD_QUEUE_D).build(); } @Bean public Binding queueABindingX (@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA" ); } @Bean public Binding queueBBindingX (@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB" ); } @Bean public Binding queueDBindingY (@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD" ); } }
5.2 生产者 在Controller中写生产者,生产者可以发往10秒的延迟队列中,也可发往40秒的延迟队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j @RestController @RequestMapping("ttl") public class SendMsg { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg (@PathVariable String message) { log.info("当前时间:{}, 发送一条消息给两个TTL队列:{}" , new Date().toString(), message); rabbitTemplate.convertAndSend("X" , "XA" , "消息来自ttl为10s的队列:" + message); rabbitTemplate.convertAndSend("X" , "XB" , "消息来自ttl为40s的队列:" + message); } }
5.3 消费者 消费者是监听器的形式出现的。代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD (Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); log.info("当前时间:{},收到私信队列的消息:{}" , new Date().toString(), msg); } }
5.4 结果验证 启动项目,在浏览器输入http://localhost:8080/ttl/sendMsg/嘻嘻嘻
,可以看到,控制台在一定的时间间隔内输出了内容,即消费者在指定的时间内确实获取到了私信队列的消息。
也就是说,消费者(监听器)已监听到死信队列QD中有消息了,就立马获取消息。而死信队列是由于普通队列中的消息TTL到期了,自动转为死信。
6. 总结 总体来看,我有以下理解:
消费者要想获取消息,必须获取连接,获取信道。必须声明要获取的交换机(如果该交换机已经存在可以不声明),以及队列,将二者进行绑定。这样,交换机在收到消息后,才能将消息按照一定的模式规则将消息发送到该队列中,从而消费者获取到消息。
生产者要想发送消息,必须获取连接,获取信道。必须声明交换机(如果该交换机已经存在可以不声明),将消息发送到指定的交换机。
由于生产者只是在将消息发送到交换机,所以可以无需声明队列。而消费者需要从队列中获取消息,所以最好在消费者处声明队列(如果没有)。至于交换机,在消费者/生产者处都可声明。
另外,队列和交换机绑定是有一定规则的。比如路由、发布订阅等。队列和交换机绑定,可以绑定多次规则 。
7. 备注 参考B栈《尚硅谷》。