用docker部署RabbitMQ环境

前置条件: 已经安装好docker 1.查找镜像(有2种方式) ①登录rabbitmq官网找到docker镜像,选择想要的镜像的tag https://www.rabbitmq.com/download.html https://hub.docker.com/_/rabbitmq 如果需要访问web管理页面,就选择tag为management的 ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的DNS服务不会使用resolv.conf文件中的search和domain两个配置,通过DNS来进行服务发现时需要注意。,带有-management的是带有web控制台   ②直接用docker search 搜索,默认下载标签为latest的镜像(无法打开web管理页面)

ps:如果是普通用户登录,需要sudo,不然会提示错误

2.下载镜像(有时候网络问题超时,多尝试几次即可。我这里选择的是可以访问web管理界面的tag)

3.创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin)

  from:https://www.cnblogs.com/yy-cola/p/11089800.html

Kafka、RocketMQ、RabbitMQ的优劣势比较

最全MQ消息队列有哪些? 目前在业界有哪些比较知名的消息引擎呢?如下图所示 这里面几乎完全列举了当下比较知名的消息引擎,包括: ZeroMQ 推特的Distributedlog ActiveMQ:Apache旗下的老牌消息引擎 RabbitMQ、Kafka:AMQP的默认实现。 RocketMQ Artemis:Apache的ActiveMQ下的子项目 Apollo:同样为Apache的ActiveMQ的子项目的号称下一代消息引擎 商业化的消息引擎IronMQ 以及实现了JMS(Java Message Service)标准的OpenMQ。   MQ消息队列的技术应用 1.解耦 解耦是消息队列要解决的最本质问题。 2.最终一致性 最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。 最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。 2.广播 消息队列的基本功能之一是进行广播。 有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。 3.错峰与流控 典型的使用场景就是秒杀业务用于流量削峰场景。 Kafka、RocketMQ、RabbitMQ比较   1.ActiveMQ 优点 单机吞吐量:万级 topic数量都吞吐量的影响: 时效性:ms级 可用性:高,基于主从架构实现高可用性 消息可靠性:有较低的概率丢失数据 功能支持:MQ领域的功能极其完备 缺点: 官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用。   2.Kafka 号称大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。 Apache Kafka它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。 目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。 优点 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。 时效性:ms级 可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次; 有优秀的第三方Kafka Web管理界面Kafka-Manager; 在日志领域比较成熟,被多家公司和多个开源项目使用; 功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用 缺点: Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长 使用短轮询方式,实时性取决于轮询间隔时间; 消费失败不支持重试; 支持消息顺序,但是一台代理宕机后,就会产生消息乱序; 社区更新较慢;   3.RabbitMQ RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 RabbitMQ优点: 由于erlang语言的特性,mq 性能较好,高并发; 吞吐量到万级,MQ功能比较完备 健壮、稳定、易用、跨平台、支持多种语言、文档齐全; 开源提供的管理界面非常棒,用起来很好用 社区活跃度高; RabbitMQ缺点: erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。 RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。 需要学习比较复杂的接口和协议,学习和维护成本较高。 4.RocketMQ RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。 RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。   RocketMQ优点: 单机吞吐量:十万级 可用性:非常高,分布式架构 消息可靠性:经过参数优化配置,消息可以做到0丢失 功能支持:MQ功能较为完善,还是分布式的,扩展性好 支持10亿级别的消息堆积,不会因为堆积导致性能下降 源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控 RocketMQ缺点: 支持的客户端语言不多,目前是java及c++,其中c++不成熟; 社区活跃度一般 没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码   消息队列选择建议 1.Kafka Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。 大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。   2.RocketMQ 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。 RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。 3.RabbitMQ RabbitMQ :结合erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护。不过,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug。 如果你的数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ。   from:https://blog.csdn.net/u011663149/article/details/86232869

Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

rabbitmq

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ消息持久化

