首先在windows上安装好Redis,RabbitMQ Redis-cli使用示例 ModelContext.cs代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class ModelContext : DbContext { //您的上下文已配置为从您的应用程序的配置文件(App.config 或 Web.config) //连接字符串。 public ModelContext() : base("name=default") { } public virtual DbSet<Person> Person { get; set; } } public class Person { public int Id { get; set; } public string Id2 { get; set; } public string Name { get; set; } } |
在 Package Manager Console 下运行命令 Enable-Migrations 这个命令将在项目下创建文件夹 Migrations The Configuration class 这个类允许你去配置如何迁移,对于本文将使用默认的配置(在本文中因为只有一个 Context, Enable-Migrations 将自动对 context type 作出适配); An InitialCreate migration (本文为201702220232375_20170222.cs)这个迁移之所以存在是因为我们之前用 Code First 创建了数据库, 在启用迁移前,scaffolded migration 里的代码表示在数据库中已经创建的对象,本文中即为表 Person(列 Id 和 Name). 文件名包含一个 timestamp 以便排序(如果之前数据库没有被创建,那么 InitialCreate migration 将不会被创建,相反,当我们第一次调用 Add-Migration 的时候所有表都将归集到一个新的 migration 中) 多个实体锁定同一数据库 当使用 EF6 之前的版本时,只会有一个 Code First Model 被用来生成/管理数据库的 Schema, 这将导致每个数据库只会有一张 __MigrationsHistory 表,从而无法辨别实体与模型的对应关系。 从 EF6 开始,Configuration 类将会包含一个 ContextKey 属性,它将作为每一个 Code First Model 的唯一标识符, __MigrationsHistory 表中一个相应地的列允许来自多个模型(multiple models)的实体共享表(entries),默认情况下这个属性被设置成 context 的完全限定名。 定制化迁移 在 Package Manager Console 中运行命令 Add-Migration XXXXXXXXX 生成的迁移如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public partial class _20170222 : DbMigration { public override void Up() { CreateTable( "dbo.People", c => new { Id = c.Int(nullable: false, identity: true), Id2 = c.String(), Name = c.String(), }) .PrimaryKey(t => t.Id); } public override void Down() { DropTable("dbo.People"); } } |
Configuration.cs代码:
1 2 3 4 5 6 7 8 9 10 11 12 |
internal sealed class Configuration : DbMigrationsConfiguration<EF.ModelContext> { public Configuration() { AutomaticMigrationsEnabled = false; } protected override void Seed(EF.ModelContext context) { } } |
我们对迁移做些更改: […]
View DetailsAMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现。它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。 4.Message Queue:消息队列,用于存储还未被消费者消费的消息。 5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。 6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。 8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。 9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。 在了解了AMQP模型以后,需要简单介绍一下AMQP的协议栈,AMQP协议本身包括三层: 1.Module Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑,例如,客户端可以通过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。 2.Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通信提供可靠性、同步机制和错误处理。 3.Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。 RabbitMQ使用场景 学习RabbitMQ的使用场景,来自官方教程:https://www.rabbitmq.com/getstarted.html 场景1:单发送单接收 使用场景:简单的发送与接收,没有特别的处理。 Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } } |
Consumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } } |
场景2:单发送多接收 使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。 Producer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } } |
发送端和场景1不同点: 1、使用“task_queue”声明了另一个Queue,因为RabbitMQ不容许声明2个相同名称、配置不同的Queue 2、使"task_queue"的Queue的durable的属性为true,即使消息队列durable 3、使用MessageProperties.PERSISTENT_TEXT_PLAIN使消息durable When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue […]
View Details上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案 但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题, 因为使用Redis要自己实现分布式锁 这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。 下面我们来用RabbitMQ来实现上一篇的场景 一、新建RabbitMQ.Receive
1 2 |
private static ConnectionFactory factory = new ConnectionFactory { HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" }; |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
1 static void Main(string[] args) 2 { 3 using (var connection = factory.CreateConnection()) 4 { 5 using (var channel = connection.CreateModel()) 6 { 7 var consumer = new EventingBasicConsumer(); 8 consumer.Received += (model, ea) => 9 { 10 var body = ea.Body; 11 var message = Encoding.UTF8.GetString(body); 12 Console.WriteLine(" [x] Received {0}", message); 13 14 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString(); 15 var value = int.Parse(total) + 1; 16 17 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null); 18 }; 19 20 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 21 channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer); 22 23 Console.WriteLine(" Press [enter] to exit."); 24 Console.ReadLine(); 25 } 26 } 27 } |
二、新建RabbitMQ.Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
1 static void Main(string[] args) 2 { 3 for (int i = 1; i <= 500; i++) 4 { 5 Task.Run(async () => 6 { 7 await Produce(); 8 }); 9 10 Console.WriteLine(i); 11 } 12 13 Console.ReadKey(); 14 } 15 16 public static Task Produce() 17 { 18 return Task.Factory.StartNew(() => 19 { 20 using (var connection = factory.CreateConnection()) 21 { 22 using (var channel = connection.CreateModel()) 23 { 24 var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()); 25 channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null); 26 channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body); 27 } 28 } 29 }); 30 } |
这里是模拟500个用户请求,正常的话最后Total就等于500 我们来说试试看,运行程序 2.1、打开接收端 2.2 运行客户端 2.3、可以看到2边几乎是实时的,再去看看数据库 三、我们在集群里执行 最后数据是1000 完全没有冲突,好了,就是这样 。 from: https://www.cnblogs.com/lanxiaoke/p/6659548.html
View Details安装过程参考官网: Installing on RPM-based Linux (RHEL, CentOS, Fedora, openSUSE) 首先需要安装erlang,参考:http://fedoraproject.org/wiki/EPEL/FAQ#howtouse
1 2 |
rpm -Uvh https://mirrors.tuna.tsinghua.edu.cn/epel/epel-release-latest-7.noarch.rpm yum install erlang |
安装过程中会有提示,一路输入“y”即可。 完成后安装RabbitMQ: 先下载rpm:
1 |
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm |
下载完成后安装:
1 |
yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm |
完成后启动服务:
1 |
service rabbitmq-server start |
可以查看服务状态:
1 |
service rabbitmq-server status |
这里可以看到log文件的位置,转到文件位置,打开文件: 这里显示的是没有找到配置文件,我们可以自己创建这个文件
1 2 |
cd /etc/rabbitmq/ vi rabbitmq.config |
编辑内容如下:
1 |
[{rabbit, [{loopback_users, []}]}]. |
这里的意思是开放使用,rabbitmq默认创建的用户guest,密码也是guest,这个用户默认只能是本机访问,localhost或者127.0.0.1,从外部访问需要添加上面的配置。 保存配置后重启服务:
1 2 |
service rabbitmq-server stop service rabbitmq-server start |
此时就可以从外部访问了,但此时再看log文件,发现内容还是原来的,还是显示没有找到配置文件,可以手动删除这个文件再重启服务,不过这不影响使用
1 2 3 |
rm rabbit\@mythsky.log service rabbitmq-server stop service rabbitmq-server start |
开放5672端口:
1 2 |
firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --reload |
在Windows上进行测试: 新建.net core控制台项目,引用RabbitMQ.Client包:
1 |
Install-Package RabbitMQ.Client |
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "guest"; factory.Password = "guest"; factory.VirtualHost = "/"; factory.HostName = "localhost"; //factory.HostName = "10.255.19.111"; try { IConnection conn = factory.CreateConnection(); IModel model = conn.CreateModel(); string exchangeName = "test"; string queueName = "testq"; string routingKey = "first"; model.ExchangeDeclare(exchangeName, ExchangeType.Direct); model.QueueDeclare(queueName, false, false, false, null); model.QueueBind(queueName, exchangeName, routingKey, null); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes); Console.WriteLine("message sended."); bool noAck = false; BasicGetResult result = model.BasicGet(queueName, noAck); if (result == null) { Console.Write("no message."); } else { IBasicProperties props = result.BasicProperties; byte[] body = result.Body; model.BasicAck(result.DeliveryTag, false); string message = System.Text.Encoding.UTF8.GetString(body); Console.Write(message); } } catch (Exception ex) { Console.Write(ex.Message); } } |
也可以使用官网的例子(这里更清晰): http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html 发送端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
using System; using System.Text; using RabbitMQ.Client; namespace rabbitMQ { public class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "192.168.0.169", UserName = "user", Password = "pass" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!" + DateTime.Now; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } |
接收端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
using System; using System.Collections.Generic; using System.Text; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace rabbitMQ { class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "192.168.0.169", UserName = "user", Password = "pass" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare( queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } |
在Windows上发送,在CentOS上接收,效果如图: 开启管理UI:
1 2 3 |
rabbitmq-plugins enable rabbitmq_management firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --reload |
在Windows下打开地址:
1 |
http://10.255.19.111:15672 |
用户名和密码都是 guest 这样就可以方便管理RabbitMQ了。 from:https://www.cnblogs.com/uptothesky/p/6094357.html
View Details用户角色分类 none:无法登录控制台 不能访问 management plugin,通常就是普通的生产者和消费者。 management:普通管理者。 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对policies进行管理。用户可以通过AMQP做的任何事外加: 列出自己可以通过AMQP登入的virtual hosts 查看自己的virtual hosts中的queues, exchanges 和 bindings 查看和关闭自己的channels 和 connections 查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。 policymaker:策略制定者。 management可以做的任何事外加: 查看、创建和删除自己的virtual hosts所属的policies和parameters monitoring:监控者。 management可以做的任何事外加: 列出所有virtual hosts,包括他们不能登录的virtual hosts 查看其他用户的connections和channels 查看节点级别的数据如clustering和memory使用情况 查看真正的关于所有virtual hosts的全局的统计信息 同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) administrator:超级管理员。 policymaker和monitoring可以做的任何事外加: 创建和删除virtual hosts 查看、创建和删除users 查看创建和删除permissions 关闭其他用户的connections 创建用户
1 2 3 4 |
rabbitmqctl add_user {用户名} {密码} // 设置权限 rabbitmqctl set_user_tags {用户名} {权限} |
例:创建一个超级用户
1 2 |
rabbitmqctl add_user admin1 admin1 rabbitmqctl set_user_tags admin1 administrator |
查看用户列表
1 |
rabbitmqctl list_users |
为用户赋权
1 2 3 4 5 6 7 8 9 10 |
// 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' // 查看权限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 // 清除权限 rabbitmqctl clear_permissions [-p VHostPath] User |
删除用户
1 |
rabbitmqctl delete_user Username |
修改用户的密码
1 |
rabbitmqctl change_password Username Newpassword |
from: https://www.cnblogs.com/xinxiucan/p/7940953.html
View Details安装脚本
1 2 3 4 5 6 7 8 9 10 |
yum install wget -y #Download RabbitMQ and Erlang wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-3.6.9-1.el6.noarch.rpm wget https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el6.x86_64.rpm #Install Erlang and RabbitMQ rpm -ivh erlang-19.0.4-1.el6.x86_64.rpm rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc yum install rabbitmq-server-3.6.9-1.el6.noarch.rpm -y |
发现错误
1 2 |
1. 安装依赖Requires: socat 2. 安装socat的时候提示需要tcp_wrappers |
解决方案
1 2 3 4 |
wget –no-cache http://www.convirture.com/repos/definitions/rhel/6.x/convirt.repo -O /etc/yum.repos.d/convirt.repo yum install socat -y 使用https://rpmfind.net/linux/rpm2html/search.php搜索tcp_wrappers,下载,rpm -ivh tcp_wrappers.version |
启动RabbitMQ
1 2 3 4 5 6 |
chkconfig rabbitmq-server on /sbin/service rabbitmq-server start # 修改密码 rabbitmqctl change_password guest welcome123 # 启动管理界面 rabbitmq-plugins enable rabbitmq_management |
使用过程错误小记 user ‘guest’ – User can only log in via localhost 登录响应如下:
1 |
{"error":"not_authorised","reason":"User can only log in via localhost"} |
我实在docker中创建rabbitmq的,然后将端口15672映射到宿主机器上的15672,所以也算是远程登录,而guest用户要求只能使用localhost登录,所以要想远程登录必须新建用户。
1 2 3 |
rabbitmqctl add_user {username} {password} # 例:创建admin用户 rabbitmqctl add_user admin 123456 |
新建用户无法登录
1 |
{"error":"not_authorised","reason":"Not management user"} |
需要授予用户角色
1 2 3 |
rabbitmqctl set_user_tags {username} {role} # 例 rabbitmqctl set_user_tags admin administrator |
用户授权
1 2 |
rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read} rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" |
关于RabbitMQ权限和角色管理 参考:http://blog.csdn.net/zyz511919766/article/details/42292655 from:https://www.dev-heaven.com/posts/1914.html
View Details上一篇讲了如何再控制台将RocketMQ跑起来,本篇讲解,在asp.net mvc种跑起来,含(发布、订阅)。 因篇幅过长,本次将不挨个贴源码,直接展示目录,根据上一篇文章,进行相应的调整即可。 1、新建一个类库,将MQ公共部分提出来(源码在入门篇1中有讲解和截图): 如: 此时需要注意的有两点: (1)在RocketMQClientManager中,尽量将IRocketMQManager做成单例,避免每发一次消息,就实例化一个对象造成资源浪费。 (2)经测试,在RocketMQClientManager中发送消息时,遇到并发可能会存在消息丢失问题,防止消息丢失,可以在发布的时候,进行加锁处理。 2、在asp.net mvc 项目中配置MQ信息(需要在App_Data中修改RocketMQ的相应配置,入门篇1中有讲解)。 3、在asp.net mvc Global种添加初始化RocketMQ和订阅代码(按Tag进行订阅,也可订阅多有的Tag): 注意:FarseerBootstrapper.Create().Initialize(); //必须注册MQ服务,否则运行会抛出,未注册MQ服务啥。 4、接下来可以在控制器调用RocketMQ,进行测试(发布消息的Tag为BootsShuTestTag,消费可以根据指定的Tag进行消费): 发布、订阅运行结果: 注意、注意、注意: 1.需要将项目统一编译成X64、并且将项目寄宿到IIS,里边即可正常运行。在VS里边运行,是跑不起来的! 2.作者不推荐在服务端进行订阅,如果在IIS上运行,则可以将订阅模块提出来写成windows服务。 大家根据自己的业务场景自行甄别及其使用。 以上如有不对的,可以拍砖,相互学习,共同进步。 欢迎加入.NET CORE/ASP.NET CORE 技术交流群,我们期待你的加入。 群号:702566187 from:https://blog.csdn.net/weixin_37207795/article/details/81023914
View Details下面我将为大家介绍一款支持.NET端的RocketMQ。大名为:Farseer.Net.MQ.RocketMQ。 支持.Net Framework 4.5和.NetStandard 2.0 1、先创建一个.Net Framework的控制台,热热手。 2、通过Nuget安装相应组件。 Install-Package Farseer.Net.MQ.RocketMQ -Version 2.0.1 Install-Package Farseer.Net.Configuration 3、将ONSClient4CPP放到项目根目录: 如: 3.1、 3.2、 4、新建启动模块:PushModule: 如: 5、在我们创建的控制台中,编写RocketMQ代码: 如: 6、将项目改为X64。 7、修改App_Data下的Farseer.Net.josn(注意:程序运行,会自动生成改json配置文件) 8、运行控制台,能跑起来了吧! 欢迎加入.NET CORE/ASP.NET CORE 技术交流群,我们期待你的加入。 群号:702566187 from:https://blog.csdn.net/weixin_37207795/article/details/81023757
View Details开篇 在 OLTP 系统领域,我们在很多业务场景下都会面临事务一致性方面的需求,例如最经典的 Bob 给 Smith 转账的案例。传统的企业开发,系统往往是以单体应用形式存在的,也没有横跨多个数据库。我们通常只需借助开发平台中特有数据访问技术和框架(例如 Spring、JDBC、ADO.NET),结合关系型数据库自带的事务管理机制来实现事务性的需求。关系型数据库通常具有 ACID 特性:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)。 而大型互联网平台往往是由一系列分布式系统构成的,开发语言平台和技术栈也相对比较杂,尤其是在 SOA 和微服务架构盛行的今天,一个看起来简单的功能,内部可能需要调用多个“服务”并操作多个数据库或分片来实现,情况往往会复杂很多。单一的技术手段和解决方案,已经无法应对和满足这些复杂的场景了。 分布式系统的特性 对分布式系统有过研究的读者,可能听说过“CAP 定律”、“Base 理论”等,非常巧的是,化学理论中 ACID 是酸、Base 恰好是碱。这里笔者不对这些概念做过多的解释,有兴趣的读者可以查看相关参考资料。CAP 定律如下图: 在分布式系统中,同时满足“CAP 定律”中的“一致性”、“可用性”和“分区容错性”三者是不可能的,这比现实中找对象需同时满足“高、富、帅”或“白、富、美”更加困难。在互联网领域的绝大多数的场景,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。 分布式事务 提到分布式系统,必然要提到分布式事务。要想理解分布式事务,不得不先介绍一下两阶段提交协议。先举个简单但不精准的例子来说明: 第一阶段,张老师作为“协调者”,给小强和小明(参与者、节点)发微信,组织他们俩明天 8 点在学校门口集合,一起去爬山,然后开始等待小强和小明答复。 第二阶段,如果小强和小明都回答没问题,那么大家如约而至。如果小强或者小明其中一人回答说“明天没空,不行”,那么张老师会立即通知小强和小明“爬山活动取消”。 细心的读者会发现,这个过程中可能有很多问题的。如果小强没看手机,那么张老师会一直等着答复,小明可能在家里把爬山装备都准备好了却一直等着张老师确认信息。更严重的是,如果到明天 8 点小强还没有答复,那么就算“超时”了,那小明到底去还是不去集合爬山呢? 这就是两阶段提交协议的弊病,所以后来业界又引入了三阶段提交协议来解决该类问题。 两阶段提交协议在主流开发语言平台,数据库产品中都有广泛应用和实现的,下面来介绍一下 XOpen 组织提供的 DTP 模型图: XA 协议指的是 TM(事务管理器)和 RM(资源管理器)之间的接口。目前主流的关系型数据库产品都是实现了 XA 接口的。JTA(Java Transaction API) 是符合 X/Open DTP 模型的,事务管理器和资源管理器之间也使用了 XA 协议。 本质上也是借助两阶段提交协议来实现分布式事务的,下面分别来看看 XA 事务成功和失败的模型图: 在 JavaEE 平台下,WebLogic、Webshare 等主流商用的应用服务器提供了 JTA 的实现和支持。而在 Tomcat 下是没有实现的(其实笔者并不认为 Tomcat 能算是 JavaEE 应用服务器),这就需要借助第三方的框架Jotm、Automikos 等来实现,两者均支持 spring 事务整合。 而在 Windows .NET 平台中,则可以借助 ado.net 中的TransactionScop API 来编程实现,还必须配置和借助 Windows 操作系统中的 MSDTC 服务。如果你的数据库使用的 mysql,并且 mysql 是部署在 Linux 平台上的,那么是无法支持分布式事务的。 由于篇幅关系,这里不展开,感兴趣的读者可以自行查阅相关资料并实践。 […]
View Details1. 消息队列的历史 了解一件事情的来龙去脉,将不会对它感到神秘。让我们来看看消息队列(Message Queue)这项技术的发展历史。 Message Queue的需求由来已久,80年代最早在金融交易中,高盛等公司采用Teknekron公司的产品,当时的Message queuing软件叫做:the information bus(TIB)。 TIB被电信和通讯公司采用,路透社收购了Teknekron公司。之后,IBM开发了MQSeries,微软开发了Microsoft Message Queue(MSMQ)。这些商业MQ供应商的问题是厂商锁定,价格高昂。2001年,Java Message queuing试图解决锁定和交互性的问题,但对应用来说反而更加麻烦了。 于是2004年,摩根大通和iMatrix开始着手Advanced Message Queuing Protocol (AMQP)开放标准的开发。2006年,AMQP规范发布。2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ 1.0 发布。 目前RabbitMQ的最新版本为3.5.7,基于AMQP 0-9-1。 RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发concurrent和distribution系统的一种语言,在电信领域使用广泛。OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件/库/工具,如mnesia/SASL,极大方便了Erlang应用的开发。OTP就类似于Python语言中众多的module,用户借助这些module可以很方便的开发应用。 2. AMQP messaging 中的基本概念 Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。 Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。 Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。 Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。 Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。 Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。 Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。 3. 典型的“生产/消费”消息模型 生产者发送消息到broker server(RabbitMQ)。在Broker内部,用户创建Exchange/Queue,通过Binding规则将两者联系在一起。Exchange分发消息,根据类型/binding的不同分发策略有区别。消息最后来到Queue中,等待消费者取走。 4. Exchange类型 Exchange有多种类型,最常用的是Direct/Fanout/Topic三种类型。 Direct Message中的“routing key”如果和Binding中的“binding key”一致, Direct exchange则将message发到对应的queue中。 Fanout 每个发到Fanout类型Exchange的message都会分到所有绑定的queue上去。 Topic 根据routing key,及通配规则,Topic exchange将分发到目标queue中。 Routing key中可以包含两种通配符,类似于正则表达式:
1 2 |
“<span class="hljs-comment">#”通配任何零个或多个word “*”通配任何单个<span class="hljs-built_in">word</span></span> |
这里也推荐给想要了解RabbitMQ的同学一个网站,http://tryrabbitmq.com ,它提供在线RabbitMQ 模拟器,可以帮助理解Exchange/queue/binding概念。 至此,我们对于消息队列的发展,RabbitMQ的产生,以及AMQP协议中的重要概念做了一个完整的介绍。 from:https://www.cnblogs.com/frankyou/p/5283539.html
View Details