用docker部署RabbitMQ环境

前置条件: 已经安装好docker 1.查找镜像(有2种方式) ①登录rabbitmq官网找到docker镜像,选择想要的镜像的tag https://www.rabbitmq.com/download.html https://hub.docker.com/_/rabbitmq 如果需要访问web管理页面,就选择tag为management的 ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的DNS服务不会使用resolv.conf文件中的search和domain两个配置,通过DNS来进行服务发现时需要注意。,带有-management的是带有web控制台   ②直接用docker search 搜索,默认下载标签为latest的镜像(无法打开web管理页面)

ps:如果是普通用户登录,需要sudo,不然会提示错误

2.下载镜像(有时候网络问题超时,多尝试几次即可。我这里选择的是可以访问web管理界面的tag)

3.创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin)

  from:https://www.cnblogs.com/yy-cola/p/11089800.html

Kafka、RocketMQ、RabbitMQ的优劣势比较

最全MQ消息队列有哪些? 目前在业界有哪些比较知名的消息引擎呢?如下图所示 这里面几乎完全列举了当下比较知名的消息引擎,包括: ZeroMQ 推特的Distributedlog ActiveMQ:Apache旗下的老牌消息引擎 RabbitMQ、Kafka:AMQP的默认实现。 RocketMQ Artemis:Apache的ActiveMQ下的子项目 Apollo:同样为Apache的ActiveMQ的子项目的号称下一代消息引擎 商业化的消息引擎IronMQ 以及实现了JMS(Java Message Service)标准的OpenMQ。   MQ消息队列的技术应用 1.解耦 解耦是消息队列要解决的最本质问题。 2.最终一致性 最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。 最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。 2.广播 消息队列的基本功能之一是进行广播。 有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。 3.错峰与流控 典型的使用场景就是秒杀业务用于流量削峰场景。 Kafka、RocketMQ、RabbitMQ比较   1.ActiveMQ 优点 单机吞吐量:万级 topic数量都吞吐量的影响: 时效性:ms级 可用性:高,基于主从架构实现高可用性 消息可靠性:有较低的概率丢失数据 功能支持:MQ领域的功能极其完备 缺点: 官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用。   2.Kafka 号称大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。 Apache Kafka它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。 目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。 优点 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。 时效性:ms级 可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次; 有优秀的第三方Kafka Web管理界面Kafka-Manager; 在日志领域比较成熟,被多家公司和多个开源项目使用; 功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用 缺点: Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 支持消息顺序,但是一台代理宕机后,就会产生消息乱序; 社区更新较慢;   3.RabbitMQ RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ优点: 由于erlang语言的特性,mq 性能较好,高并发; 吞吐量到万级,MQ功能比较完备 健壮、稳定、易用、跨平台、支持多种语言、文档齐全; 开源提供的管理界面非常棒,用起来很好用 社区活跃度高; RabbitMQ缺点: erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。 RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 需要学习比较复杂的接口和协议,学习和维护成本较高。 4.RocketMQ RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。 RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。   RocketMQ优点: 单机吞吐量:十万级 可用性:非常高,分布式架构 消息可靠性:经过参数优化配置,消息可以做到0丢失 功能支持:MQ功能较为完善,还是分布式的,扩展性好 支持10亿级别的消息堆积,不会因为堆积导致性能下降 源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控 RocketMQ缺点: 支持的客户端语言不多,目前是java及c++,其中c++不成熟; 社区活跃度一般 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码   消息队列选择建议 1.Kafka Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。 大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。   2.RocketMQ 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。 RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。 3.RabbitMQ RabbitMQ :结合erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护。不过,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug。 如果你的数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ。   from:https://blog.csdn.net/u011663149/article/details/86232869

Kafka学习之路 (一)Kafka的简介