目录 一、前言 二、队列持久化   2.1 查看存在的队列和消息数量   2.2 持久化队列 三、消息持久化 四、总结 一、前言 如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。当然还是会有一些小概率事件会导致消息丢失。 二、队列持久化   2.1 查看存在的队列和消息数量 在windows环境下,在rabbitmq的安装目录/sbin下,通过rabbitmqctl.bat list_queues查看  这边启动了两个producer,分别生成两个队列hello 和 hello1,并且他们都有一个消息存在 重启rabbitmq,模拟故障 可以看到重启后两个队列都消失了   2.2 持久化队列 我们就hello队列持久化 在声明队列名称时,持久化队列,生产端和消费端都要 1 channel.queue_declare(queue=’hello', durable=True) 我们重复上面的操作,但是给hello队列做持久化,而hello1不做,并重启rabbitmq 可以看到重启后,hello队列还在,hello1队列消失了,但是原本hello中的一条消息也没有保存下来。所以在这边我们仅仅做到了消息队列的持久化,还没有做消息持久化。 三、消息持久化 我们刚才实现了在rabbitmq崩溃的情况下,就队列本身保存下来,重启后队列还在。接下来我们要将消息也保存下来,即消息的持久化 1 2 3 4 5 6 7 8 9 channel.basic_publish(exchange=",                       routing_key=’hello',                       body=’hello',                       properties=pika.BasicProperties(                           delivery_mode=2,  # make message persistent                       )) # 增加properties,这个properties 就是消费端 callback函数中的properties # delivery_mode = 2  持久化消息 生产端生成一个消息,并重启rabbitmq 可以看到,经过队列和消息持久化后的hello, 在重启的情况下,队列和消息都存在,没有消失。 消费端再重启后也能正常接收 四、总结 队列持久化需要在声明队列时添加参数 durable=True,这样在rabbitmq崩溃时也能保存队列 仅仅使用durable=True ,只能持久化队列,不能持久化消息 消息持久化需要在消息生成时,添加参数 properties=pika.BasicProperties(delivery_mode=2)   from: https://www.cnblogs.com/bigberg/p/8195622.html#_label3

用rabbitMq解决web高并发的学习笔记

引入pom.xml

  application.yml 配置部分

  创建配置文件

  Producer

  Receiver

  unit test

  可以测试通过 image.png 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例

  总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。 REF: http://www.rabbitmq.com/install-rpm.html 作者:DONG999 链接:https://www.jianshu.com/p/f621b47c80c3 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

服务级高并发秒杀优化(RabbitMQ+接口优化)

1. 安装RabbitMQ linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。 erlang下载地址:http://www.erlang.org/download.html rabbitMQ下载:http://www.rabbitmq.com/download.html 直接下载安装即可。 因为想用可视化界面监控消息,所以先激活这个功能。

  然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。 2. rabbitMQ基本原理和使用 rabbitMQ原理 Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下 客户端连接到消息队列服务器,打开一个channel。 客户端声明一个exchange,并设置相关属性。 客户端声明一个queue,并设置相关属性。 客户端使用routing key,在exchange和queue之间建立好绑定关系。 客户端投递消息到exchange。 总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 Direct交换机 完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。 Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。 Topic交换机 对key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。 在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。 我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。 Fanout交换机 还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。 所以,Fanout Exchange 转发消息是最快的。 Headers交换机 首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。 持久化 RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分: – exchange持久化,在声明时指定durable => 1 – queue持久化,在声明时指定durable => 1 – 消息持久化,在投递时指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 3. rabbitMQ-Direct交换机 这种模式是最简单的模式,就发送一串字符串,这个字符串为key,接收的时候也完全以这个字符串本来来确定,不需要绑定任何exchange,使用默认的就行。我们以这个模式开始在原来的项目上继续集成。 首先是引入依赖:

  appilication.yml:

  rabbitMQ配置类MQConfig:

  发送者MQSender:

  接收者MQReceiver:

  这样,就完成了最简单的一个字符串的发送-接受。可以在controller中随便测试一下:

  4. rabbitMQ-Topic交换机 这个模式正如上面所言,是可以匹配通配符的,显然更加灵活,这里用程序测试一下这个模式效果。 MQConfig: 先来几个常量:

  下面配置几个bean: 注:带有 @Configuration 的注解类表示这个类可以使用 Spring IoC 容器作为 bean 定义的来源。@Bean 注解告诉 Spring,一个带有 @Bean 的注解方法将返回一个对象,该对象应该被注册为在 Spring 应用程序上下文中的 bean。

  MQSender:

  MQReceiver:

  最后测试一把:

  运行结果:

  运行结果与初期的分析结果一致。 5. rabbitMQ-Fanout交换机 这种就是广播模式,即所有的绑定到指定的exchange上的queue都可以接收消息。 MQConfig:

  MQSender:

  MQReceiver:

  运行结果:

  queue1和queue2都接受到了消息。 6. rabbitMQ-Headers交换机 MQConfig:

  MQSender:

  MQReceiver:

  7. 秒杀优化 思路:减少数据库访问 系统初始化,把商品库存数量加载到redis 收到请求,redis预减库存,库存不够,直接返回,否则进入3 请求入队,立即返回排队中 请求出队,生成订单,减少库存 客户端轮询,是否秒杀成功 对于之前的秒杀接口do_miaosha:

  这里判断库存是直接从数据库查,因为并发量比较大,存在性能问题。后面秒杀到之后,也不是直接减库存, 而是将其放到消息队列中慢慢交给数据库去调整。

  在消息队列中对消息进行消化:

  对于controller中的优化1:redis预减库存。那么需要在系统启动的时候将秒杀商品的库存先添加到redis中:

  重写afterPropertiesSet()方法:

  对于前端,这时也要进行修改了,因为点击秒杀商品按键后,这里考虑三种情况:排队等待、失败、成功。那么这里规定-1为失败,0为排队,1为秒杀成功已经写入数据库。 原来的detail.htm中秒杀事件函数:

  秒杀到商品就直接返回,现在后端改为消息队列,所以需要增加函数进行判断,必要时需要轮询:

  所以将其改为:

  那么相应地,后台也要增加一个方法:result

  那么如何标记状态呢?这就是getMiaoshaResult方法所做的事情。 对于成功的状态判断,很简单,从数据库查,能查到就说明已经秒杀成功,否则就是两种情况:失败或者正在等待生成订单。 对于这两种状态,我们需要用redis来实现,思路是:在系统初始化的时候,redis中设置秒杀商品是否卖完的状态为false—即未卖完;

  在MiaoshaService中的Miaosha方法:数据库减库存失败的话,说明数据库的库存已经小于0了,那么这个时候,立即将redis初始设置的秒杀商品是否卖完的状态为true,表示商品已经全部卖完,返回秒杀失败。否则就是要前端等待等待。

  对于两个小方法getGoodsOver和setGoodsOver:

  那么redis预减库存,然后消息队列来进行创建订单就实现了。 当然,对于redis预减库存这一点,还有要优化的地方,就是现在的do_miaosha接口是这样的:

  但是,当秒杀商品已经没得时候,就没有必要再去redis中进行判断了,毕竟查询redis也是需要网络开销的,解决思路是:在内存中进行判断,如果redisService.decr得到的stock少于零的时候,直接将内存中的一个标志改变一下,那么下次再进入do_miaosha接口,先判断内存这个标记,如果库存已经小于0了,就不再访问redis,而是直接返回秒杀商品已经卖完。

  声明一个map:

  那么在afterPropertiesSet这个系统加载的初始化方法中对这个map进行初始化,goodsId–stock:

  在原来的redis预减库存初,发现库存小于0 ,就改为true:

  最后do_miaosha接口变为:

  ok,整个关于redis预减库存和rabbitMQ创建订单这个优化已经基本完成了。

.Net高并发解决思路

首先在windows上安装好Redis,RabbitMQ Redis-cli使用示例 ModelContext.cs代码:

在 Package Manager Console 下运行命令 Enable-Migrations 这个命令将在项目下创建文件夹 Migrations The Configuration class 这个类允许你去配置如何迁移,对于本文将使用默认的配置(在本文中因为只有一个 Context, Enable-Migrations 将自动对 context type 作出适配); An InitialCreate migration (本文为201702220232375_20170222.cs)这个迁移之所以存在是因为我们之前用 Code First 创建了数据库, 在启用迁移前,scaffolded migration 里的代码表示在数据库中已经创建的对象,本文中即为表 Person(列 Id 和 Name). 文件名包含一个 timestamp 以便排序(如果之前数据库没有被创建,那么 InitialCreate migration 将不会被创建,相反,当我们第一次调用 Add-Migration 的时候所有表都将归集到一个新的 migration 中) 多个实体锁定同一数据库 当使用 EF6 之前的版本时,只会有一个 Code First Model 被用来生成/管理数据库的 Schema, 这将导致每个数据库只会有一张 __MigrationsHistory 表,从而无法辨别实体与模型的对应关系。 从 EF6 开始,Configuration 类将会包含一个 ContextKey 属性,它将作为每一个 Code First Model 的唯一标识符, __MigrationsHistory 表中一个相应地的列允许来自多个模型(multiple models)的实体共享表(entries),默认情况下这个属性被设置成 context 的完全限定名。 定制化迁移 在 Package Manager Console 中运行命令 Add-Migration XXXXXXXXX 生成的迁移如下

  Configuration.cs代码:

  我们对迁移做些更改: 以下是本项目无关的其他示例:

  在 Package Manager Console 中运行命令 Update-Database –Verbose 消费者端,用来把消息队列里的数据写入数据库 MqHelper.cs代码:

  Program.cs代码:

  模拟并发的MVC网站,写入队列 HomeController.cs代码

  MqPublish.cs类

  RedisHelper.cs代码:

  Index.cshtml内容:

  Startup.cs代码:

  Web.config

  运行结果如图: 运行消费者端(控制台运用程序) from: https://blog.csdn.net/andrewniu/article/details/82856702

RabbitMQ的几种典型使用场景

AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。  7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:

Consumer:

场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:

发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable. Consumer:

接收端和场景1不同点: 1、使用“task_queue”声明消息队列,并使消息队列durable 2、在使用channel.basicConsume接收消息时使autoAck为false,即不自动会发ack,由channel.basicAck()在消息处理完成后发送消息。 3、使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端。 注意点: 1)It’s a common mistake to miss the basicAck. It’s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won’t be able to release any unacked messages. 2)Note on message persistence Marking messages as persistent doesn’t fully guarantee that a message won’t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. Also, RabbitMQ doesn’t do fsync(2) for every message — it may be just saved to cache and not really written to the disk. The persistence guarantees aren’t strong, but it’s more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in atransaction. 3)Note about queue size If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy. 场景3:Publish/Subscribe 使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收。 Producer:

