为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁。
假设有3个用户对一个秒杀系统的商品点击购买并且该商品的数量只有1件,如果不设置分布式锁的情况,会出现3个人都可能出去购买成功的情况,这种情况是系统不允许的.
例如下面情况,当库存是100的时候,用jmeter模拟100个用户下单,会显示库存一直只减少了1件.
主要依赖redis 的setnx()、expire() 这2个函数实现
方法 | 描述 |
---|---|
setnx(lockkey, 1) | 如果方法返回 0,则说明占位失败;如果返回 1,则说明占位成功 |
expire() | 对 lockkey 设置超时时间,为的是避免死锁问题。 |
1 2 3 4 |
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> |
1 2 3 4 5 6 7 8 9 10 |
server: port: 8080 spring: redis: #redis服务器的ip host: 127.0.0.1 port: 6379 jedis: pool: max-active: 8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
@RestController public class OrderController { @Autowired StringRedisTemplate stringRedisTemplate; @RequestMapping("/deduct_stock/{productId}") public String deductStock(@PathVariable String productId) { String lockKey = "product_" + productId; try { //利用redis单线程模型去写值,写入成功即获取锁,设置30秒后失效,避免程序出现宕机情况 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock", 30, TimeUnit.SECONDS); if (!result) { //尝试再去获取3次锁,如果不需要尝试获取锁可以注释了下面这段,直接返回失败 result = deductStockCAS(lockKey, 3); if (!result) { return "error"; } } Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { stock -= 1; stringRedisTemplate.opsForValue().set("stock", stock.toString()); System.out.println("库存扣减成功,剩余库存:" + stock); return "success"; } System.out.println("库存不足,扣减失败!"); return "error"; } finally { //释放锁 stringRedisTemplate.delete(lockKey); } } /** * 设置要获取的key和尝试的次数 * 没有获取到锁,通过CAS自旋 */ public boolean deductStockCAS(String lockKey, Integer count) { try { int i = 0; do { Thread.sleep(1000L); i++; if (i == count + 1) {//自旋结束 return false; } } while (!stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock", 30, TimeUnit.SECONDS)); return true; } catch (Exception e) { return false; } } } |
启动2个进程
分别启动8080和8081这两个端口
让idea 一个项目能够启动多次
修改idea启动配置,勾选 Allow parallel run
按默认配置启动8080端口,然后修改启动配置,启动8081端口
在Program arguments处添加 以下配置,然后点击OK,再启动程序
1 |
--server.port=8081 |
配置nginx
1 2 3 4 5 6 7 8 9 10 11 12 13 |
http{ upstream etopgroup { server localhost:8080; server localhost:8081; } server { listen 80; server_name localhost; location / { proxy_pass http://tomcatServer; } } } |
使用java多线程模拟100个用户并行操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class SimulateTest { public static void main(String[] args) { //并发的线程数 int threadSize=100; ExecutorService fixedThreadPool=Executors.newFixedThreadPool(threadSize); for(int i=0;i<threadSize;i++) { fixedThreadPool.submit(() -> { RestTemplate restTemplate = new RestTemplate(); String result = restTemplate.getForObject("http://localhost/deduct_stock/1", String.class); System.out.println(result); }); } } |
8080端口扣减数量
8081端口扣减数量
可以看到2个进程的扣商品扣减数都是正常的。
也可以使用jmeter模拟并发,jmeter使用请参考我写的 jmeter入门教程
利用主键唯一的特性,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,当方法执行完毕之后,想要释放锁的话,删除这条数据库记录即可。
下图是商品库存是100件,用jmeter模拟100个用户请求,重数据库扣减库存出现的情况,为了避免这种情况出现,加上分布式锁解决该问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<!--data jpa--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!--数据库--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--增强处理,用于在任务调度的方法切入获取锁请求--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
@RestController public class OrderController { @Autowired private CommodityRepository commodityRepository; @RequestMapping("/deduct_stock") public String deductStock() { Commodity commodity = commodityRepository.findByCommodityName("惠普暗夜精灵5"); if (commodity.getNumber() > 0) { commodity.setNumber(commodity.getNumber() - 1); commodityRepository.save(commodity); System.out.println("抢购成功,剩余库存:" + commodity.getNumber()); return "success"; } else { System.out.println("剩余库存不足,抢购失败!"); return "error"; } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
//存储锁对应的方法信息 @Entity @Table(name = "method_lock") public class MethodLock { @Id private String id; //被锁的方法名称 private String methodName; //占用的线程描述 private String methodDesc; //操作时间 private Date updateTime; //省略 get set方法 ... } //存储商品信息 @Entity @Table(name = "commodity") public class Commodity { @Id private Integer id; //商品数量 private Integer number; //商品名称 private String commodityName; //省略 get set方法 ... } |
1 2 |
#准备测速数据 insert into commodity (id,commodityName,number)value(1,'惠普暗夜精灵5',5); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
@Aspect @Configuration public class TaskPointcut { @Autowired private MethodLockRepository methodLockRepository; private final Logger logger = LoggerFactory.getLogger(TaskPointcut.class); @Value("${server.port}") private Integer port; /** * 需要加分布式锁的切入点 * 这里可以指定OrderController下面的所有方法 */ @Pointcut("execution(public * com.ljm.databaselook.controller.OrderController.*(..)))") public void methodLock() { } /** * 事前处理 * 获取锁 尝试获取锁 * * @return 成功获取锁, 继续执行操作, 获取锁失败则返回错误信息 */ @Around("methodLock()") public Object around(ProceedingJoinPoint pj) { String methodName = ""; logger.info("Try to acquire the lock"); try { MethodSignature signature = (MethodSignature) pj.getSignature(); methodName = signature.getMethod().getName(); MethodLock methodLock = new MethodLock(); methodLock.setMethodName(methodName); //方法描述拼接线程描述和当前端口,可以再拼一些别的参数,保证不同进程用不同的描述 methodLock.setMethodDesc(Thread.currentThread().getId() + "-"+port); methodLock.setUpdateTime(new Date()); //插入数据成功则代表获取锁成功 methodLockRepository.save(methodLock); logger.info("around getLook success taskName={}", methodName); return pj.proceed(); } catch (Throwable e) { logger.info("getLook fail error={}",e); return "getLook fail"; } } /** * 事后处理 * 释放锁信息 */ @After("methodLock()") public void doAfterAdvice(JoinPoint joinPoint) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); String methodName = signature.getMethod().getName(); //只能释放当前线程拿到的锁信息 MethodLock methodLock = methodLockRepository.findByMethodNameAndMethodDesc(methodName, Thread.currentThread().getId() +"-"+port); if (methodLock != null) { logger.info("freed lock method:{}", methodName); methodLockRepository.delete(methodLock); logger.info("doAfterAdvice unLook methodName={}", methodName); } } /** * 异常处理 释放锁 */ @AfterThrowing("methodLock()") public void afterThrowing(JoinPoint joinPoint) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); String methodName = signature.getMethod().getName(); MethodLock methodLock = methodLockRepository.findByMethodNameAndMethodDesc(methodName, Thread.currentThread().getId() +"-"+port); if (methodLock != null) { logger.error("freed lock method:{}", methodName); methodLockRepository.delete(methodLock); logger.info("afterThrowing unLook methodName={}", methodName); } } |
压测的地址用的是nginx代理过的,具体配置请参考上面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public class SimulateTest { //java原子包,利用了CAS算法保证了数据的原子性, static AtomicInteger successCount=new AtomicInteger(0); //volatile修饰的变量能保证修改操作的原子性,但是在 ++操作中设计到了2个指令操作 count=count+1,所以在++操作中volatile原子性可能会失效 //static volatile Integer successCount=0; public static void main(String[] args) throws Exception{ //总共的线程数 int threadSize=100; //每秒并发数 final int count=20; //同步执行器,必须等所有线程都完成任务,才能执行后面的代码 CountDownLatch downLatch=new CountDownLatch(threadSize); ExecutorService fixedThreadPool=Executors.newFixedThreadPool(threadSize); for(int i=0;i<threadSize;i++) { int finalI = i; fixedThreadPool.submit(() -> { //每秒执行20个请求,执行5秒 if(finalI%count==0){ try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } RestTemplate restTemplate = new RestTemplate(); String result = restTemplate.getForObject("http://localhost/deduct_stock", String.class); if("success".equals(result)){ successCount.incrementAndGet(); } downLatch.countDown(); }); } //等待所有任务完成 downLatch.await(); System.out.println("购买商品成功的次数:"+successCount.get()); fixedThreadPool.shutdown(); } } |
可以看到,在压测5秒,每秒20个请求的情况下,只有3个线程拿到了锁
每个客户端对某个方法加锁时,在 Zookeeper 上与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个临时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<!--zookeeper api操作依赖--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> <!--添加zookeeper服务注册--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>${curator.version}</version> </dependency> |
1 2 3 4 5 6 |
server: port: 8080 zk: url: 192.168.0.105:2181 #zookeeper服务器ip serviceName: /service |
下面代码中,库存没有放到数据库或者redis中,用内存放着,
有兴趣的同学可以自己放到redis或数据库然后开多个节点测试.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
@RestController public class OrderController { @Autowired private CuratorFramework zkClient; String lookPath = "/look/test"; AtomicInteger atomicInteger=new AtomicInteger(5);//设置库存 /** * 只有等锁释放了,别的线程才能获取新的锁 * @return */ @RequestMapping("/deduct_stock") public String deduct_stock() { try { InterProcessMutex lock = new InterProcessMutex(zkClient, lookPath); //acquire设置等待时间,下面设置的尝试获取锁的时间,不设置参数默认无限等待 if (lock.acquire(10, TimeUnit.SECONDS)) { try { if(atomicInteger.get()>0) { atomicInteger.set(atomicInteger.get() - 1); System.out.println("购买成功,剩余库存为:" + atomicInteger.get()); return "success"; } System.out.println("库存不足:" + atomicInteger.get()); } finally { //释放锁 lock.release(); } } return "error"; } catch (Exception ex) { ex.printStackTrace(); return "error"; } } |
创作不易,要是觉得我写的对你有点帮助的话,麻烦在gitee上帮我点下 Star
【SpringBoot框架篇】其它文章如下,后续会继续更新。
from:https://blog.csdn.net/ming19951224/article/details/106205332