目录 一、简介 1.1 概述 1.2 消息系统介绍 1.3 点对点消息传递模式 1.4 发布-订阅消息传递模式 二、Kafka的优点 2.1 解耦 2.2 冗余(副本) 2.3 扩展性 2.4 灵活性&峰值处理能力 2.5 可恢复性 2.6 顺序保证 2.7 缓冲 2.8 异步通信 三、常用Message Queue对比 3.1 RabbitMQ 3.2 Redis 3.3 ZeroMQ 3.4 ActiveMQ 3.5 Kafka/Jafka 四、Kafka中的术语解释 4.1 概述 4.2 broker 4.3 Topic 4.3 Partition 4.4 Producer 4.5 Consumer 4.6 Consumer Group 4.7 Leader 4.8 Follower   正文 回到顶部 一、简介 1.1 概述 Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 主要应用场景是:日志收集系统和消息系统。 Kafka主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。 同时支持离线数据处理和实时数据处理。 Scale out:支持在线水平扩展 1.2 消息系统介绍 一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。 1.3 点对点消息传递模式 在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下: 生产者发送一条消息到queue,只有一个消费者能收到。 1.4 发布-订阅消息传递模式 在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下: 发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。 回到顶部 二、Kafka的优点 2.1 解耦 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2.2 冗余(副本) 有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 2.3 扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 2.4 灵活性&峰值处理能力 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 2.5 可恢复性 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 2.6 顺序保证 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。 2.7 缓冲 在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。 2.8 异步通信 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。 回到顶部 三、常用Message Queue对比 3.1 RabbitMQ RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。 3.2 Redis Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。 3.3 ZeroMQ ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。 3.4 ActiveMQ ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。 3.5 Kafka/Jafka Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。 回到顶部 四、Kafka中的术语解释 4.1 概述 在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系: 上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。 如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。 4.2 broker Kafka 集群包含一个或多个服务器,服务器节点称为broker。 broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。 4.3 Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) 类似于数据库的表名 4.3 Partition topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。 4.4 Producer 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。 4.5 Consumer 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。 4.6 Consumer Group 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。 4.7 Leader 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 4.8 Follower Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。   from:https://www.cnblogs.com/qingyunzong/p/9004509.html

Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

rabbitmq

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ消息持久化