发送端: 发送消息到一个名为“logs”的exchange上,使用“fanout”方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收。 Consumer:

接收端: 1、声明名为“logs”的exchange的,方式为"fanout",和发送端一样。 2、channel.queueDeclare().getQueue();该语句得到一个随机名称的Queue,该queue的类型为non-durable、exclusive、auto-delete的,将该queue绑定到上面的exchange上接收消息。 3、注意binding queue的时候,channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略! 场景4:Routing (按路线发送接收) 使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息。 Producer:

发送端和场景3的区别: 1、exchange的type为direct 2、发送消息的时候加入了routing key Consumer:

接收端和场景3的区别: 在绑定queue和exchange的时候使用了routing key,即从该exchange上只接收routing key指定的消息。 场景5:Topics (按topic发送接收) 使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。 Producer:

发送端和场景4的区别: 1、exchange的type为topic 2、发送消息的routing key不是固定的单词,而是匹配字符串,如"*.lu.#",*匹配一个单词,#匹配0个或多个单词。 Consumer:

接收端和场景4的区别: 1、exchange的type为topic 2、接收消息的routing key不是固定的单词,而是匹配字符串。 注意点: Topic exchange Topic exchange is powerful and can behave like other exchanges. When a queue is bound with "#" (hash) binding key – it will receive all the messages, regardless of the routing key – like in fanout exchange. When special characters "*" (star) and "#" (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.   from: https://blog.csdn.net/qq_15427331/article/details/62416486

高并发场景之RabbitMQ篇

上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案 但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题, 因为使用Redis要自己实现分布式锁   这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。 下面我们来用RabbitMQ来实现上一篇的场景   一、新建RabbitMQ.Receive

二、新建RabbitMQ.Send

  这里是模拟500个用户请求,正常的话最后Total就等于500 我们来说试试看,运行程序 2.1、打开接收端 2.2 运行客户端 2.3、可以看到2边几乎是实时的,再去看看数据库 三、我们在集群里执行   最后数据是1000   完全没有冲突,好了,就是这样 。   from: https://www.cnblogs.com/lanxiaoke/p/6659548.html

在CentOS7上安装RabbitMQ

安装过程参考官网: Installing on RPM-based Linux (RHEL, CentOS, Fedora, openSUSE) 首先需要安装erlang,参考:http://fedoraproject.org/wiki/EPEL/FAQ#howtouse

安装过程中会有提示,一路输入“y”即可。 完成后安装RabbitMQ: 先下载rpm:

下载完成后安装:

完成后启动服务:

可以查看服务状态:

这里可以看到log文件的位置,转到文件位置,打开文件: 这里显示的是没有找到配置文件,我们可以自己创建这个文件

编辑内容如下:

这里的意思是开放使用,rabbitmq默认创建的用户guest,密码也是guest,这个用户默认只能是本机访问,localhost或者127.0.0.1,从外部访问需要添加上面的配置。 保存配置后重启服务:

此时就可以从外部访问了,但此时再看log文件,发现内容还是原来的,还是显示没有找到配置文件,可以手动删除这个文件再重启服务,不过这不影响使用

  开放5672端口:

在Windows上进行测试: 新建.net core控制台项目,引用RabbitMQ.Client包:

测试代码:

也可以使用官网的例子(这里更清晰): http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html 发送端:

接收端:

在Windows上发送,在CentOS上接收,效果如图:   开启管理UI:

在Windows下打开地址:

用户名和密码都是 guest 这样就可以方便管理RabbitMQ了。   from:https://www.cnblogs.com/uptothesky/p/6094357.html

RabbitMQ用户增删及权限控制

用户角色分类 none:无法登录控制台 不能访问 management plugin,通常就是普通的生产者和消费者。 management:普通管理者。 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对policies进行管理。用户可以通过AMQP做的任何事外加: 列出自己可以通过AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。 policymaker:策略制定者。 management可以做的任何事外加: 查看、创建和删除自己的virtual hosts所属的policies和parameters monitoring:监控者。 management可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息 同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) administrator:超级管理员。 policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections 创建用户

  例:创建一个超级用户

  查看用户列表

  为用户赋权

  删除用户

  修改用户的密码

   from: https://www.cnblogs.com/xinxiucan/p/7940953.html