从AMQP协议可以看出,MessageQueue、Exchange和Binding构成了AMQP协议的核心,下面我们就围绕这三个主要组件 从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程中的注意事项。 1. 声明MessageQueue 在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确: a)消费者是无法订阅或者获取不存在的MessageQueue中信息。 b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。 在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。 如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性: a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。 b) Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 c) Durable:持久化,这个会在后面作为专门一个章节讨论。 d) 其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。 2. 生产者发送消息 在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。所以生产者想要发送消息,首先必须要声明一个Exchange和该Exchange对应的Binding。可以通过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明一个Exchange需要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在创建Binding和生产者通过publish推送消息时需要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不同的Exchange会表现出不同路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明一个Binding需要提供一个QueueName,ExchangeName和BindingKey。下面我们就分析一下不同的ExchangeType表现出的不同路由规则。 生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType: a) 如果是Direct类型,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。 b) 如果是 Fanout 类型,则会将消息发送给所有与该 Exchange 定义过 Binding 的所有 Queues 中去,其实是一种广播行为。 c)如果是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中。 3. 消费者订阅消息 在RabbitMQ中消费者有2种方式获取队列中的消息: a) 一种是通过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息之后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,否则客户端将会一直接收队列的消息。 b) 另外一种方式是通过basic.get命令主动获取队列中的消息,但是绝对不可以通过循环调用basic.get来代替basic.consume,这是因为basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用basic.consume。 如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。 这里我们可以看出,消费者再接到消息以后,都需要给服务器发送一条确认命令,这个即可以在handleDelivery里显示的调用basic.ack实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,而不是通知生产者,与RPC不同。 如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。 既然RabbitMQ提供了ACK某一个消息的命令,当然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然,也可以通过ack,让消息服务器直接删除该消息并且不会重传。 4. 持久化: Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,所有已声明的队列,Exchange,Binding以及队列中的消息都会丢失。通过设置Exchange和MessageQueue的durable属性为true,可以使得队列和Exchange持久化,但是这还不能使得队列中的消息持久化,这需要生产者在发送消息的时候,将delivery mode设置为2,只有这3个全部设置完成后,才能保证服务器重启不会对现有的队列造成影响。这里需要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,否则在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能造成比较大的影响,可能会下降10倍不止。 5. 事务: 对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。 6. Confirm机制: 使用事务固然可以保证只有提交的事务,才会被服务器执行。但是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已被路由到正确的队列中——Confirm。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。 其他: Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下: (1)客户端连接到消息队列服务器,打开一个channel。 (2)客户端声明一个exchange,并设置相关属性。 (3)客户端声明一个queue,并设置相关属性。 (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。 (5)客户端投递消息到exchange。 Exchanges, queues, and bindings […]
View Details前置条件: 已经安装好docker 1.查找镜像(有2种方式) ①登录rabbitmq官网找到docker镜像,选择想要的镜像的tag https://www.rabbitmq.com/download.html https://hub.docker.com/_/rabbitmq 如果需要访问web管理页面,就选择tag为management的 ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的DNS服务不会使用resolv.conf文件中的search和domain两个配置,通过DNS来进行服务发现时需要注意。,带有-management的是带有web控制台 ②直接用docker search 搜索,默认下载标签为latest的镜像(无法打开web管理页面)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
[testhadoop@sz-145-centos101 ~]$ sudo docker search rabbitmq NAME DESCRIPTION STARS OFFICIAL AUTOMATED rabbitmq RabbitMQ is an open source multi-protocol ... 2691 [OK] bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 30 [OK] tutum/rabbitmq Base docker image to run a RabbitMQ server 19 frodenas/rabbitmq A Docker Image for RabbitMQ 12 [OK] kbudde/rabbitmq-exporter rabbitmq_exporter for prometheus 11 [OK] arm32v7/rabbitmq RabbitMQ is an open source multi-protocol ... 7 cyrilix/rabbitmq-mqtt RabbitMQ MQTT Adapter 7 [OK] gonkulatorlabs/rabbitmq DEPRECATED: See maryville/rabbitmq 5 [OK] aweber/rabbitmq-autocluster RabbitMQ with the Autocluster Plugin 4 pivotalrabbitmq/rabbitmq-autocluster RabbitMQ with the rabbitmq-autocluster plu... 3 pivotalrabbitmq/rabbitmq-server-buildenv Image used to build and test RabbitMQ serv... 3 authentise/rabbitmq A RabbitMQ image that will run a bash scri... 2 [OK] deadtrickster/rabbitmq_prometheus RabbitMQ + Prometheus RabbitMQ Exporter pl... 2 henrylv206/rabbitmq-autocluster RabbitMQ Cluster 2 [OK] riftbit/rabbitmq3 RabbitMQ 3.x Container based on Alpine Lin... 1 |
ps:如果是普通用户登录,需要sudo,不然会提示错误
1 2 |
[testhadoop@sz-145-centos101 ~]$ docker search rabbitmq Get http:///var/run/docker.sock/v1.19/images/search?term=rabbitmq: dial unix /var/run/docker.sock: permission denied. Are you trying to connect to a TLS-enabled daemon without TLS? |
2.下载镜像(有时候网络问题超时,多尝试几次即可。我这里选择的是可以访问web管理界面的tag)
1 |
sudo docker pull rabbitmq:management |
3.创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin)
1 |
docker run -dit --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management |
from:https://www.cnblogs.com/yy-cola/p/11089800.html
View Details最全MQ消息队列有哪些? 目前在业界有哪些比较知名的消息引擎呢?如下图所示 这里面几乎完全列举了当下比较知名的消息引擎,包括: ZeroMQ 推特的Distributedlog ActiveMQ:Apache旗下的老牌消息引擎 RabbitMQ、Kafka:AMQP的默认实现。 RocketMQ Artemis:Apache的ActiveMQ下的子项目 Apollo:同样为Apache的ActiveMQ的子项目的号称下一代消息引擎 商业化的消息引擎IronMQ 以及实现了JMS(Java Message Service)标准的OpenMQ。 MQ消息队列的技术应用 1.解耦 解耦是消息队列要解决的最本质问题。 2.最终一致性 最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。 最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。 2.广播 消息队列的基本功能之一是进行广播。 有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。 3.错峰与流控 典型的使用场景就是秒杀业务用于流量削峰场景。 Kafka、RocketMQ、RabbitMQ比较 1.ActiveMQ 优点 单机吞吐量:万级 topic数量都吞吐量的影响: 时效性:ms级 可用性:高,基于主从架构实现高可用性 消息可靠性:有较低的概率丢失数据 功能支持:MQ领域的功能极其完备 缺点: 官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用。 2.Kafka 号称大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。 Apache Kafka它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。 目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。 优点 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。 时效性:ms级 可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次; 有优秀的第三方Kafka Web管理界面Kafka-Manager; 在日志领域比较成熟,被多家公司和多个开源项目使用; 功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用 缺点: Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 支持消息顺序,但是一台代理宕机后,就会产生消息乱序; 社区更新较慢; 3.RabbitMQ RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ优点: 由于erlang语言的特性,mq 性能较好,高并发; 吞吐量到万级,MQ功能比较完备 健壮、稳定、易用、跨平台、支持多种语言、文档齐全; 开源提供的管理界面非常棒,用起来很好用 社区活跃度高; RabbitMQ缺点: erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。 RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 需要学习比较复杂的接口和协议,学习和维护成本较高。 4.RocketMQ RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 […]
View Details目录 一、简介 1.1 概述 1.2 消息系统介绍 1.3 点对点消息传递模式 1.4 发布-订阅消息传递模式 二、Kafka的优点 2.1 解耦 2.2 冗余(副本) 2.3 扩展性 2.4 灵活性&峰值处理能力 2.5 可恢复性 2.6 顺序保证 2.7 缓冲 2.8 异步通信 三、常用Message Queue对比 3.1 RabbitMQ 3.2 Redis 3.3 ZeroMQ 3.4 ActiveMQ 3.5 Kafka/Jafka 四、Kafka中的术语解释 4.1 概述 4.2 broker 4.3 Topic 4.3 Partition 4.4 Producer 4.5 Consumer 4.6 Consumer Group 4.7 Leader 4.8 Follower 正文 回到顶部 一、简介 1.1 概述 Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 主要应用场景是:日志收集系统和消息系统。 Kafka主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。 同时支持离线数据处理和实时数据处理。 Scale out:支持在线水平扩展 1.2 消息系统介绍 一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。 1.3 点对点消息传递模式 在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下: 生产者发送一条消息到queue,只有一个消费者能收到。 1.4 发布-订阅消息传递模式 在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下: 发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。 回到顶部 二、Kafka的优点 2.1 解耦 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2.2 冗余(副本) 有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 2.3 扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 2.4 灵活性&峰值处理能力 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 2.5 可恢复性 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 2.6 顺序保证 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。 2.7 缓冲 在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。 2.8 异步通信 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。 回到顶部 三、常用Message Queue对比 3.1 RabbitMQ RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。 3.2 Redis Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。 3.3 ZeroMQ ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。 3.4 ActiveMQ ActiveMQ是Apache下的一个子项目。 […]
View DetailsKafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
View DetailsRabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
View Details目录 一、前言 二、队列持久化 2.1 查看存在的队列和消息数量 2.2 持久化队列 三、消息持久化 四、总结 一、前言 如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。当然还是会有一些小概率事件会导致消息丢失。 二、队列持久化 2.1 查看存在的队列和消息数量 在windows环境下,在rabbitmq的安装目录/sbin下,通过rabbitmqctl.bat list_queues查看 这边启动了两个producer,分别生成两个队列hello 和 hello1,并且他们都有一个消息存在 重启rabbitmq,模拟故障 可以看到重启后两个队列都消失了 2.2 持久化队列 我们就hello队列持久化 在声明队列名称时,持久化队列,生产端和消费端都要 1 channel.queue_declare(queue=’hello', durable=True) 我们重复上面的操作,但是给hello队列做持久化,而hello1不做,并重启rabbitmq 可以看到重启后,hello队列还在,hello1队列消失了,但是原本hello中的一条消息也没有保存下来。所以在这边我们仅仅做到了消息队列的持久化,还没有做消息持久化。 三、消息持久化 我们刚才实现了在rabbitmq崩溃的情况下,就队列本身保存下来,重启后队列还在。接下来我们要将消息也保存下来,即消息的持久化 1 2 3 4 5 6 7 8 9 channel.basic_publish(exchange=", routing_key=’hello', body=’hello', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) # 增加properties,这个properties 就是消费端 callback函数中的properties # delivery_mode = 2 持久化消息 生产端生成一个消息,并重启rabbitmq 可以看到,经过队列和消息持久化后的hello, 在重启的情况下,队列和消息都存在,没有消失。 消费端再重启后也能正常接收 四、总结 队列持久化需要在声明队列时添加参数 durable=True,这样在rabbitmq崩溃时也能保存队列 仅仅使用durable=True ,只能持久化队列,不能持久化消息 消息持久化需要在消息生成时,添加参数 properties=pika.BasicProperties(delivery_mode=2) from: https://www.cnblogs.com/bigberg/p/8195622.html#_label3
View Details引入pom.xml
1 2 3 4 5 6 7 |
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/ https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
application.yml 配置部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#rabbitmq rabbitmq: host: 10.0.0.2 port: 5672 username: springboot password: password publisher-confirms: true publisher-returns: true template: mandatory: true #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java listener: concurrency: 2 #最小消息监听线程数 max-concurrency: 2 #最大消息监听线程数 mybatis: mapper-locations: classpath:mapping/*.xml ··· |
创建配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; //错误的 import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。 * * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. * * Routing Key:路由关键字, * * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 * * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序. * Channel:消息通道,在客户端的每个连接里,可建立多个channel. * * */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String QUEUE_A = "QUEUE_A"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * * 针对消费者配置 * * 1. 设置交换机类型 * * 2. 将队列绑定到交换机 * * FanoutExchange: * 将消息分发到所有的绑定队列,无routingkey的概念 * * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 * * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。 常用交换器类型如下: Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。 Topic(TopicExchange):按规则转发消息(最灵活)。 Headers(HeadersExchange):设置header attribute参数类型的交换机。 Fanout(FanoutExchange):转发消息到所有绑定队列。 * */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 获取队列A * * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); // 队列持久 } /** * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。 * * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } } |
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
package com.redis; import java.util.UUID; import org.springframework.amqp.core.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; @Component public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入 private RabbitTemplate rabbitTemplate; /** * 构造方法注入rabbitTemplate */ @Autowired public MsgProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); // rabbitTemplate如果为单例的话,那回调就是最后设置的内容 /** * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。 * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 */ rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); System.out.println("消息主体 message : "+message); System.out.println("消息主体 message : "+replyCode); System.out.println("描述:"+replyText); System.out.println("消息使用的交换器 exchange : "+exchange); System.out.println("消息使用的路由键 routing : "+routingKey); } /** * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。 * @param content */ public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId); String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString(); System.out.println("结束发送消息c : " + content.toLowerCase()); System.out.println("消费者响应c : " + response + " 消息处理完成"); //logger.info(" 发送消息TO A:" + content); // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回调id:" + correlationData); if (ack) { logger.info("消息成功消费"); } else { logger.info("消息消费失败:" + cause); } } } |
Receiver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MsgReceiver { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitHandler public void process(String content) { System.out.println("接收处理队列A当中的消息: " + content); } } |
unit test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.redis.MsgProducer; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqTest { @Autowired private MsgProducer sender; @Test public void sendTest() throws Exception { //while(true){ String msg = new Date().toString(); sender.sendMsg(msg); Thread.sleep(6000); //} } } |
可以测试通过 image.png 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
package com.test.ratelimit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.RateLimiter; public class ComplexDemo { private static RateLimiter rateLimiter = RateLimiter.create(10); private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0); public static void main(String[] args) { // TODO Auto-generated method stub List<Runnable> tasks = new ArrayList<Runnable>(); for (int i = 0; i < 100; i++) { tasks.add(new UserRequest(i)); } ExecutorService threadPool = Executors.newCachedThreadPool(); for (Runnable runnable : tasks) { threadPool.execute(runnable); } } private static boolean startGo(int i) { //基于令牌桶算法的限流实现类 /** * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。 * */ /** * tryAcquire(long timeout, TimeUnit unit) * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) */ //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序 if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("暂时无法获取令牌, 排队失败" + i); fail.getAndIncrement(); System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get()); return false; } if (update() > 0) { System.out.println("成功" + i); suc.getAndIncrement(); System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get()); return true; } System.out.println("数据不足,失败"); return false; } private static int update() { return 1; } private static class UserRequest implements Runnable { private int id; public UserRequest(int id) { this.id = id; } public void run() { startGo(id) ; } } } 测试结果: ... 成功8 FAIL/SUC=89/10 成功7 FAIL/SUC=89/11 |
总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。 REF: http://www.rabbitmq.com/install-rpm.html 作者:DONG999 链接:https://www.jianshu.com/p/f621b47c80c3 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
View Details