目录 一、前言 二、队列持久化   2.1 查看存在的队列和消息数量   2.2 持久化队列 三、消息持久化 四、总结 一、前言 如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。当然还是会有一些小概率事件会导致消息丢失。 二、队列持久化   2.1 查看存在的队列和消息数量 在windows环境下,在rabbitmq的安装目录/sbin下,通过rabbitmqctl.bat list_queues查看  这边启动了两个producer,分别生成两个队列hello 和 hello1,并且他们都有一个消息存在 重启rabbitmq,模拟故障 可以看到重启后两个队列都消失了   2.2 持久化队列 我们就hello队列持久化 在声明队列名称时,持久化队列,生产端和消费端都要 1 channel.queue_declare(queue=’hello', durable=True) 我们重复上面的操作,但是给hello队列做持久化,而hello1不做,并重启rabbitmq 可以看到重启后,hello队列还在,hello1队列消失了,但是原本hello中的一条消息也没有保存下来。所以在这边我们仅仅做到了消息队列的持久化,还没有做消息持久化。 三、消息持久化 我们刚才实现了在rabbitmq崩溃的情况下,就队列本身保存下来,重启后队列还在。接下来我们要将消息也保存下来,即消息的持久化 1 2 3 4 5 6 7 8 9 channel.basic_publish(exchange=",                       routing_key=’hello',                       body=’hello',                       properties=pika.BasicProperties(                           delivery_mode=2,  # make message persistent                       )) # 增加properties,这个properties 就是消费端 callback函数中的properties # delivery_mode = 2  持久化消息 生产端生成一个消息,并重启rabbitmq 可以看到,经过队列和消息持久化后的hello, 在重启的情况下,队列和消息都存在,没有消失。 消费端再重启后也能正常接收 四、总结 队列持久化需要在声明队列时添加参数 durable=True,这样在rabbitmq崩溃时也能保存队列 仅仅使用durable=True ,只能持久化队列,不能持久化消息 消息持久化需要在消息生成时,添加参数 properties=pika.BasicProperties(delivery_mode=2)   from: https://www.cnblogs.com/bigberg/p/8195622.html#_label3

用rabbitMq解决web高并发的学习笔记

引入pom.xml

  application.yml 配置部分

  创建配置文件

  Producer

  Receiver

  unit test

  可以测试通过 image.png 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例

  总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。 REF: http://www.rabbitmq.com/install-rpm.html 作者:DONG999 链接:https://www.jianshu.com/p/f621b47c80c3 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

服务级高并发秒杀优化(RabbitMQ+接口优化)

1. 安装RabbitMQ linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。 erlang下载地址:http://www.erlang.org/download.html rabbitMQ下载:http://www.rabbitmq.com/download.html 直接下载安装即可。 因为想用可视化界面监控消息,所以先激活这个功能。

  然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。 2. rabbitMQ基本原理和使用 rabbitMQ原理 Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下 客户端连接到消息队列服务器,打开一个channel。 客户端声明一个exchange,并设置相关属性。 客户端声明一个queue,并设置相关属性。 客户端使用routing key,在exchange和queue之间建立好绑定关系。 客户端投递消息到exchange。 总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 Direct交换机 完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。 Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。 Topic交换机 对key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。 在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。 我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。 Fanout交换机 还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。 所以,Fanout Exchange 转发消息是最快的。 Headers交换机 首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。 持久化 RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分: – exchange持久化,在声明时指定durable => 1 – queue持久化,在声明时指定durable => 1 – 消息持久化,在投递时指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 3. rabbitMQ-Direct交换机 这种模式是最简单的模式,就发送一串字符串,这个字符串为key,接收的时候也完全以这个字符串本来来确定,不需要绑定任何exchange,使用默认的就行。我们以这个模式开始在原来的项目上继续集成。 首先是引入依赖:

  appilication.yml:

  rabbitMQ配置类MQConfig:

  发送者MQSender:

  接收者MQReceiver:

  这样,就完成了最简单的一个字符串的发送-接受。可以在controller中随便测试一下:

  4. rabbitMQ-Topic交换机 这个模式正如上面所言,是可以匹配通配符的,显然更加灵活,这里用程序测试一下这个模式效果。 MQConfig: 先来几个常量:

  下面配置几个bean: 注:带有 @Configuration 的注解类表示这个类可以使用 Spring IoC 容器作为 bean 定义的来源。@Bean 注解告诉 Spring,一个带有 @Bean 的注解方法将返回一个对象,该对象应该被注册为在 Spring 应用程序上下文中的 bean。

  MQSender:

  MQReceiver:

  最后测试一把:

  运行结果:

  运行结果与初期的分析结果一致。 5. rabbitMQ-Fanout交换机 这种就是广播模式,即所有的绑定到指定的exchange上的queue都可以接收消息。 MQConfig:

  MQSender:

  MQReceiver:

  运行结果:

  queue1和queue2都接受到了消息。 6. rabbitMQ-Headers交换机 MQConfig:

  MQSender:

  MQReceiver:

  7. 秒杀优化 思路:减少数据库访问 系统初始化,把商品库存数量加载到redis 收到请求,redis预减库存,库存不够,直接返回,否则进入3 请求入队,立即返回排队中 请求出队,生成订单,减少库存 客户端轮询,是否秒杀成功 对于之前的秒杀接口do_miaosha:

  这里判断库存是直接从数据库查,因为并发量比较大,存在性能问题。后面秒杀到之后,也不是直接减库存, 而是将其放到消息队列中慢慢交给数据库去调整。

  在消息队列中对消息进行消化:

  对于controller中的优化1:redis预减库存。那么需要在系统启动的时候将秒杀商品的库存先添加到redis中:

  重写afterPropertiesSet()方法:

  对于前端,这时也要进行修改了,因为点击秒杀商品按键后,这里考虑三种情况:排队等待、失败、成功。那么这里规定-1为失败,0为排队,1为秒杀成功已经写入数据库。 原来的detail.htm中秒杀事件函数:

  秒杀到商品就直接返回,现在后端改为消息队列,所以需要增加函数进行判断,必要时需要轮询:

  所以将其改为:

  那么相应地,后台也要增加一个方法:result

  那么如何标记状态呢?这就是getMiaoshaResult方法所做的事情。 对于成功的状态判断,很简单,从数据库查,能查到就说明已经秒杀成功,否则就是两种情况:失败或者正在等待生成订单。 对于这两种状态,我们需要用redis来实现,思路是:在系统初始化的时候,redis中设置秒杀商品是否卖完的状态为false—即未卖完;

  在MiaoshaService中的Miaosha方法:数据库减库存失败的话,说明数据库的库存已经小于0了,那么这个时候,立即将redis初始设置的秒杀商品是否卖完的状态为true,表示商品已经全部卖完,返回秒杀失败。否则就是要前端等待等待。

  对于两个小方法getGoodsOver和setGoodsOver:

  那么redis预减库存,然后消息队列来进行创建订单就实现了。 当然,对于redis预减库存这一点,还有要优化的地方,就是现在的do_miaosha接口是这样的:

  但是,当秒杀商品已经没得时候,就没有必要再去redis中进行判断了,毕竟查询redis也是需要网络开销的,解决思路是:在内存中进行判断,如果redisService.decr得到的stock少于零的时候,直接将内存中的一个标志改变一下,那么下次再进入do_miaosha接口,先判断内存这个标记,如果库存已经小于0了,就不再访问redis,而是直接返回秒杀商品已经卖完。

  声明一个map:

  那么在afterPropertiesSet这个系统加载的初始化方法中对这个map进行初始化,goodsId–stock:

  在原来的redis预减库存初,发现库存小于0 ,就改为true:

  最后do_miaosha接口变为:

  ok,整个关于redis预减库存和rabbitMQ创建订单这个优化已经基本完成了。

.Net高并发解决思路

首先在windows上安装好Redis,RabbitMQ Redis-cli使用示例 ModelContext.cs代码:

在 Package Manager Console 下运行命令 Enable-Migrations 这个命令将在项目下创建文件夹 Migrations The Configuration class 这个类允许你去配置如何迁移,对于本文将使用默认的配置(在本文中因为只有一个 Context, Enable-Migrations 将自动对 context type 作出适配); An InitialCreate migration (本文为201702220232375_20170222.cs)这个迁移之所以存在是因为我们之前用 Code First 创建了数据库, 在启用迁移前,scaffolded migration 里的代码表示在数据库中已经创建的对象,本文中即为表 Person(列 Id 和 Name). 文件名包含一个 timestamp 以便排序(如果之前数据库没有被创建,那么 InitialCreate migration 将不会被创建,相反,当我们第一次调用 Add-Migration 的时候所有表都将归集到一个新的 migration 中) 多个实体锁定同一数据库 当使用 EF6 之前的版本时,只会有一个 Code First Model 被用来生成/管理数据库的 Schema, 这将导致每个数据库只会有一张 __MigrationsHistory 表,从而无法辨别实体与模型的对应关系。 从 EF6 开始,Configuration 类将会包含一个 ContextKey 属性,它将作为每一个 Code First Model 的唯一标识符, __MigrationsHistory 表中一个相应地的列允许来自多个模型(multiple models)的实体共享表(entries),默认情况下这个属性被设置成 context 的完全限定名。 定制化迁移 在 Package Manager Console 中运行命令 Add-Migration XXXXXXXXX 生成的迁移如下

  Configuration.cs代码:

  我们对迁移做些更改: 以下是本项目无关的其他示例:

  在 Package Manager Console 中运行命令 Update-Database –Verbose 消费者端,用来把消息队列里的数据写入数据库 MqHelper.cs代码:

  Program.cs代码:

  模拟并发的MVC网站,写入队列 HomeController.cs代码

  MqPublish.cs类

  RedisHelper.cs代码:

  Index.cshtml内容:

  Startup.cs代码:

  Web.config

  运行结果如图: 运行消费者端(控制台运用程序) from: https://blog.csdn.net/andrewniu/article/details/82856702

RabbitMQ的几种典型使用场景

AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。  7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:

Consumer:

场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:

发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable. Consumer:

接收端和场景1不同点: 1、使用“task_queue”声明消息队列,并使消息队列durable 2、在使用channel.basicConsume接收消息时使autoAck为false,即不自动会发ack,由channel.basicAck()在消息处理完成后发送消息。 3、使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端。 注意点: 1)It’s a common mistake to miss the basicAck. It’s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won’t be able to release any unacked messages. 2)Note on message persistence Marking messages as persistent doesn’t fully guarantee that a message won’t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. Also, RabbitMQ doesn’t do fsync(2) for every message — it may be just saved to cache and not really written to the disk. The persistence guarantees aren’t strong, but it’s more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction. 3)Note about queue size If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy. 场景3:Publish/Subscribe 使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。 Producer:

