laravel 5.4 改变了默认的数据库字符集,现在utf8mb4包括存储emojis支持。如果你运行MySQL v5.7.7或者更高版本,则不需要做任何事情。 当你试着在一些MariaDB或者一些老版本的的MySQL上运行 migrations 命令时,你可能会碰到下面这个错误:
1 2 3 4 5 |
[Illuminate\Database\QueryException] SQLSTATE[42000]: Syntax error or access violation: 1071 Specified key was too long; max key length is 767 bytes (SQL: alter table users add unique users_email_unique(email)) [PDOException] SQLSTATE[42000]: Syntax error or access violation: 1071 Specified key was too long; max key length is 767 bytes |
我们可以在 AppServiceProvider.php 文件里的 boot 方法里设置一个默认值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
<?php namespace App\Providers; use Illuminate\Support\ServiceProvider; use Illuminate\Support\Facades\Schema; class AppServiceProvider extends ServiceProvider { /** * Bootstrap any application services. * * @return void */ public function boot() { Schema::defaultStringLength(191); } /** * Register any application services. * * @return void */ public function register() { // } } |
from:https://www.cnblogs.com/betx/p/6544090.html
View Details在版本回退里,你已经知道,每次提交,Git都把它们串成一条时间线,这条时间线就是一个分支。截止到目前,只有一条时间线,在Git里,这个分支叫主分支,即master分支。HEAD严格来说不是指向提交,而是指向master,master才是指向提交的,所以,HEAD指向的就是当前分支。 一开始的时候,master分支是一条线,Git用master指向最新的提交,再用HEAD指向master,就能确定当前分支,以及当前分支的提交点: 每次提交,master分支都会向前移动一步,这样,随着你不断提交,master分支的线也越来越长。 当我们创建新的分支,例如dev时,Git新建了一个指针叫dev,指向master相同的提交,再把HEAD指向dev,就表示当前分支在dev上: 你看,Git创建一个分支很快,因为除了增加一个dev指针,改改HEAD的指向,工作区的文件都没有任何变化! 不过,从现在开始,对工作区的修改和提交就是针对dev分支了,比如新提交一次后,dev指针往前移动一步,而master指针不变: 假如我们在dev上的工作完成了,就可以把dev合并到master上。Git怎么合并呢?最简单的方法,就是直接把master指向dev的当前提交,就完成了合并: 所以Git合并分支也很快!就改改指针,工作区内容也不变! 合并完分支后,甚至可以删除dev分支。删除dev分支就是把dev指针给删掉,删掉后,我们就剩下了一条master分支: 真是太神奇了,你看得出来有些提交是通过分支完成的吗? 下面开始实战。 首先,我们创建dev分支,然后切换到dev分支:
1 2 |
$ git checkout -b dev Switched to a new branch 'dev' |
git checkout命令加上-b参数表示创建并切换,相当于以下两条命令:
1 2 3 |
$ git branch dev $ git checkout dev Switched to branch 'dev' |
然后,用git branch命令查看当前分支:
1 2 3 |
$ git branch * dev master |
git branch命令会列出所有分支,当前分支前面会标一个*号。 然后,我们就可以在dev分支上正常提交,比如对readme.txt做个修改,加上一行:
1 |
Creating a new branch is quick. |
然后提交:
1 2 3 4 |
$ git add readme.txt $ git commit -m "branch test" [dev fec145a] branch test 1 file changed, 1 insertion(+) |
现在,dev分支的工作完成,我们就可以切换回master分支:
1 2 |
$ git checkout master Switched to branch 'master' |
切换回master分支后,再查看一个readme.txt文件,刚才添加的内容不见了!因为那个提交是在dev分支上,而master分支此刻的提交点并没有变: 现在,我们把dev分支的工作成果合并到master分支上:
1 2 3 4 5 |
$ git merge dev Updating d17efd8..fec145a Fast-forward readme.txt | 1 + 1 file changed, 1 insertion(+) |
git merge命令用于合并指定分支到当前分支。合并后,再查看readme.txt的内容,就可以看到,和dev分支的最新提交是完全一样的。 注意到上面的Fast-forward信息,Git告诉我们,这次合并是“快进模式”,也就是直接把master指向dev的当前提交,所以合并速度非常快。 当然,也不是每次合并都能Fast-forward,我们后面会讲其他方式的合并。 合并完成后,就可以放心地删除dev分支了:
1 2 |
$ git branch -d dev Deleted branch dev (was fec145a). |
删除后,查看branch,就只剩下master分支了:
1 2 |
$ git branch * master |
因为创建、合并和删除分支非常快,所以Git鼓励你使用分支完成某个任务,合并后再删掉分支,这和直接在master分支上工作效果是一样的,但过程更安全。 小结 Git鼓励大量使用分支: 查看分支:git branch 创建分支:git branch <name> 切换分支:git checkout <name> 创建+切换分支:git checkout -b <name> 合并某分支到当前分支:git merge <name> 删除分支:git branch -d <name> from:https://www.liaoxuefeng.com/wiki/0013739516305929606dd18361248578c67b8067c8c017b000/001375840038939c291467cc7c747b1810aab2fb8863508000
View Details$ git add . 添加所有文件 注意有个 . $ git commit -m '注释' 提交本地 $ git push origin master提交给默认分支 $ git -rm 删除 $ git pull origin master 从默认分支下载 $ git branch -v 查看所有分支 $ git log --pretty=oneline 查看版本日志 $ git reset --hard HEAD^/HEAD^^/HEAD~100/2287b773d655 恢复指定的版本 $ git reflog 显示所有执行过的命令行 ——————————————————————————————-- 查看分支:git branch 创建分支:git branch <name> 切换分支:git checkout <name> 创建+切换分支:git checkout -b <name> 合并某分支到当前分支:git merge <name> 删除分支:git branch -d <name>
View Details1.SVN建立分支 正确SVN服务器上会有两个目录:trunk和branches。trunk目录下面代码就是所谓的主版本,而branches文件夹主要是用来放置分支版本。分支版本是依赖于主版本的,因此建立分支版本时候,必须要在trunk文件夹操作才可以的。下面是推荐的操作步骤: (1)从SVN上checkout Trunk版本下来。如果在本地机器上已有了trunk版本,需要更新一下到最新代码。 (2)在本地抓下来的trunk版本的文件上,点击右键,弹出菜单,选中如下菜单项。 弹出如下对话框,按在下面的要求,填写好配置。 点击OK按钮,SVN就会建立好对应的分支版本。 2.从SVN抓取分支代码 (1)在本地机,建立新目录,从SVN分支版本路径上,抓取分支版本上的代码 (2)在F:\branchesh_v20160726中可以对代码进行正常地更新和提交操作。 2.1删除不想要的分支版本 (1)在本地机的SVN目录,右键,选择TortoiseSVN/Repo-broswer,进入版本库浏览(Repo-browser),在右侧树状窗口中的branches下,选择你要删除的分支版本,右键弹出菜单,点击“删除”即可。分支就能从代码库中删除。 3.从分支版本合并代码到trunk主版本中去 如果在分支版本新功能开发完成,需要将分支上的修改合并到主版本中,按下面步骤操作: (1)在合并代码之前,我们首先要更新分支代码,以防止别人提交了代码而你没有及时更新。造成别人的代码丢失。 (2)接下来我们要做的是将要有修改的代码提交。以保证其他人员的代码为最新状态。 (3)我们在分支代码合并到主干之前,先将主干代码更新一下。以保证别的人员合并之后的代码能及时更新到你本地。步骤同步骤(1)。 (4)主干代码更新完成之后,接下来我们打开分支代码的日志 (5)找到你要合并到主干的代码日志。然后右键打开菜单,选择Merge revision to ,然后选择你的主干目录确定即可 from:https://blog.csdn.net/luofeixiongsix/article/details/52052631
View Details现在,你已经学会了修改文件,然后把修改提交到Git版本库,现在,再练习一次,修改readme.txt文件如下:
1 2 |
Git is a distributed version control system. Git is free software distributed under the GPL. |
然后尝试提交:
1 2 3 4 |
$ git add readme.txt $ git commit -m "append GPL" [master 3628164] append GPL 1 file changed, 1 insertion(+), 1 deletion(-) |
像这样,你不断对文件进行修改,然后不断提交修改到版本库里,就好比玩RPG游戏时,每通过一关就会自动把游戏状态存盘,如果某一关没过去,你还可以选择读取前一关的状态。有些时候,在打Boss之前,你会手动存盘,以便万一打Boss失败了,可以从最近的地方重新开始。Git也是一样,每当你觉得文件修改到一定程度的时候,就可以“保存一个快照”,这个快照在Git中被称为commit。一旦你把文件改乱了,或者误删了文件,还可以从最近的一个commit恢复,然后继续工作,而不是把几个月的工作成果全部丢失。 现在,我们回顾一下readme.txt文件一共有几个版本被提交到Git仓库里了: 版本1:wrote a readme file
1 2 |
Git is a version control system. Git is free software. |
版本2:add distributed
1 2 |
Git is a distributed version control system. Git is free software. |
版本3:append GPL
1 2 |
Git is a distributed version control system. Git is free software distributed under the GPL. |
当然了,在实际工作中,我们脑子里怎么可能记得一个几千行的文件每次都改了什么内容,不然要版本控制系统干什么。版本控制系统肯定有某个命令可以告诉我们历史记录,在Git中,我们用git log命令查看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
$ git log commit 3628164fb26d48395383f8f31179f24e0882e1e0 Author: Michael Liao <askxuefeng@gmail.com> Date: Tue Aug 20 15:11:49 2013 +0800 append GPL commit ea34578d5496d7dd233c827ed32a8cd576c5ee85 Author: Michael Liao <askxuefeng@gmail.com> Date: Tue Aug 20 14:53:12 2013 +0800 add distributed commit cb926e7ea50ad11b8f9e909c05226233bf755030 Author: Michael Liao <askxuefeng@gmail.com> Date: Mon Aug 19 17:51:55 2013 +0800 wrote a readme file |
git log命令显示从最近到最远的提交日志,我们可以看到3次提交,最近的一次是append GPL,上一次是add distributed,最早的一次是wrote a readme file。 如果嫌输出信息太多,看得眼花缭乱的,可以试试加上--pretty=oneline参数:
1 2 3 4 |
$ git log --pretty=oneline 3628164fb26d48395383f8f31179f24e0882e1e0 append GPL ea34578d5496d7dd233c827ed32a8cd576c5ee85 add distributed cb926e7ea50ad11b8f9e909c05226233bf755030 wrote a readme file |
需要友情提示的是,你看到的一大串类似3628164…882e1e0的是commit id(版本号),和SVN不一样,Git的commit id不是1,2,3……递增的数字,而是一个SHA1计算出来的一个非常大的数字,用十六进制表示,而且你看到的commit id和我的肯定不一样,以你自己的为准。为什么commit id需要用这么一大串数字表示呢?因为Git是分布式的版本控制系统,后面我们还要研究多人在同一个版本库里工作,如果大家都用1,2,3……作为版本号,那肯定就冲突了。 每提交一个新版本,实际上Git就会把它们自动串成一条时间线。如果使用可视化工具查看Git历史,就可以更清楚地看到提交历史的时间线: 好了,现在我们启动时光穿梭机,准备把readme.txt回退到上一个版本,也就是“add distributed”的那个版本,怎么做呢? 首先,Git必须知道当前版本是哪个版本,在Git中,用HEAD表示当前版本,也就是最新的提交3628164…882e1e0(注意我的提交ID和你的肯定不一样),上一个版本就是HEAD^,上上一个版本就是HEAD^^,当然往上100个版本写100个^比较容易数不过来,所以写成HEAD~100。 现在,我们要把当前版本“append GPL”回退到上一个版本“add distributed”,就可以使用git reset命令:
1 2 |
$ git reset --hard HEAD^ HEAD is now at ea34578 add distributed |
--hard参数有啥意义?这个后面再讲,现在你先放心使用。 看看readme.txt的内容是不是版本add distributed:
1 2 3 |
$ cat readme.txt Git is a distributed version control system. Git is free software. |
果然。 还可以继续回退到上一个版本wrote a readme file,不过且慢,然我们用git log再看看现在版本库的状态:
1 2 3 4 5 6 7 8 9 10 11 12 |
$ git log commit ea34578d5496d7dd233c827ed32a8cd576c5ee85 Author: Michael Liao <askxuefeng@gmail.com> Date: Tue Aug 20 14:53:12 2013 +0800 add distributed commit cb926e7ea50ad11b8f9e909c05226233bf755030 Author: Michael Liao <askxuefeng@gmail.com> Date: Mon Aug 19 17:51:55 2013 +0800 wrote a readme file |
最新的那个版本append GPL已经看不到了!好比你从21世纪坐时光穿梭机来到了19世纪,想再回去已经回不去了,肿么办? 办法其实还是有的,只要上面的命令行窗口还没有被关掉,你就可以顺着往上找啊找啊,找到那个append GPL的commit id是3628164…,于是就可以指定回到未来的某个版本:
1 2 |
$ git reset --hard 3628164 HEAD is now at 3628164 append GPL |
版本号没必要写全,前几位就可以了,Git会自动去找。当然也不能只写前一两位,因为Git可能会找到多个版本号,就无法确定是哪一个了。 再小心翼翼地看看readme.txt的内容:
1 2 3 |
$ cat readme.txt Git is a distributed version control system. Git is free software distributed under the GPL. |
果然,我胡汉三又回来了。 Git的版本回退速度非常快,因为Git在内部有个指向当前版本的HEAD指针,当你回退版本的时候,Git仅仅是把HEAD从指向append GPL: 改为指向add distributed: 然后顺便把工作区的文件更新了。所以你让HEAD指向哪个版本号,你就把当前版本定位在哪。 现在,你回退到了某个版本,关掉了电脑,第二天早上就后悔了,想恢复到新版本怎么办?找不到新版本的commit id怎么办? 在Git中,总是有后悔药可以吃的。当你用$ git reset --hard HEAD^回退到add […]
View Details简述 适用特性 使用Dapper流程 代码示例 简述 Dapper是一个轻量级的ORM工具:ORM框架的核心思想是对象关系映射,ORM是将表与表之间的操作,映射成对象和对象之间的操作,就是通过操作实体类来达到操作表的目的。从数据库提取的数据会自动按你设置的映射要求封装成特定的对象。之后你就可以通过对对象进行操作来修改数据库中的数据。这时候你面对的不是信息的碎片,而是一个形象鲜明的对象。 ORM 框架很多: Dapper、 Mybatis.Net、EntityFramework 和 NHibernate。如果你在小的项目中,使用Entity Framework、NHibernate 来处理大数据访问及关系映射,未免有点杀鸡用牛刀。而Mybatis.Net需要配置XML文件,综合考虑你会觉得觉得ORM省时省力。 适用特性 1、Dapper是一个轻型的ORM类。代码就一个SqlMapper.cs文件,编译后就70K。 2、Dapper很快。Dapper的速度接近与IDataReader,取列表的数据超过了DataTable。 3、Dapper支持多数据库。Dapper支持Mysql,SqlLite,SqlServer,Oracle等一系列的数据库。 4、Dapper支持多表并联的对象。支持一对多 多对多的关系。并且没侵入性,想用就用,不想用就不用。无XML无属性。代码以前怎么写现在还怎么写。 5、Dapper原理通过Emit反射IDataReader的序列队列,来快速的得到和产生对象。性能很高(性能和原生ado.net相近)。 6、Dapper支持net2.0,3.0,3.5,4.0。【如果想在Net2.0下使用,可以去网上找一下Net2.0下如何配置运行Net3.5即可。】 7、Dapper语法十分简单,快捷添加到项目,容易上手。并且无须迁就数据库的设计。 适用Dapper流程 流程一: 通过NuGet程序包进行Dapper安装引用: 这个引用的Dapper DLL文件是对前面说的SqlMapper.cs 源文件的封装。 流程二: 我们这里用Mysql数据库,那么用到的MySql.Data.dll在官网下载.net驱动包(已经附加在此教程demo中) mysql .net连接驱动下载地址:http://dev.mysql.com/downloads/connector/net/ 示例代码 1、定义一个Person类对应数据库的Person表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
CREATE TABLE person ( idint(11) NOT NULL AUTO_INCREMENT, usernamevarchar(255) NOT NULL, passwordvarchar(20) NOT NULL, ageint(11) DEFAULT NULL, registerDatedatetime DEFAULT NULL, addressvarchar(255) DEFAULT NULL, PRIMARY KEY (id) ); //Person类 publicclassPerson { publicint id { get; set; } publicstring username { get; set; } publicstring password { get; set; } publicint age { get; set; } publicDateTimeregisterDate { get; set; } publicstring address { set; get; } } |
2、定义连接数据库字符串
1 2 3 4 |
public static string ConnString = "server=localhost;port=3306;user id=root;password=123456;database=test"; //SQL Server public static string ConnString = "server=localhost;user id=sa;password=123456;database=Dormitory"; |
3、 新增、修改、删除的 Execute 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public static int Execute(this IDbConnection cnn, string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null); //新增 var conn = MySqlDBHelper.CreateMySqlConnection(); try { var result = conn.Execute("Insert into Person(username,password,age,registerDate,address) values (@username,@password,@age,@registerDate,@address)", new { username = txtUserName.Text, password = txtPassword.Text, age = numAge.Value, registerDate = dptRegisterDate.Value, address = txtAddress.Text }); if (result > 0) { MessageBox.Show("添加成功!"); LoadDBInfoToDgvFirst(); } } catch (Exception ex) { throw ex; } finally { if (conn.State == System.Data.ConnectionState.Open) { conn.Close(); } } |
批量新增 和新增一条语句是一样的,后面的param 是object 对象,单个模型数据和集合都可以支持。执行上面方法会插入多条记录,这样sql可以灵活的控制,参数不用像ADO.Net那样声明每个参数,最后还要把参数集合赋值给ADO的命令。可以看出这样简洁多了。 4、 查询的Query 方法
1 2 3 |
public static IEnumerable<T> Query<T>(this IDbConnection cnn, string sql, object param = null, IDbTransaction transaction = null, bool buffered = true, int? commandTimeout = null, CommandType? commandType = null); 实际运用: var result = conn.Query<Person>("select * from Person order by id desc limit @indx", new { indx = num }); |
5、存储过程调用
1 2 3 4 5 6 7 8 |
var param = new DynamicParameters(); param.Add("@idIn", numUpdateId.Value, DbType.Int32, ParameterDirection.Input); param.Add("@Inusername", txtUserName.Text, DbType.String, ParameterDirection.Input); param.Add("@res", 0, DbType.Int32, ParameterDirection.Output); var res2 = conn.Execute("prUpdatePersion", param, null, null, CommandType.StoredProcedure); int res = param.Get<Int32>("@res"); |
6、多个数据库插入数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
private void button4_Click(object sender, EventArgs e) { MySqlConnection mySqlConn = MySqlDBHelper.CreateMySqlConnection(); MySqlTransaction tranMySql = mySqlConn.BeginTransaction(); SqlConnection SqlServerConn = SqlServerDBHelper.CreateSqlServerConnection(); SqlTransaction tranSqlServer = SqlServerConn.BeginTransaction(); try { var persion = new Person { username = txtUserName.Text, password = txtPassword.Text, age = numAge.Value, registerDate = dptRegisterDate.Value, address = txtAddress.Text }; int result = mySqlConn.Execute("Insert into Person(username,password,age,registerDate,address) values (@username,@password,@age,@registerDate,@address)", persion, tranMySql); persion.username += "Sqlserver"; result = SqlServerConn.Execute("Insert into Person(username,password,age,registerDate,address) values (@username,@password,@age,@registerDate,@address)", persion, tranSqlServer); MessageBox.Show("添加成功!"); tranMySql.Commit(); tranSqlServer.Commit(); } catch (Exception ex) { tranMySql.Rollback(); tranSqlServer.Rollback(); throw ex; } finally { if (mySqlConn.State == System.Data.ConnectionState.Open) { mySqlConn.Close(); } if (SqlServerConn.State == System.Data.ConnectionState.Open) { SqlServerConn.Close(); } tranSqlServer.Dispose(); tranMySql.Dispose(); } tabControl.SelectedTab = tabPage2; //加载信息 LoadDBInfoToDoubleDgvFirst(); } |
通过上面的代码演示主要常用的是:封装后 Query 方法、Excute 方法。(其底层的封装的方法可以见源码:513 from:https://blog.csdn.net/laokang426/article/details/77885137
View Details项目右键 Manage NuGet Packages for Solution 搜索Dapper -> Install
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
using System; using System.Collections.Generic; using System.Configuration; using System.Data; using System.Data.SqlClient; using System.Linq; namespace Dapper.Repository { public class DapperDemo { public static string ConnectionString { get { string _connectionString = ConfigurationManager.ConnectionStrings["DefaultConnection"].ToString(); return _connectionString; } } public SqlConnection OpenConnection() { SqlConnection connection = new SqlConnection(ConnectionString); connection.Open(); return connection; } /// <summary> /// 添加 /// </summary> /// <returns></returns> public bool Add() { int row = 0; ED_Data model = new ED_Data(); model.TableName = "123"; model.DataKey = "123"; model.FieldName = "123"; model.Value = "123"; model.Reference = "123"; model.Branch = 1; model.InActive = false; model.Updated = DateTime.Now; string query = "INSERT INTO ED_Data(TableName,DataKey,FieldName,Value,Reference,Branch,InActive,Updated) VALUES (@TableName,@DataKey,@FieldName,@Value,@Reference,@Branch,@InActive,@Updated)"; using (IDbConnection conn = OpenConnection()) { row = conn.Execute(query, model); } if (row > 0) return true; else return false; } /// <summary> /// 修改 /// </summary> /// <returns></returns> public int Update() { int row = 0; ED_Data model = new ED_Data(); model.TableName = "123"; model.DataKey = "123"; model.Updated = DateTime.Now; using (IDbConnection conn = OpenConnection()) { const string query = "UPDATE ED_Data SET DataKey=@DataKey,Updated=@Updated WHERE TableName=@TableName"; row = conn.Execute(query, model); } return row; } /// <summary> /// 删除 /// </summary> /// <returns></returns> public int Delete() { int row = 0; ED_Data model = new ED_Data(); model.TableName = "123"; using (IDbConnection conn = OpenConnection()) { const string query = "DELETE FROM ED_Data WHERE TableName=@TableName"; row = conn.Execute(query, model); } return row; } /// <summary> /// 查询一条数据 /// </summary> /// <param name="columnCatId"></param> /// <returns>ED_Data</returns> public ED_Data GetModel(string TableName) { using (IDbConnection conn = OpenConnection()) { const string query = "SELECT * FROM ED_Data WHERE TableName = @TableName"; return conn.Query<ED_Data>(query, new { tableName = TableName }).SingleOrDefault<ED_Data>(); } } /// <summary> /// 查询list集合 /// </summary> /// <returns>List</returns> public List<ED_Data> GetED_DataList() { using (IDbConnection conn = OpenConnection()) { const string query = "SELECT * FROM ED_Data"; return conn.Query<ED_Data>(query, null).ToList(); } } /// <summary> /// 事务处理 /// 删除 /// </summary> /// <param name="cat"></param> /// <returns></returns> public int DeleteColumnCatAndColumn(ED_Data cat) { try { using (IDbConnection conn = OpenConnection()) { string delete1 = "DELETE FROM ED_Data WHERE TableName=@TableName"; string delete2 = "DELETE FROM ED_Data WHERE TableName=@TableName"; IDbTransaction transaction = conn.BeginTransaction(); int row = conn.Execute(delete1, new { TableName = cat.TableName }, transaction, null, null); row += conn.Execute(delete2, new { TableName = cat.TableName }, transaction, null, null); transaction.Commit(); return row; } } catch (Exception) { throw; } } /// <summary> /// 执行存储过程 /// </summary> public void ExecuteStoredProcedure() { try { DynamicParameters para = new DynamicParameters(); para.Add("@param1", 1); para.Add("@param2", 2); using (IDbConnection conn = OpenConnection()) { int row = conn.Execute("存储过程名称", para, null, null, CommandType.StoredProcedure); } } catch (Exception) { throw; } } /// <summary> /// 批量添加 /// </summary> public void InsertBatch() { try { string sqlStr = "INSERT INTO ED_Data(TableName,DataKey,FieldName,Value,Reference,Branch,InActive,Updated) VALUES (@TableName,@DataKey,@FieldName,@Value,@Reference,@Branch,@InActive,@Updated)"; using (IDbConnection conn = OpenConnection()) { conn.Execute(sqlStr, new[] { new { TableName = "user1", DataKey = "a", FieldName = "name", Value = "000001", Reference = "", Branch = "", InActive = "", Updated = DateTime.Now }, new { TableName = "user2", DataKey = "b", FieldName = "age", Value = "000002", Reference = "", Branch= "", InActive = "", Updated = DateTime.Now }, new { TableName = "user3", DataKey = "c", FieldName = "phone", Value = "000003", Reference= "", Branch= "", InActive= "", Updated= DateTime.Now }, }, null, null, null); } } catch (Exception) { throw; } } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
using System; using System.Collections.Generic; using System.Configuration; using System.Data; using System.Data.SqlClient; using System.Linq; namespace Dapper.Repository { public class ED_Data { public string TableName { get; set; } public string DataKey { get; set; } public string FieldName { get; set; } public string Value { get; set; } public string Reference { get; set; } public int Branch { get; set; } public bool InActive { get; set; } public DateTime Updated { get; set; } } } |
Base基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 |
using Dapper.CoreLibrary; using Dapper.Entity; using System; using System.Collections.Generic; using System.Data; using System.Linq; using System.Reflection; using System.Text; using System.Transactions; namespace Dapper.Repository { public abstract class RepositoryBase<T> : DbConnectionFactory, IRepositoryBase<T> where T : IEntityBase<T> { public RepositoryBase(IDbConnection db) : base(db) { } /// <summary> /// /// </summary> /// <param name="model"></param> /// <returns></returns> public virtual int Add(T model) { int result = 0; try { var ps = model.GetType().GetProperties(); List<string> @colms = new List<string>(); List<string> @params = new List<string>(); foreach (var p in ps) { if (!p.CustomAttributes.Any(x => x.AttributeType == typeof(PrimaryKeyAttribute)) && !p.CustomAttributes.Any(x => x.AttributeType == typeof(DBIgnoreAttribute))) { @colms.Add(string.Format("[{0}]", p.Name)); @params.Add(string.Format("@{0}", p.Name)); } } var sql = string.Format("INSERT INTO [{0}] ({1}) VALUES({2}); SELECT @@IDENTITY;", typeof(T).Name, string.Join(", ", @colms), string.Join(", ", @params)); result = _conn.ExecuteScalar<int>(sql, model); } catch (Exception ex) { throw; } return result; } /// <summary> /// /// </summary> /// <param name="listModel"></param> public virtual void Add(List<T> listModel) { try { using (var scope = new TransactionScope()) { listModel.ForEach(model => { var ps = model.GetType().GetProperties(); List<string> @colms = new List<string>(); List<string> @params = new List<string>(); foreach (var p in ps) { if (!p.CustomAttributes.Any(x => x.AttributeType == typeof(PrimaryKeyAttribute)) && !p.CustomAttributes.Any(x => x.AttributeType == typeof(DBIgnoreAttribute))) { @colms.Add(string.Format("[{0}]", p.Name)); @params.Add(string.Format("@{0}", p.Name)); } } var sql = string.Format("INSERT INTO [{0}] ({1}) VALUES({2}); SELECT @@IDENTITY;", typeof(T).Name, string.Join(", ", @colms), string.Join(", ", @params)); _conn.ExecuteScalar<int>(sql, model); }); scope.Complete(); } } catch (Exception ex) { } } /// <summary> /// /// </summary> /// <param name="model"></param> /// <returns></returns> public virtual int AddWithGuid(T model) { int result = 0; try { var ps = model.GetType().GetProperties(); List<string> @colms = new List<string>(); List<string> @params = new List<string>(); foreach (var p in ps) { if (!p.CustomAttributes.Any(x => x.AttributeType == typeof(DBIgnoreAttribute))) { @colms.Add(string.Format("[{0}]", p.Name)); @params.Add(string.Format("@{0}", p.Name)); } } var sql = string.Format("INSERT INTO [{0}] ({1}) VALUES({2});", typeof(T).Name, string.Join(", ", @colms), string.Join(", ", @params)); result = _conn.Execute(sql, model); } catch (Exception ex) { throw; } return result; } /// <summary> /// /// </summary> /// <param name="model"></param> public virtual void Update(T model) { PropertyInfo pkInfo = null; var ps = model.GetType().GetProperties(); List<string> @params = new List<string>(); foreach (var p in ps) { if (p.CustomAttributes.Any(x => x.AttributeType == typeof(DBIgnoreAttribute))) { continue; } if (p.CustomAttributes.Any(x => x.AttributeType == typeof(PrimaryKeyAttribute))) { pkInfo = p; } else { @params.Add(string.Format("[{0}]=@{0}", p.Name)); } } var sql = string.Format("UPDATE [{0}] SET {1} WHERE [{2}] = @{2}", typeof(T).Name, string.Join(", ", @params), pkInfo.Name); _conn.Execute(sql, model); } /// <summary> /// /// </summary> /// <param name="listModel"></param> public virtual void Update(List<T> listModel) { using (var scope = new TransactionScope()) { listModel.ForEach(model => { PropertyInfo pkInfo = null; var ps = model.GetType().GetProperties(); List<string> @params = new List<string>(); foreach (var p in ps) { if (p.CustomAttributes.Any(x => x.AttributeType == typeof(DBIgnoreAttribute))) { continue; } if (p.CustomAttributes.Any(x => x.AttributeType == typeof(PrimaryKeyAttribute))) { pkInfo = p; } else { @params.Add(string.Format("[{0}] = @{0}", p.Name)); } } var sql = string.Format("UPDATE [{0}] SET {1} WHERE [{2}] = @{2}", typeof(T).Name, string.Join(", ", @params), pkInfo.Name); _conn.Execute(sql, model); }); scope.Complete(); } } /// <summary> /// /// </summary> /// <param name="primaryValue">主键ID</param> /// <param name="tableName">表名</param> /// <returns></returns> public virtual T GetModel(string primaryValue, string tableName = "") { try { string primaryWhere = string.Empty; var ps = typeof(T).GetProperties(); if (string.IsNullOrEmpty(tableName)) { tableName = typeof(T).Name; } var primary = ps.Single(p => p.CustomAttributes.FirstOrDefault(c => c.AttributeType == typeof(PrimaryKeyAttribute)) != null); primaryWhere = (string.Format("[{0}] = @primarykey", primary.Name)); var sql = string.Format("SELECT * FROM [{0}] WHERE {1}", tableName, primaryWhere); return _conn.Query<T>(sql, new { primarykey = primaryValue }).FirstOrDefault(); } catch (Exception) { throw; } } /// <summary> /// /// </summary> /// <param name="strWhere">where条件</param> /// <param name="tableName">表名</param> /// <returns></returns> public virtual T GetModelQuery(string strWhere, string tableName = "") { try { var sql = string.Format("SELECT * FROM [{0}] WHERE {1}", tableName, strWhere); return _conn.Query<T>(sql, new { where = strWhere }).FirstOrDefault(); } catch (Exception) { throw; } } /// <summary> /// 根据主键删除 /// </summary> /// <param name="primaryValue"></param> public virtual void Delete(string primaryValue) { try { string primaryWhere = string.Empty; var ps = typeof(T).GetProperties(); var primary = ps.Single(p => p.CustomAttributes.FirstOrDefault(c => c.AttributeType == typeof(PrimaryKeyAttribute)) != null); var sql = string.Format("DELETE FROM [{0}] WHERE {1} = @primarykey", typeof(T).Name, primary.Name); _conn.Execute(sql, new { primarykey = primaryValue }); } catch (Exception) { throw; } } /// <summary> /// /// </summary> /// <param name="strWhere"></param> public void DeleteStrWhere(string strWhere) { try { var sql = string.Format("DELETE FROM [{0}] WHERE {1}", typeof(T).Name, strWhere); _conn.Execute(sql); } catch (Exception) { throw; } } /// <summary> /// /// </summary> /// <param name="strWhere"></param> /// <param name="tableName"></param> /// <returns></returns> public virtual List<T> GetList(string strWhere, string tableName = "") { try { if (string.IsNullOrEmpty(tableName)) tableName = typeof(T).Name; var sql = string.Format("SELECT * FROM [{0}] " + (strWhere == "" ? "" : " WHERE " + " {1} "), tableName, strWhere); return _conn.Query<T>(sql).ToList(); } catch (Exception) { throw; } } /// <summary> /// /// </summary> /// <param name="param"></param> /// <returns></returns> public virtual PagerListResult<List<T>> GetPageList(PagerRequestParam param) { PagerListResult<List<T>> result = null; List<T> list = new List<T>(); int pageTotal = 1; int recordTotal = 0; int startIndex = 1; int endIndex = param.PageSize; try { if (param.PageIndex - 1 > 0) { startIndex = (param.PageIndex - 1 <= 0 ? 1 : param.PageIndex - 1) * param.PageSize + 1; endIndex = param.PageIndex * param.PageSize; } if (string.IsNullOrEmpty(param.TableName)) param.TableName = typeof(T).Name; StringBuilder strSql = new StringBuilder(); strSql.Append("SELECT * FROM ( "); strSql.Append(" SELECT ROW_NUMBER() OVER ("); if (!string.IsNullOrEmpty(param.OrderBy)) { strSql.Append("ORDER BY T." + param.OrderBy); } else { strSql.Append("ORDER BY T.ID DESC"); } strSql.Append(")AS Row, T.* FROM " + param.TableName + " T "); if (!string.IsNullOrEmpty(param.StrWhere)) { strSql.Append(" WHERE " + param.StrWhere); } strSql.Append(" ) TT"); strSql.AppendFormat(" WHERE TT.Row BETWEEN {0} AND {1}", startIndex, endIndex); list = _conn.Query<T>(strSql.ToString(), param.StrWhere).ToList(); if (list.Count > 0) { recordTotal = this.GetRecordCount(param.StrWhere, param.TableName); pageTotal = PagerRequestParam.Tool.PageTotal(param); } result = new PagerListResult<List<T>>(list, pageTotal, recordTotal); } catch (Exception ex) { result = new PagerListResult<List<T>>(ex); } return result; } /// <summary> /// 事务处理 /// Demo /// </summary> /// <returns></returns> public int DeleteTransaction() { try { const string delete1 = "DELETE FROM ED_Data WHERE TableName=@TableName"; const string delete2 = "DELETE FROM ED_Data WHERE TableName=@TableName"; IDbTransaction transaction = _conn.BeginTransaction(); int row = _conn.Execute(delete1, new { TableName = "user" }, transaction, null, null); row += _conn.Execute(delete2, new { TableName = "customer" }, transaction, null, null); transaction.Commit(); return row; } catch (Exception) { throw; } } /// <summary> /// 获取记录数 /// </summary> /// <param name="strWhere">Where条件</param> /// <returns></returns> public virtual int GetRecordCount(string strWhere, string tableName = "") { int count = 0; try { if (string.IsNullOrEmpty(tableName)) tableName = typeof(T).Name; StringBuilder strSql = new StringBuilder(); strSql.Append("SELECT COUNT(1) FROM " + tableName); if (!string.IsNullOrEmpty(strWhere)) { strSql.Append(" WHERE " + strWhere); } count = _conn.ExecuteScalar<int>(strSql.ToString()); } catch (Exception) { throw; } return count; } } } |
from:https://blog.csdn.net/kingcruel/article/details/52848950
View DetailsDDD 领域驱动设计 …… 是一种设计思想 CQRS 命令查询职责分离 …… 一种架构,其实就是程序层面的读写分离 IoC 控制反转 …… 一种设计模式,主要用到了反射技术 DI 依赖注入 …… IoC实现的过程就是DI
View DetailsCQRS架构简介 前不久,看到博客园一位园友写了一篇文章,其中的观点是,要想高性能,需要尽量:避开网络开销(IO),避开海量数据,避开资源争夺。对于这3点,我觉得很有道理。所以也想谈一下,CQRS架构下是如何实现高性能的。 关于CQRS(Command Query Responsibility Segration)架构,大家应该不会陌生了。简单的说,就是一个系统,从架构上把它拆分为两部分:命令处理(写请求)+查询处理(读请求)。然后读写两边可以用不同的架构实现,以实现CQ两端(即Command Side,简称C端;Query Side,简称Q端)的分别优化。CQRS作为一个读写分离思想的架构,在数据存储方面,没有做过多的约束。所以,我觉得CQRS可以有不同层次的实现,比如: CQ两端数据库共享,CQ两端只是在上层代码上分离;这种做法,带来的好处是可以让我们的代码读写分离,更好维护,且没有CQ两端的数据一致性问题,因为是共享一个数据库的。我个人认为,这种架构很实用,既兼顾了数据的强一致性,又能让代码好维护。 CQ两端数据库和上层代码都分离,然后Q的数据由C端同步过来,一般是通过Domain Event进行同步。同步方式有两种,同步或异步,如果需要CQ两端的强一致性,则需要用同步;如果能接受CQ两端数据的最终一致性,则可以使用异步。采用这种方式的架构,个人觉得,C端应该采用Event Sourcing(简称ES)模式才有意义,否则就是自己给自己找麻烦。因为这样做你会发现会出现冗余数据,同样的数据,在C端的db中有,而在Q端的db中也有。和上面第一种做法相比,我想不到什么好处。而采用ES,则所有C端的最新数据全部用Domain Event表达即可;而要查询显示用的数据,则从Q端的ReadDB(关系型数据库)查询即可。 我觉得要实现高性能,可以谈的东西还有很多。下面我想重点说说我想到的一些设计思路: 避开资源争夺 秒杀活动的例子分析 我觉得这是很重要的一点。什么是资源争夺?我想就是多个线程同时修改同一个数据。就像阿里秒杀活动一样,秒杀开抢时,很多人同时抢一个商品,导致商品的库存会被并发更新减库存,这就是一个资源争夺的例子。一般如果资源竞争不激烈,那无所谓,不会影响性能;但是如果像秒杀这种场景,那db就会抗不住了。在秒杀这种场景下,大量线程需要同时更新同一条记录,进而导致MySQL内部大量线程堆积,对服务性能、稳定性造成很大伤害。那怎么办呢?我记得阿里的丁奇写过一个分享,思路就是当MySQL的服务端多个线程同时修改一条记录时,可以对这些修改请求进行排队,然后对于InnoDB引擎层,就是串行的。这样排队后,不管上层应用发过来多少并行的修改同一行的请求,对于MySQL Server端来说,内部总是会聪明的对同一行的修改请求都排队处理;这样就能确保不会有并发产生,从而不会导致线程浪费堆积,导致数据库性能下降。这个方案可以见下图所示: 如上图所示,当很多请求都要修改A记录时,MySQL Server内部会对这些请求进行排队,然后一个个将对A的修改请求提交到InnoDB引擎层。这样看似在排队,实际上会确保MySQL Server不会死掉,可以保证对外提供稳定的TPS。 但是,对于商品秒杀这个场景,还有优化的空间,就是Group Commit技术。Group Commit就是对多个请求合并为一次操作进行处理。秒杀时,大家都在购买这个商品,A买2件,B买3件,C买1件;其实我们可以把A,B,C的这三个请求合并为一次减库存操作,就是一次性减6件。这样,对于A,B,C的这三个请求,在InnoDB层我们只需要做一次减库存操作即可。假设我们Group Commit的每一批的size是50,那就是可以将50个减操作合并为一次减操作,然后提交到InnoDB。这样,将大大提高秒杀场景下,商品减库存的TPS。但是这个Group Commit的每批大小不是越大越好,而是要根据并发量以及服务器的实际情况做测试来得到一个最优的值。通过Group Commit技术,根据丁奇的PPT,商品减库存的TPS性能从原来的1.5W提高到了8.5W。 从上面这个例子,我们可以看到阿里是如何在实际场景中,通过优化MySQL Server来实现高并发的商品减库存的。但是,这个技术一般人还真的不会!因为没多少人有能力去优化MySQL的服务端,排队也不行,更别说Group Commit了。这个功能并不是MySQL Server自带的,而是需要自己实现的。但是,这个思路我想我们都可以借鉴。 CQRS如何实现避免资源竞争 那么对于CQRS架构,如何按照这个思路来设计呢?我想重点说一下我上面提到的第二种CQRS架构。对于C端,我们的目标是尽可能的在1s内处理更多的Command,也就是数据写请求。在经典DDD的四层架构中,我们会有一个模式叫工作单元模式,即Unit of Work(简称UoW)模式。通过该模式,我们能在应用层,一次性以事务的方式将当前请求所涉及的多个对象的修改提交到DB。微软的EF实体框架的DbContext就是一个UoW模式的实现。这种做法的好处是,一个请求对多个聚合根的修改,能做到强一致性,因为是事务的。但是这种做法,实际上,没有很好的遵守避开资源竞争的原则。试想,事务A要修改a1,a2,a3三个聚合根;事务B要修改a2,a3,a4;事务C要修改a3,a4,a5三个聚合根。那这样,我们很容易理解,这三个事务只能串行执行,因为它们要修改相同的资源。比如事务A和事务B都要修改a2,a3这两个聚合根,那同一时刻,只能由一个事务能被执行。同理,事务B和事务C也是一样。如果A,B,C这种事务执行的并发很高,那数据库就会出现严重的并发冲突,甚至死锁。那要如何避免这种资源竞争呢?我觉得我们可以采取三个措施: 让一个Command总是只修改一个聚合根 这个做法其实就是缩小事务的范围,确保一个事务一次只涉及一条记录的修改。也就是做到,只有单个聚合根的修改才是事务的,让聚合根成为数据强一致性的最小单位。这样我们就能最大化的实现并行修改。但是你会问,但是我一个请求就是会涉及多个聚合根的修改的,这种情况怎么办呢?在CQRS架构中,有一个东西叫Saga。Saga是一种基于事件驱动的思想来实现业务流程的技术,通过Saga,我们可以用最终一致性的方式最终实现对多个聚合根的修改。对于一次涉及多个聚合根修改的业务场景,一般总是可以设计为一个业务流程,也就是可以定义出要先做什么后做什么。比如以银行转账的场景为例子,如果是按照传统事务的做法,那可能是先开启一个事务,然后让A账号扣减余额,再让B账号加上余额,最后提交事务;如果A账号余额不足,则直接抛出异常,同理B账号如果加上余额也遇到异常,那也抛出异常即可,事务会保证原子性以及自动回滚。也就是说,数据一致性已经由DB帮我们做掉了。 但是,如果是Saga的设计,那就不是这样了。我们会把整个转账过程定义为一个业务流程。然后,流程中会包括多个参与该流程的聚合根以及一个用于协调聚合根交互的流程管理器(ProcessManager,无状态),流程管理器负责响应流程中的每个聚合根产生的领域事件,然后根据事件发送相应的Command,从而继续驱动其他的聚合根进行操作。 转账的例子,涉及到的聚合根有:两个银行账号聚合根,一个交易(Transaction)聚合根,它用于负责存储流程的当前状态,它还会维护流程状态变更时的规则约束;然后当然还有一个流程管理器。转账开始时,我们会先创建一个Transaction聚合根,然后它产生一个TransactionStarted的事件,然后流程管理器响应事件,然后发送一个Command让A账号聚合根做减余额的操作;A账号操作完成后,产生领域事件;然后流程管理器响应事件,然后发送一个Command通知Transaction聚合根确认A账号的操作;确认完成后也会产生事件,然后流程管理器再响应,然后发送一个Command通知B账号做加上余额的操作;后续的步骤就不详细讲了。大概意思我想已经表达了。总之,通过这样的设计,我们可以通过事件驱动的方式,来完成整个业务流程。如果流程中的任何一步出现了异常,那我们可以在流程中定义补偿机制实现回退操作。或者不回退也没关系,因为Transaction聚合根记录了流程的当前状态,这样我们可以很方便的后续排查有状态没有正常结束的转账交易。具体的设计和代码,有兴趣的可以去看一下ENode源代码中的银行转账的例子,里面有完整的实现。 对修改同一个聚合根的Command进行排队 和上面秒杀的设计一样,我们可以对要同时修改同一个聚合根的Command进行排队。只不过这里的排队不是在MySQL Server端,而是在我们自己程序里做这个排队。如果我们是单台服务器处理所有的Command,那排队很容易做。就是只要在内存中,当要处理某个Command时,判断当前Command要修改的聚合根是否前面已经有Command在处理,如果有,则排队;如果没有,则直接执行。然后当这个聚合根的前一个Command执行完后,我们就能处理该聚合根的下一个Command了;但是如果是集群的情况下呢,也就是你不止有一台服务器在处理Command,而是有十台,那要怎么办呢?因为同一时刻,完全有可能有两个不同的Command在修改同一个聚合根。这个问题也简单,就是我们可以对要修改聚合根的Command根据聚合根的ID进行路由,根据聚合根的ID的hashcode,然后和当前处理Command的服务器数目取模,就能确定当前Command要被路由到哪个服务器上处理了。这样我们能确保在服务器数目不变的情况下,针对同一个聚合根实例修改的所有Command都是被路由到同一台服务器处理。然后加上我们前面在单个服务器里面内部做的排队设计,就能最终保证,对同一个聚合根的修改,同一时刻只有一个线程在进行。 通过上面这两个设计,我们可以确保C端所有的Command,都不会出现并发冲突。但是也要付出代价,那就是要接受最终一致性。比如Saga的思想,就是在最终一致性的基础上而实现的一种设计。然后,基于以上两点的这种架构的设计,我觉得最关键的是要做到:1)分布式消息队列的可靠,不能丢消息,否则Saga流程就断了;2)消息队列要高性能,支持高吞吐量;这样才能在高并发时,实现整个系统的整体的高性能。我开发的EQueue就是为了这个目标而设计的一个分布式消息队列,有兴趣的朋友可以去了解下哦。 Command和Event的幂等处理 CQRS架构是基于消息驱动的,所以我们要尽量避免消息的重复消费。否则,可能会导致某个消息被重复消费而导致最终数据无法一致。对于CQRS架构,我觉得主要考虑三个环节的消息幂等处理。 Command的幂等处理 这一点,我想不难理解。比如转账的例子中,假如A账号扣减余额的命令被重复执行了,那会导致A账号扣了两次钱。那最后就数据无法一致了。所以,我们要保证Command不能被重复执行。那怎么保证呢?想想我们平时一些判断重复的操作怎么做的?一般有两个做法:1)db对某一列建唯一索引,这样可以严格保证某一列数据的值不会重复;2)通过程序保证,比如插入前先通过select查询判断是否存在,如果不存在,则insert,否则就认为重复;显然通过第二种设计,在并发的情况下,是不能保证绝对的唯一性的。然后CQRS架构,我认为我们可以通过持久化Command的方式,然后把CommandId作为主键,确保Command不会重复。那我们是否要每次执行Command前线判断该Command是否存在呢?不用。因为出现Command重复的概率很低,一般只有是在我们服务器机器数量变动时才会出现。比如增加了一台服务器后,会影响到Command的路由,从而最终会导致某个Command会被重复处理,关于这里的细节,我这里不想多展开了,呵呵。有问题到回复里讨论吧。这个问题,我们也可以最大程度上避免,比如我们可以在某一天系统最空的时候预先增加好服务器,这样可以把出现重复消费消息的情况降至最低。自然也就最大化的避免了Command的重复执行。所以,基于这个原因,我们没有必要在每次执行一个Command时先判断该Command是否已执行。而是只要在Command执行完之后,直接持久化该Command即可,然后因为db中以CommandId为主键,所以如果出现重复,会主键重复的异常。我们只要捕获该异常,然后就知道了该Command已经存在,这就说明该Command之前已经被处理过了,那我们只要忽略该Command即可(当然实际上不能直接忽略,这里我由于篇幅问题,我就不详细展开了,具体我们可以再讨论)。然后,如果持久化没有问题,说明该Command之前没有被执行过,那就OK了。这里,还有个问题也不能忽视,就是某个Command第一次执行完成了,也持久化成功了,但是它由于某种原因没有从消息队列中删除。所以,当它下次再被执行时,Command Handler里可能会报异常,所以,健壮的做法时,我们要捕获这个异常。当出现异常时,我们要检查该Command是否之前已执行过,如果有,就要认为当前Command执行正确,然后要把之前Command产生的事件拿出来做后续的处理。这个问题有点深入了,我暂时不细化了。有兴趣的可以找我私聊。 Event持久化的幂等处理 然后,因为我们的架构是基于ES的,所以,针对新增或修改聚合根的Command,总是会产生相应的领域事件(Domain Event)。我们接下来的要做的事情就是要先持久化事件,再分发这些事件给所有的外部事件订阅者。大家知道,聚合根有生命周期,在它的生命周期里,会经历各种事件,而事件的发生总有确定的时间顺序。所以,为了明确哪个事件先发生,哪个事件后发生,我们可以对每个事件设置一个版本号,即version。聚合根第一个产生的事件的version为1,第二个为2,以此类推。然后聚合根本身也有一个版本号,用于记录当前自己的版本是什么,它每次产生下一个事件时,也能根据自己的版本号推导出下一个要产生的事件的版本号是什么。比如聚合根当前的版本号为5,那下一个事件的版本号则为6。通过为每个事件设计一个版本号,我们就能很方便的实现聚合根产生事件时的并发控制了,因为一个聚合根不可能产生两个版本号一样的事件,如果出现这种情况,那说明一定是出现并发冲突了。也就是一定是出现了同一个聚合根同时被两个Command修改的情况了。所以,要实现事件持久化的幂等处理,也很好做了,就是db中的事件表,对聚合根ID+聚合根当前的version建唯一索引。这样就能在db层面,确保Event持久化的幂等处理。另外,对于事件的持久化,我们也可以像秒杀那样,实现Group Commit。就是Command产生的事件不用立马持久化,而是可以先积累到一定的量,比如50个,然后再一次性Group Commit所有的事件。然后事件持久化完成后,再修改每个聚合根的状态即可。如果Group Commit事件时遇到并发冲突(由于某个聚合根的事件的版本号有重复),则退回为单个一个个持久化事件即可。为什么可以放心的这样做?因为我们已经基本做到确保一个聚合根同一时刻只会被一个Command修改。这样就能基本保证,这些Group Commit的事件也不会出现版本号冲突的情况。所以,大家是否觉得,很多设计其实是一环套一环的。Group Commit何时出发?我觉得可以只要满足两个条件了就可以触发:1)某个定时的周期到了就可以触发,这个定时周期可以根据自己的业务场景进行配置,比如每隔50ms触发一次;2)要Commit的事件到达某个最大值,即每批可以持久化的事件个数的最大值,比如每50个事件为一批,这个BatchSize也需要根据实际业务场景和你的存储db的性能综合测试评估来得到一个最适合的值;何时可以使用Group Commit?我觉得只有是在并发非常高,当单个持久化事件遇到性能瓶颈时,才需要使用。否则反而会降低事件持久化的实时性,Group Commit提高的是高并发下单位时间内持久化的事件数。目的是为了降低应用和DB之间交互的次数,从而减少IO的次数。不知不觉就说到了最开始说的那3点性能优化中的,尽量减少IO了,呵呵。 Event消费时的幂等处理 CQRS架构图中,事件持久化完成后,接下来就是会把这些事件发布出去(发送到分布式消息队列),给消费者消费了,也就是给所有的Event Handler处理。这些Event Handler可能是更新Q端的ReadDB,也可能是发送邮件,也可能是调用外部系统的接口。作为框架,应该有职责尽量保证一个事件尽量不要被某个Event Handler重复消费,否则,就需要Event Handler自己保证了。这里的幂等处理,我能想到的办法就是用一张表,存储某个事件是否被某个Event Handler处理的信息。每次调用Event Handler之前,判断该Event Handler是否已处理过,如果没处理过,就处理,处理完后,插入一条记录到这个表。这个方法相信大家也都很容易想到。如果框架不做这个事情,那Event Handler内部就要自己做好幂等处理。这个思路就是select if not exist, then handle, and at last insert的过程。可以看到这个过程不像前面那两个过程那样很严谨,因为在并发的情况下,理论上还是会出现重复执行Event Handler的情况。或者即便不是并发时也可能会造成,那就是假如event handler执行成功了,但是last insert失败了,那框架还是会重试执行event handler。这里,你会很容易想到,为了做这个幂等支持,Event Handler的一次完整执行,需要增加不少时间,从而会最后导致Query Side的数据更新的延迟。不过CQRS架构的思想就是Q端的数据由C端通过事件同步过来,所以Q端的更新本身就是有一定的延迟的。这也是CQRS架构所说的要接收最终一致性的原因。 关于幂等处理的性能问题的思考 关于CommandStore的性能瓶颈分析 大家知道,整个CQRS架构中,Command,Event的产生以及处理是非常频繁的,数据量也是非常大的。那如何保证这几步幂等处理的高性能呢?对于Command的幂等处理,如果对性能要求不是很高,那我们可以简单使用关系型DB即可,比如Sql Server, MySQL都可以。要实现幂等处理,只需要把主键设计为CommandId即可。其他不需要额外的唯一索引。所以这里的性能瓶颈相当于是对单表做大量insert操作的最大TPS。一般MySQL数据库,SSD硬盘,要达到2W […]
View DetailsTPS (transaction per second)代表每秒执行的事务数量,可基于测试周期内完成的事务数量计算得出。例如,用户每分钟执行6个事务,TPS为6 / 60s = 0.10 TPS。同时我们会知道事务的响应时间(或节拍),以此例,60秒完成6个事务也同时代表每个事务的响应时间或节拍为10秒。 利特尔法则 (Little’s law): 该法则由麻省理工大学斯隆商学院(MIT Sloan School of Management)的教授John Little﹐于1961年所提出与证明。它是一个有关提前期与在制品关系的简单数学公式,这一法则为精益生产的改善方向指明了道路。 利特尔法则的公式描述为:Lead Time(产出时间)= 存货数量×生产节拍 或 TH(生产效率)= WIP(存货数量)/ CT(周期时间) P.S: 稍后我们会列出负载模型中利特尔法则的应用公式。 我们通过2个示例来看一下此法则是如何在生产环境中发生作用的。 例1:假定我们所开发的并发服务器,并发的访问速率是:1000客户/分钟,每个客户在该服务器上将花费平均0.5分钟,根据little’s law规则,在任何时刻,服务器将承担1000×0.5=500个客户量的业务处理。假定过了一段时间,由于客户群的增大,并发的访问速率提升为2000客户/分钟。在这样的情况下,我们该如何改进我们系统的性能? 根据little’s law规则,有两种方案: 第一:提高服务器并发处理的业务量,即提高到2000×0.5=1000。 或者 第二:减少服务器平均处理客户请求的时间,即减少到:2000×0.25=500。 例2:假设你排队参观某个风景点,该风景点固定的容纳人数是:60人。每个人在该风景点停留的平均时间是:3分钟。假设在你的前面还排有20个人,问:你估计你大概等多少时间才能进入该风景点。 答案:1小时(3×20=60),和该景点固定的容纳人数无关。 为了通过利特尔法则研究负载模型,我们就先要了解两个因子:响应时间(Response time)和节拍(Pacing)。实际上节拍会超越响应时间对TPS的影响。 示例1:节拍0秒,思考时间0秒 用户执行5个事务并且每个事务的响应时间是10秒,需要花费50秒完成5个事务,即5/50=0.1 TPS (这里TPS是由响应时间控制)。 示例2:速率15秒,思考时间0秒 用户执行5个事务且每个事务的响应时间是10秒,但实际由于节拍大于响应时间,所以它优于响应时间控制了事务发生的频率。完成5个事务需要5*15 = 75秒,产生5/75=0.06667 TPS。 在第二个示例中,平均响应时间小于节拍15秒,需要75秒完成5个迭代,产生了0.06667 TPS。 上面两个例子中我们假设思考时间为0秒。如果思考时间为2秒,总时间仍是75秒完成5个迭代,产生0.06667 TPS。 节拍为0秒,则 用户数 = TPS * ( 响应时间 + 思考时间 ) 节拍不为0秒且大于响应时间与思考时间的和,则 用户数 = TPS * (速率) 事实上TPS是事务在w.r.t时间的速率,所以也被称为吞吐量(throughput)。 所以利特尔法则在负载模型中解释为:系统内平均用户数 = 平均响应时间 * 吞吐量 N = ( R + Z ) * X N, 用户数 R, 平均响应时间(也可能是速率) Z, […]
View Details