AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } |
Consumer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } |
场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } |
发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue […]
View Details上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案 但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题, 因为使用Redis要自己实现分布式锁 这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。 下面我们来用RabbitMQ来实现上一篇的场景 一、新建RabbitMQ.Receive
|
1 2 |
private static ConnectionFactory factory = new ConnectionFactory { HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" }; |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
1 static void Main(string[] args) 2 { 3 using (var connection = factory.CreateConnection()) 4 { 5 using (var channel = connection.CreateModel()) 6 { 7 var consumer = new EventingBasicConsumer(); 8 consumer.Received += (model, ea) => 9 { 10 var body = ea.Body; 11 var message = Encoding.UTF8.GetString(body); 12 Console.WriteLine(" [x] Received {0}", message); 13 14 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 15 var value = int.Parse(total) + 1; 16 17 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 18 }; 19 20 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 21 channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer); 22 23 Console.WriteLine(" Press [enter] to exit."); 24 Console.ReadLine(); 25 } 26 } 27 } |
二、新建RabbitMQ.Send
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
1 static void Main(string[] args) 2 { 3 for (int i = 1; i <= 500; i++) 4 { 5 Task.Run(async () => 6 { 7 await Produce(); 8 }); 9 10 Console.WriteLine(i); 11 } 12 13 Console.ReadKey(); 14 } 15 16 public static Task Produce() 17 { 18 return Task.Factory.StartNew(() => 19 { 20 using (var connection = factory.CreateConnection()) 21 { 22 using (var channel = connection.CreateModel()) 23 { 24 var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); 25 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 26 channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body); 27 } 28 } 29 }); 30 } |
这里是模拟500个用户请求,正常的话最后Total就等于500 我们来说试试看,运行程序 2.1、打开接收端 2.2 运行客户端 2.3、可以看到2边几乎是实时的,再去看看数据库 三、我们在集群里执行 最后数据是1000 完全没有冲突,好了,就是这样 。 from: https://www.cnblogs.com/lanxiaoke/p/6659548.html
View DetailsAdd the dotnet product feed Before installing .NET, you’ll need to register the Microsoft key, register the product repository, and install required dependencies. This only needs to be done once per machine. Open a terminal and run the following commands:
|
1 |
sudo rpm -Uvh https://packages.microsoft.com/config/rhel/7/packages-microsoft-prod.rpm |
Install the .NET Runtime Update the products available for installation, then install the .NET Runtime. In your terminal, run the following commands:
|
1 2 3 4 |
sudo yum update sudo yum install aspnetcore-runtime-2.2 也可以安装SDK yum install dotnet-sdk-2.2 |
The previous command will install the .NET Core Runtime Bundle, which includes the .NET Core runtime and the ASP.NET Core runtime. To […]
View Details问题: 1、创建maven项目的时候,jdk版本是1.5版本,而自己安装的是1.7或者1.8版本。 2、每次右键项目名-maven->update project 时候,项目jdk版本变了,变回1.5版本或者其他版本 解决办法: 解决办法一:在项目中的pom.xml指定jdk版本,如下:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> |
这个方法只能保证该项目是jdk1.8版本,每次新建项目都得加上面代码,不推荐使用,推荐第二种方法。 解决方法二:在maven的安装目录找到settings.xml文件,在里面添加如下代码
|
1 2 3 4 5 6 7 8 9 10 11 12 |
<profile> <id>jdk-1.8</id> <activation> <activeByDefault>true</activeByDefault> <jdk>1.8</jdk> </activation> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> </properties> </profile> |
添加完后,在对eclipse进行设置。window->preferences->maven->user settings,user settings那里选择maven安装目录下的settings.xml文件。如下图 设置完成后,右键项目->maven->update project,这样每次新建maven项目都默认为jdk1.8版本了 解决方法三: 在解决方法二中,user settings的默认settigs.xml文件路径为:c:\users\Hxinguan\.m2\settings.xml,如下图。只要把设置好的settings.xml文件复制到该目录下,然后update project就好了。 from:https://www.cnblogs.com/Hxinguan/p/6132446.html
View Details错误描述 当创建有动态web模块3.0支持的项目时,需要用到Java版本不低于1.6。 在Markers标签页中显示的错误为:Dynamic Web Module 3.0 requires Java 1.6 or newer. 如图所示: 解决方法 注:有的时候1、2、3已经实现,直接跳过,操作4就OK了。 1、首先在Eclipse中安装JRE,Preferences > Java > Installed JREs,点击 Add,并添加自己的Java路径。 2、确认编译器版本不低于1.6,右键项目 > Properties > Java Compiler,保证“Compiler compliance level”不低于1.6。 3、保证项目的Facet中Java版本不低于1.6,右键项目 > Properties > MyEclipse > Project Facets > Java,保证“Java”不低于1.6。 4、在项目的pom.xml的标签中加入:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> |
5、最后一步,右键项目 > Maven > Update Project。 from:https://blog.csdn.net/liuxinghao/article/details/37088063
View Details需要设置的几处地方为: Window->Preferences->General->Workspace 面板Text file encoding 选择UTF-8 Window->Preferences->General ->Content Types->Text->JSP 最下面设置为UTF-8 Window->Preferences->Web->JSP Files 面板选择 ISO 10646/Unicode(UTF-8) 下面的图给出了具体的操作步骤: 第一步: 第二步: 当然还可以修改Java Source File、XML、Java Properties File等,在下面的Default encoding输入框中输入UTF-8,并点Update生效 如下图所示: 第三步: 通过上面的几个步骤,就可以完成设置编码,方便开发使用了。 from:https://blog.csdn.net/w_y_l_/article/details/82697503
View Details下载地址 https://www.microsoft.com/zh-CN/download/details.aspx?id=18970 快速描述 Microsoft Visual Studio International Feature Pack 2.0 Visual Studio International Feature Pack 2.0 包含一组控件和类库,设计用来帮助.NET开发人员创建国际化程序。 概述 Visual Studio International Feature Pack 2.0 是对 1.0 版本( 1.0 版的产品名是 Microsoft Visual Studio International Pack 1.0 SR1) 的扩展,包含一组控件和类库,设计用来帮助.NET开发人员创建国际化程序。 1, Yomigana Framework 包含了类库和控件。 a, 类库:Yomigana 类库容许对串(string)类型加注 Yomigana,同时也支持对一般类型的注解功能,任何实现了IEnumerable接口的对象都可以被串类型和泛型的实例注解。为了简化复杂的注解字符串比较特设计了支持各种日文比较选项的比较类型。 1) 通用的一些类,用泛型实现对一个可枚举的类型注音。 2) 特殊目的的一些类,用以上泛型实现对一个字符串用某种类型中注音。 3) 特殊目的的一些StringAnnotation 类,用以上泛型实现对一个字符串用字符串注音,包括解析和格式化功能。 4) 一个比较器类,使用以上类实现比较字符串。 5) 一个实现了 IEnumerable 的数据结构,把一个字符串分成枚举的字符串段,并用 IEnumerator 输出。 b, 控件: 1) 增强的Ajax/WPF/WinForm 文本框(TextBox)控件 用来根据用户的输入捕获读音。 2) 一个增强的使用Ruby标签的ASP.NET Label控件。 2, Chinese Text Alignment Class Library and TextBox Controls 包含支持简体中文文本对齐的WinForm 和 WPF 的TextBox控件, 以及供帮助开发人员很容易地按中文文本对齐显示字符串的一个类库。 3, Chinese Auto Complete Class Library and […]
View Details1.php.ini配置 eaccelerator.shm_only="0" zend_extension ="D:\xampp\php\ext\php_xdebug.dll" ;载入Xdebug [xdebug] ;开启远程调试 xdebug.remote_enable = 1 xdebug.profiler_enable = off xdebug.profiler_enable_trigger = off xdebug.profiler_output_name =cachegrind.out.%t.%p xdebug.profiler_output_dir ="D:\xampp\tmp" xdebug.show_local_vars=0 ;开启自动跟踪 xdebug.auto_trace = 1 ;开启异常跟踪 xdebug.show_exception_trace = 1 ;开启异常跟踪 xdebug.remote_autostart = 1 ;收集变量 xdebug.collect_vars = 1 ;收集参数 xdebug.collect_params = 1 ;trace输出路径 xdebug.trace_output_dir ="D:\xampp\tmp" ;以下三个分别是主机、端口、句柄 xdebug.remote_host="127.0.0.1" xdebug.remote_port=9000 xdebug.remote_handler="dbgp" 2.eclipse配置修改 2.1. 位置:windows->Preferences 2. 2 配置Executables(配置完成后,若设置断点能正常让XDebug运行,则无需继续设置其他选项) 2.3.配置 PHP Debug 将Server Settings下的Debugger设为XDebug 2.4.配置Debuggers 2.5配置workbench Options from:https://blog.csdn.net/user1218/article/details/51135753
View Details在eclipse环境下使用@Slf4j注解时,出现了log cannot be resolved这个异常。经过排查发现是缺少lombok插件的问题。解决方式当然是在eclipse中安装lombok插件啦。 前往官网下载:https://projectlombok.org/download
View Details调试的时候报这样的错误 解决办法: 工具--选项--调试--常规--启用asp.net的JavaScript调试(chrome和ie)去掉勾选 from:http://www.mamicode.com/info-detail-2349622.html
View Details