发送端: 发送消息到一个名为“logs”的exchange上,使用“fanout”方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收。 Consumer:

接收端: 1、声明名为“logs”的exchange的,方式为"fanout",和发送端一样。 2、channel.queueDeclare().getQueue();该语句得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息。 3、注意binding queue的时候,channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略! 场景4:Routing (按路线发送接收) 使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。 Producer:

发送端和场景3的区别: 1、exchange的type为direct 2、发送消息的时候加入了routing key Consumer:

接收端和场景3的区别: 在绑定queue和exchange的时候使用了routing key,即从该exchange上只接收routing key指定的消息。 场景5:Topics (按topic发送接收) 使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。 Producer:

发送端和场景4的区别: 1、exchange的type为topic 2、发送消息的routing key不是固定的单词,而是匹配字符串,如"*.lu.#",*匹配一个单词,#匹配0个或多个单词。 Consumer:

接收端和场景4的区别: 1、exchange的type为topic 2、接收消息的routing key不是固定的单词,而是匹配字符串。 注意点: Topic exchange Topic exchange is powerful and can behave like other exchanges. When a queue is bound with "#" (hash) binding key – it will receive all the messages, regardless of the routing key – like in fanout exchange. When special characters "*" (star) and "#" (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.   from: https://blog.csdn.net/qq_15427331/article/details/62416486

高并发场景之RabbitMQ篇

上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案 但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题, 因为使用Redis要自己实现分布式锁   这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。 下面我们来用RabbitMQ来实现上一篇的场景   一、新建RabbitMQ.Receive

二、新建RabbitMQ.Send

  这里是模拟500个用户请求,正常的话最后Total就等于500 我们来说试试看,运行程序 2.1、打开接收端 2.2 运行客户端 2.3、可以看到2边几乎是实时的,再去看看数据库 三、我们在集群里执行   最后数据是1000   完全没有冲突,好了,就是这样 。   from: https://www.cnblogs.com/lanxiaoke/p/6659548.html

在CentOS7上安装RabbitMQ

安装过程参考官网: Installing on RPM-based Linux (RHEL, CentOS, Fedora, openSUSE) 首先需要安装erlang,参考:http://fedoraproject.org/wiki/EPEL/FAQ#howtouse

安装过程中会有提示,一路输入“y”即可。 完成后安装RabbitMQ: 先下载rpm:

下载完成后安装:

完成后启动服务:

可以查看服务状态:

这里可以看到log文件的位置,转到文件位置,打开文件: 这里显示的是没有找到配置文件,我们可以自己创建这个文件

编辑内容如下:

这里的意思是开放使用,rabbitmq默认创建的用户guest,密码也是guest,这个用户默认只能是本机访问,localhost或者127.0.0.1,从外部访问需要添加上面的配置。 保存配置后重启服务:

此时就可以从外部访问了,但此时再看log文件,发现内容还是原来的,还是显示没有找到配置文件,可以手动删除这个文件再重启服务,不过这不影响使用

  开放5672端口:

在Windows上进行测试: 新建.net core控制台项目,引用RabbitMQ.Client包:

测试代码:

也可以使用官网的例子(这里更清晰): http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html 发送端:

接收端:

在Windows上发送,在CentOS上接收,效果如图:   开启管理UI:

在Windows下打开地址:

用户名和密码都是 guest 这样就可以方便管理RabbitMQ了。   from:https://www.cnblogs.com/uptothesky/p/6094357.html