原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://lanlian.blog.51cto.com/6790106/1303195 一、keepalived简介: keepalived是一个类似于layer3, 4 & 5交换机制的软件,也就是我们平时说的第3层、第4层和第5层交换。Keepalived的作用是检测web服务器的状态,如果有一台web服务器死机,或工作出现故障,Keepalived将检测到,并将有故障的web服务器从系统中剔除,当web服务器工作正常后Keepalived自动将web服务器加入到服务器群中,这些工作全部自动完成,不需要人工干涉,需要人工做的只是修复故障的web服务器。 工作原理 Layer3,4&5工作在IP/TCP协议栈的IP层,TCP层,及应用层,原理分别如下: Layer3:Keepalived使用Layer3的方式工作式时,Keepalived会定期向服务器群中的服务器发送一个ICMP的数据包(既我们平时用的Ping程序),如果发现某台服务的IP地址没有激活,Keepalived便报告这台服务器失效,并将它从服务器群中剔除,这种情况的典型例子是某台服务器被非法关机。Layer3的方式是以服务器的IP地址是否有效作为服务器工作正常与否的标准。 Layer4:如果您理解了Layer3的方式,Layer4就容易了。Layer4主要以TCP端口的状态来决定服务器工作正常与否。如web server的服务端口一般是80,如果Keepalived检测到80端口没有启动,则Keepalived将把这台服务器从服务器群中剔除。 Layer5:Layer5就是工作在具体的应用层了,比Layer3,Layer4要复杂一点,在网络上占用的带宽也要大一些。Keepalived将根据用户的设定检查服务器程序的运行是否正常,如果与用户的设定不相符,则Keepalived将把服务器从服务器群中剔除。 二、实验步骤: 1.创建管理节点在node1上,建立双机互信node1和node2,然后同步时间,安装keepalived 1 2 3 4 [root@node1~]# ansible all -m yum -a 'name=keepalived state=present' [root@node1keepalived]# rpm -qc keepalived /etc/keepalived/keepalived.conf//生成的主配置文件 /etc/sysconfig/keepalived 2.在node1上配置文件需要做一下修改 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 global_defs{ notification_email { root@localhost //收邮件人,可以定义多个 } notification_email_from kaadmin@localhost //发邮件人可以伪装 smtp_server 127.0.0.1 //发送邮件的服务器地址 smtp_connect_timeout 30 //连接超时时间 router_id LVS_DEVEL } vrrp_instanceVI_1 { //每一个vrrp_instance就是定义一个虚拟路由器的 state MASTER //由初始状态状态转换为master状态 interface eth0 virtual_router_id 51 //虚拟路由的id号,一般不能大于255的 priority 100 //初始化优先级 advert_int 1 //初始化通告 authentication { //认证机制 auth_type […]
View Details引入pom.xml
|
1 2 3 4 5 6 7 |
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/ https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
application.yml 配置部分
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#rabbitmq rabbitmq: host: 10.0.0.2 port: 5672 username: springboot password: password publisher-confirms: true publisher-returns: true template: mandatory: true #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java listener: concurrency: 2 #最小消息监听线程数 max-concurrency: 2 #最大消息监听线程数 mybatis: mapper-locations: classpath:mapping/*.xml ··· |
创建配置文件
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; //错误的 import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。 * * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. * * Routing Key:路由关键字, * * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 * * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序. * Channel:消息通道,在客户端的每个连接里,可建立多个channel. * * */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String QUEUE_A = "QUEUE_A"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * * 针对消费者配置 * * 1. 设置交换机类型 * * 2. 将队列绑定到交换机 * * FanoutExchange: * 将消息分发到所有的绑定队列,无routingkey的概念 * * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 * * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。 常用交换器类型如下: Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。 Topic(TopicExchange):按规则转发消息(最灵活)。 Headers(HeadersExchange):设置header attribute参数类型的交换机。 Fanout(FanoutExchange):转发消息到所有绑定队列。 * */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 获取队列A * * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); // 队列持久 } /** * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。 * * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } } |
Producer
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
package com.redis; import java.util.UUID; import org.springframework.amqp.core.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; @Component public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入 private RabbitTemplate rabbitTemplate; /** * 构造方法注入rabbitTemplate */ @Autowired public MsgProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); // rabbitTemplate如果为单例的话,那回调就是最后设置的内容 /** * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。 * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 */ rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); System.out.println("消息主体 message : "+message); System.out.println("消息主体 message : "+replyCode); System.out.println("描述:"+replyText); System.out.println("消息使用的交换器 exchange : "+exchange); System.out.println("消息使用的路由键 routing : "+routingKey); } /** * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。 * @param content */ public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId); String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString(); System.out.println("结束发送消息c : " + content.toLowerCase()); System.out.println("消费者响应c : " + response + " 消息处理完成"); //logger.info(" 发送消息TO A:" + content); // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回调id:" + correlationData); if (ack) { logger.info("消息成功消费"); } else { logger.info("消息消费失败:" + cause); } } } |
Receiver
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MsgReceiver { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitHandler public void process(String content) { System.out.println("接收处理队列A当中的消息: " + content); } } |
unit test
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.redis.MsgProducer; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqTest { @Autowired private MsgProducer sender; @Test public void sendTest() throws Exception { //while(true){ String msg = new Date().toString(); sender.sendMsg(msg); Thread.sleep(6000); //} } } |
可以测试通过 image.png 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
package com.test.ratelimit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.RateLimiter; public class ComplexDemo { private static RateLimiter rateLimiter = RateLimiter.create(10); private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0); public static void main(String[] args) { // TODO Auto-generated method stub List<Runnable> tasks = new ArrayList<Runnable>(); for (int i = 0; i < 100; i++) { tasks.add(new UserRequest(i)); } ExecutorService threadPool = Executors.newCachedThreadPool(); for (Runnable runnable : tasks) { threadPool.execute(runnable); } } private static boolean startGo(int i) { //基于令牌桶算法的限流实现类 /** * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。 * */ /** * tryAcquire(long timeout, TimeUnit unit) * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) */ //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序 if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("暂时无法获取令牌, 排队失败" + i); fail.getAndIncrement(); System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get()); return false; } if (update() > 0) { System.out.println("成功" + i); suc.getAndIncrement(); System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get()); return true; } System.out.println("数据不足,失败"); return false; } private static int update() { return 1; } private static class UserRequest implements Runnable { private int id; public UserRequest(int id) { this.id = id; } public void run() { startGo(id) ; } } } 测试结果: ... 成功8 FAIL/SUC=89/10 成功7 FAIL/SUC=89/11 |
总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。 REF: http://www.rabbitmq.com/install-rpm.html 作者:DONG999 链接:https://www.jianshu.com/p/f621b47c80c3 来源:简书 简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
View Details1. 安装RabbitMQ linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。 erlang下载地址:http://www.erlang.org/download.html rabbitMQ下载:http://www.rabbitmq.com/download.html 直接下载安装即可。 因为想用可视化界面监控消息,所以先激活这个功能。
|
1 2 |
//到rabbitMQ安装目录的sbin目录下启动cmd黑窗口 E:\software\RabbitMQServer\rabbitmq_server-3.6.5\sbin>rabbitmq-plugins.bat enable rabbitmq_management |
然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。 2. rabbitMQ基本原理和使用 rabbitMQ原理 Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 消息队列的使用过程大概如下 客户端连接到消息队列服务器,打开一个channel。 客户端声明一个exchange,并设置相关属性。 客户端声明一个queue,并设置相关属性。 客户端使用routing key,在exchange和queue之间建立好绑定关系。 客户端投递消息到exchange。 总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 Direct交换机 完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。 Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。 Topic交换机 对key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。 在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。 我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上, Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。 Fanout交换机 还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。 所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。 Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。 所以,Fanout Exchange 转发消息是最快的。 Headers交换机 首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。 持久化 RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分: – exchange持久化,在声明时指定durable => 1 – queue持久化,在声明时指定durable => 1 – 消息持久化,在投递时指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。 3. rabbitMQ-Direct交换机 […]
View Details首先在windows上安装好Redis,RabbitMQ Redis-cli使用示例 ModelContext.cs代码:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class ModelContext : DbContext { //您的上下文已配置为从您的应用程序的配置文件(App.config 或 Web.config) //连接字符串。 public ModelContext() : base("name=default") { } public virtual DbSet<Person> Person { get; set; } } public class Person { public int Id { get; set; } public string Id2 { get; set; } public string Name { get; set; } } |
在 Package Manager Console 下运行命令 Enable-Migrations 这个命令将在项目下创建文件夹 Migrations The Configuration class 这个类允许你去配置如何迁移,对于本文将使用默认的配置(在本文中因为只有一个 Context, Enable-Migrations 将自动对 context type 作出适配); An InitialCreate migration (本文为201702220232375_20170222.cs)这个迁移之所以存在是因为我们之前用 Code First 创建了数据库, 在启用迁移前,scaffolded migration 里的代码表示在数据库中已经创建的对象,本文中即为表 Person(列 Id 和 Name). 文件名包含一个 timestamp 以便排序(如果之前数据库没有被创建,那么 InitialCreate migration 将不会被创建,相反,当我们第一次调用 Add-Migration 的时候所有表都将归集到一个新的 migration 中) 多个实体锁定同一数据库 当使用 EF6 之前的版本时,只会有一个 Code First Model 被用来生成/管理数据库的 Schema, 这将导致每个数据库只会有一张 __MigrationsHistory 表,从而无法辨别实体与模型的对应关系。 从 EF6 开始,Configuration 类将会包含一个 ContextKey 属性,它将作为每一个 Code First Model 的唯一标识符, __MigrationsHistory 表中一个相应地的列允许来自多个模型(multiple models)的实体共享表(entries),默认情况下这个属性被设置成 context 的完全限定名。 定制化迁移 在 Package Manager Console 中运行命令 Add-Migration XXXXXXXXX 生成的迁移如下
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public partial class _20170222 : DbMigration { public override void Up() { CreateTable( "dbo.People", c => new { Id = c.Int(nullable: false, identity: true), Id2 = c.String(), Name = c.String(), }) .PrimaryKey(t => t.Id); } public override void Down() { DropTable("dbo.People"); } } |
Configuration.cs代码:
|
1 2 3 4 5 6 7 8 9 10 11 12 |
internal sealed class Configuration : DbMigrationsConfiguration<EF.ModelContext> { public Configuration() { AutomaticMigrationsEnabled = false; } protected override void Seed(EF.ModelContext context) { } } |
我们对迁移做些更改: […]
View DetailsAMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } |
Consumer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } |
场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } |
发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue […]
View Details上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案 但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题, 因为使用Redis要自己实现分布式锁 这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。 下面我们来用RabbitMQ来实现上一篇的场景 一、新建RabbitMQ.Receive
|
1 2 |
private static ConnectionFactory factory = new ConnectionFactory { HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" }; |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
1 static void Main(string[] args) 2 { 3 using (var connection = factory.CreateConnection()) 4 { 5 using (var channel = connection.CreateModel()) 6 { 7 var consumer = new EventingBasicConsumer(); 8 consumer.Received += (model, ea) => 9 { 10 var body = ea.Body; 11 var message = Encoding.UTF8.GetString(body); 12 Console.WriteLine(" [x] Received {0}", message); 13 14 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 15 var value = int.Parse(total) + 1; 16 17 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 18 }; 19 20 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 21 channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer); 22 23 Console.WriteLine(" Press [enter] to exit."); 24 Console.ReadLine(); 25 } 26 } 27 } |
二、新建RabbitMQ.Send
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
1 static void Main(string[] args) 2 { 3 for (int i = 1; i <= 500; i++) 4 { 5 Task.Run(async () => 6 { 7 await Produce(); 8 }); 9 10 Console.WriteLine(i); 11 } 12 13 Console.ReadKey(); 14 } 15 16 public static Task Produce() 17 { 18 return Task.Factory.StartNew(() => 19 { 20 using (var connection = factory.CreateConnection()) 21 { 22 using (var channel = connection.CreateModel()) 23 { 24 var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); 25 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 26 channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body); 27 } 28 } 29 }); 30 } |
这里是模拟500个用户请求,正常的话最后Total就等于500 我们来说试试看,运行程序 2.1、打开接收端 2.2 运行客户端 2.3、可以看到2边几乎是实时的,再去看看数据库 三、我们在集群里执行 最后数据是1000 完全没有冲突,好了,就是这样 。 from: https://www.cnblogs.com/lanxiaoke/p/6659548.html
View DetailsAdd the dotnet product feed Before installing .NET, you’ll need to register the Microsoft key, register the product repository, and install required dependencies. This only needs to be done once per machine. Open a terminal and run the following commands:
|
1 |
sudo rpm -Uvh https://packages.microsoft.com/config/rhel/7/packages-microsoft-prod.rpm |
Install the .NET Runtime Update the products available for installation, then install the .NET Runtime. In your terminal, run the following commands:
|
1 2 3 4 |
sudo yum update sudo yum install aspnetcore-runtime-2.2 也可以安装SDK yum install dotnet-sdk-2.2 |
The previous command will install the .NET Core Runtime Bundle, which includes the .NET Core runtime and the ASP.NET Core runtime. To […]
View Details安装过程参考官网: Installing on RPM-based Linux (RHEL, CentOS, Fedora, openSUSE) 首先需要安装erlang,参考:http://fedoraproject.org/wiki/EPEL/FAQ#howtouse
|
1 2 |
rpm -Uvh https://mirrors.tuna.tsinghua.edu.cn/epel/epel-release-latest-7.noarch.rpm yum install erlang |
安装过程中会有提示,一路输入“y”即可。 完成后安装RabbitMQ: 先下载rpm:
|
1 |
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm |
下载完成后安装:
|
1 |
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm |
完成后启动服务:
|
1 |
service rabbitmq-server start |
可以查看服务状态:
|
1 |
service rabbitmq-server status |
这里可以看到log文件的位置,转到文件位置,打开文件: 这里显示的是没有找到配置文件,我们可以自己创建这个文件
|
1 2 |
cd /etc/rabbitmq/ vi rabbitmq.config |
编辑内容如下:
|
1 |
[{rabbit, [{loopback_users, []}]}]. |
这里的意思是开放使用,rabbitmq默认创建的用户guest,密码也是guest,这个用户默认只能是本机访问,localhost或者127.0.0.1,从外部访问需要添加上面的配置。 保存配置后重启服务:
|
1 2 |
service rabbitmq-server stop service rabbitmq-server start |
此时就可以从外部访问了,但此时再看log文件,发现内容还是原来的,还是显示没有找到配置文件,可以手动删除这个文件再重启服务,不过这不影响使用
|
1 2 3 |
rm rabbit\@mythsky.log service rabbitmq-server stop service rabbitmq-server start |
开放5672端口:
|
1 2 |
firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --reload |
在Windows上进行测试: 新建.net core控制台项目,引用RabbitMQ.Client包:
|
1 |
Install-Package RabbitMQ.Client |
测试代码:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "guest"; factory.Password = "guest"; factory.VirtualHost = "/"; factory.HostName = "localhost"; //factory.HostName = "10.255.19.111"; try { IConnection conn = factory.CreateConnection(); IModel model = conn.CreateModel(); string exchangeName = "test"; string queueName = "testq"; string routingKey = "first"; model.ExchangeDeclare(exchangeName, ExchangeType.Direct); model.QueueDeclare(queueName, false, false, false, null); model.QueueBind(queueName, exchangeName, routingKey, null); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes); Console.WriteLine("message sended."); bool noAck = false; BasicGetResult result = model.BasicGet(queueName, noAck); if (result == null) { Console.Write("no message."); } else { IBasicProperties props = result.BasicProperties; byte[] body = result.Body; model.BasicAck(result.DeliveryTag, false); string message = System.Text.Encoding.UTF8.GetString(body); Console.Write(message); } } catch (Exception ex) { Console.Write(ex.Message); } } |
也可以使用官网的例子(这里更清晰): http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html 发送端:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using System; using System.Text; using RabbitMQ.Client; namespace rabbitMQ { public class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "192.168.0.169", UserName = "user", Password = "pass" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!" + DateTime.Now; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } |
接收端:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
using System; using System.Collections.Generic; using System.Text; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace rabbitMQ { class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "192.168.0.169", UserName = "user", Password = "pass" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } |
在Windows上发送,在CentOS上接收,效果如图: 开启管理UI:
|
1 2 3 |
rabbitmq-plugins enable rabbitmq_management firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --reload |
在Windows下打开地址:
|
1 |
http://10.255.19.111:15672 |
用户名和密码都是 guest 这样就可以方便管理RabbitMQ了。 from:https://www.cnblogs.com/uptothesky/p/6094357.html
View Details用户角色分类 none:无法登录控制台 不能访问 management plugin,通常就是普通的生产者和消费者。 management:普通管理者。 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对policies进行管理。用户可以通过AMQP做的任何事外加: 列出自己可以通过AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。 policymaker:策略制定者。 management可以做的任何事外加: 查看、创建和删除自己的virtual hosts所属的policies和parameters monitoring:监控者。 management可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息 同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) administrator:超级管理员。 policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections 创建用户
|
1 2 3 4 |
rabbitmqctl add_user {用户名} {密码} // 设置权限 rabbitmqctl set_user_tags {用户名} {权限} |
例:创建一个超级用户
|
1 2 |
rabbitmqctl add_user admin1 admin1 rabbitmqctl set_user_tags admin1 administrator |
查看用户列表
|
1 |
rabbitmqctl list_users |
为用户赋权
|
1 2 3 4 5 6 7 8 9 10 |
// 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看权限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除权限 rabbitmqctl clear_permissions [-p VHostPath] User |
删除用户
|
1 |
rabbitmqctl delete_user Username |
修改用户的密码
|
1 |
rabbitmqctl change_password Username Newpassword |
from: https://www.cnblogs.com/xinxiucan/p/7940953.html
View Details安装脚本
|
1 2 3 4 5 6 7 8 9 10 |
yum install wget -y #Download RabbitMQ and Erlang wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-3.6.9-1.el6.noarch.rpm wget https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el6.x86_64.rpm #Install Erlang and RabbitMQ rpm -ivh erlang-19.0.4-1.el6.x86_64.rpm rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc yum install rabbitmq-server-3.6.9-1.el6.noarch.rpm -y |
发现错误
|
1 2 |
1. 安装依赖Requires: socat 2. 安装socat的时候提示需要tcp_wrappers |
解决方案
|
1 2 3 4 |
wget –no-cache http://www.convirture.com/repos/definitions/rhel/6.x/convirt.repo -O /etc/yum.repos.d/convirt.repo yum install socat -y 使用https://rpmfind.net/linux/rpm2html/search.php搜索tcp_wrappers,下载,rpm -ivh tcp_wrappers.version |
启动RabbitMQ
|
1 2 3 4 5 6 |
chkconfig rabbitmq-server on /sbin/service rabbitmq-server start # 修改密码 rabbitmqctl change_password guest welcome123 # 启动管理界面 rabbitmq-plugins enable rabbitmq_management |
使用过程错误小记 user ‘guest’ – User can only log in via localhost 登录响应如下:
|
1 |
{"error":"not_authorised","reason":"User can only log in via localhost"} |
我实在docker中创建rabbitmq的,然后将端口15672映射到宿主机器上的15672,所以也算是远程登录,而guest用户要求只能使用localhost登录,所以要想远程登录必须新建用户。
|
1 2 3 |
rabbitmqctl add_user {username} {password} # 例:创建admin用户 rabbitmqctl add_user admin 123456 |
新建用户无法登录
|
1 |
{"error":"not_authorised","reason":"Not management user"} |
需要授予用户角色
|
1 2 3 |
rabbitmqctl set_user_tags {username} {role} # 例 rabbitmqctl set_user_tags admin administrator |
用户授权
|
1 2 |
rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read} rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" |
关于RabbitMQ权限和角色管理 参考:http://blog.csdn.net/zyz511919766/article/details/42292655 from:https://www.dev-heaven.com/posts/1914.html
View Details