linux下的安装没什么可说的,因为本机懒得重装虚拟机了,所以就下载了windows版本进行安装。
erlang下载地址:http://www.erlang.org/download.html
rabbitMQ下载:http://www.rabbitmq.com/download.html
直接下载安装即可。
因为想用可视化界面监控消息,所以先激活这个功能。
1 2 |
//到rabbitMQ安装目录的sbin目录下启动cmd黑窗口 E:\software\RabbitMQServer\rabbitmq_server-3.6.5\sbin>rabbitmq-plugins.bat enable rabbitmq_management |
然后重启rabbitMQ服务。输入网址:http://localhost:15672/。 使用默认用户guest/guest进入网页端控制台。
总结:exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
对key进行模式匹配后进行投递的叫做Topic交换机。*(星号)可以代替一个任意标识符 ;#(井号)可以代替零个或多个标识符。
在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。
我们创建3个绑定:Q1绑定键是“.orange.”,Q2绑定键是“..rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,
Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。
还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。
Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,Fanout Exchange 转发消息是最快的。
首部交换机是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
– exchange持久化,在声明时指定durable => 1
– queue持久化,在声明时指定durable => 1
– 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
这种模式是最简单的模式,就发送一串字符串,这个字符串为key,接收的时候也完全以这个字符串本来来确定,不需要绑定任何exchange,使用默认的就行。我们以这个模式开始在原来的项目上继续集成。
首先是引入依赖:
1 2 3 4 5 |
<!--rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
appilication.yml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: concurrency: 10 max-concurrency: 10 prefetch: 1 auto-startup: true default-requeue-rejected: true template: retry: enabled: true initial-interval: 1000 max-attempts: 3 max-interval: 10000 multiplier: 1.0 |
rabbitMQ配置类MQConfig:
1 2 3 4 5 6 7 8 9 |
@Configuration public class MQConfig { //MQ name public static final String DIRECT_QUEUE_NAME = "queue"; @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true); } } |
发送者MQSender:
1 2 3 4 5 6 7 8 9 10 11 12 |
@Service @Slf4j public class MQSender { @Autowired private AmqpTemplate amqpTemplate; public void send(Object message){ amqpTemplate.convertAndSend(MQConfig.DIRECT_QUEUE_NAME,message); log.info("send:{}",message); } } |
接收者MQReceiver:
1 2 3 4 5 6 7 8 9 |
@Service @Slf4j public class MQReceiver { @RabbitListener(queues = MQConfig.DIRECT_QUEUE_NAME) public void receive(String message){ log.info("receive:{}",message); } } |
这样,就完成了最简单的一个字符串的发送-接受。可以在controller中随便测试一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Controller @RequestMapping("/test") public class TestController { @Autowired private MQSender mqSender; @RequestMapping("/mq") @ResponseBody public String mq(){ mqSender.send("hello world"); return "success"; } } |
这个模式正如上面所言,是可以匹配通配符的,显然更加灵活,这里用程序测试一下这个模式效果。
MQConfig:
先来几个常量:
1 2 3 4 5 6 7 8 9 10 |
//queue1名字 public static final String TOPIC_QUEUE_NAME1 = "topic.queue1"; //queue2名字 public static final String TOPIC_QUEUE_NAME2 = "topic.queue2"; //交换机名字 public static final String TOPIC_EXCHANGE_NAME = "topicExchange"; //key等于topic.key1的,后面将配置为只被queue1接收 private static final String TOPIC_KEY_ROUTE1 = "topic.key1"; //key匹配topic.#的都被接收进queue2 private static final String TOPIC_KEY_ROUTE2 = "topic.#"; |
下面配置几个bean:
注:带有 @Configuration 的注解类表示这个类可以使用 Spring IoC 容器作为 bean 定义的来源。@Bean 注解告诉 Spring,一个带有 @Bean 的注解方法将返回一个对象,该对象应该被注册为在 Spring 应用程序上下文中的 bean。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
//创建两个QUEUE对象queue1,queue2的bean被spring管理 @Bean public Queue topicQueue1(){ return new Queue(TOPIC_QUEUE_NAME1,true); } @Bean public Queue topicQueue2(){ return new Queue(TOPIC_QUEUE_NAME2,true); } //交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange(TOPIC_EXCHANGE_NAME); } //queue1--交换机--匹配规则1 @Bean public Binding topicBinding1(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_KEY_ROUTE1); } //queue2--交换机--匹配规则2 @Bean public Binding topicBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_KEY_ROUTE2); } |
MQSender:
1 2 3 4 5 6 7 8 |
//消息1与topic.key1和topic.#都匹配; //消息2与topic.key1不匹配,只与topic.#匹配,那么只能被queue2接收 public void sendTopic(Object message){ log.info("send topic msg:{}",message); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,"topic.key1",message+"--1"); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,"topic.key2",message+"--2"); } |
MQReceiver:
1 2 3 4 5 6 7 8 9 |
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1) public void receiveTopic1(String message){ log.info("topic queue1 receive:{}",message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2) public void receiveTopic2(String message){ log.info("topic queue2 receive:{}",message); } |
最后测试一把:
1 2 3 4 5 6 |
@RequestMapping("/mq/topic") @ResponseBody public String mq_topic(){ mqSender.sendTopic("hello world"); return "success"; } |
运行结果:
1 2 3 4 |
2018-05-26 18:59:40.281 INFO 9920 --- [nio-8080-exec-1] com.swg.miaosha.mq.MQSender : send topic msg:hello world 2018-05-26 18:59:40.303 INFO 9920 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : topic queue2 receive:hello world--1 2018-05-26 18:59:40.303 INFO 9920 --- [TaskExecutor-10] com.swg.miaosha.mq.MQReceiver : topic queue2 receive:hello world--2 2018-05-26 18:59:40.303 INFO 9920 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : topic queue1 receive:hello world--1 |
运行结果与初期的分析结果一致。
这种就是广播模式,即所有的绑定到指定的exchange上的queue都可以接收消息。
MQConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public static final String FANOUT_EXCHANGE_NAME = "fanoutExchage"; public static final String FANOUT_QUEUE_NAME1 = "fanout.queue1"; public static final String FANOUT_QUEUE_NAME2 = "fanout.queue2"; @Bean public Queue fanoutQueue1(){ return new Queue(FANOUT_QUEUE_NAME1,true); } @Bean public Queue fanoutQueue2(){ return new Queue(FANOUT_QUEUE_NAME2,true); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUT_EXCHANGE_NAME); } @Bean public Binding fanoutBinding1(){ return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2(){ return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } |
MQSender:
1 2 3 4 |
public void sendFanout(Object message){ log.info("send fanout msg:{}",message); amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE_NAME,"",message); } |
MQReceiver:
1 2 3 4 5 6 7 8 9 |
@RabbitListener(queues = MQConfig.FANOUT_QUEUE_NAME1) public void receiveFanout1(String message){ log.info("fanout queue1 receive:{}",message); } @RabbitListener(queues = MQConfig.FANOUT_QUEUE_NAME2) public void receiveFanout2(String message){ log.info("fanout queue2 receive:{}",message); } |
运行结果:
1 2 3 |
2018-05-26 20:03:29.592 INFO 16680 --- [nio-8080-exec-1] com.swg.miaosha.mq.MQSender : send fanout msg:hello world 2018-05-26 20:03:29.619 INFO 16680 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : fanout queue1 receive:hello world 2018-05-26 20:03:29.619 INFO 16680 --- [cTaskExecutor-1] com.swg.miaosha.mq.MQReceiver : fanout queue2 receive:hello world |
queue1和queue2都接受到了消息。
MQConfig:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public static final String HEADERS_EXCHANGE_NAME = "headersExchage"; public static final String HEADERS_QUEUE_NAME = "headers.queue"; @Bean public HeadersExchange headersExchange(){ return new HeadersExchange(HEADERS_EXCHANGE_NAME); } @Bean public Queue headersQueue(){ return new Queue(HEADERS_QUEUE_NAME,true); } //就是说要完全匹配这个Map才能进入queue中发送出去 @Bean public Binding headersBinding(){ Map<String,Object> map = new HashMap<>(); map.put("header1","value1"); map.put("header2","value2"); return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match(); } |
MQSender:
1 2 3 4 5 6 7 8 9 10 |
//map要一样 public void sendHeaders(Object message){ String msg = RedisService.beanToString(message); log.info("send fanout msg:{}",message); MessageProperties properties = new MessageProperties(); properties.setHeader("header1","value1"); properties.setHeader("header2","value2"); Message obj = new Message(msg.getBytes(),properties); amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE_NAME,"",obj); } |
MQReceiver:
1 2 3 4 |
@RabbitListener(queues = MQConfig.HEADERS_QUEUE_NAME) public void receiveHeaders(byte[] message){ log.info("fanout queue2 receive:{}",new String(message)); } |
思路:减少数据库访问
对于之前的秒杀接口do_miaosha:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); //判断库存 GoodsVo goodsVo = goodsService.getGoodsVoByGoodsId(goodsId); if(goodsVo.getStockCount() <= 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } //判断是否已经秒杀到了 MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } //减库存、下订单、写入秒杀订单,需要在一个事务中执行 OrderInfo orderInfo = miaoshaService.miaosha(user,goodsVo); return Result.success(orderInfo); } |
这里判断库存是直接从数据库查,因为并发量比较大,存在性能问题。后面秒杀到之后,也不是直接减库存, 而是将其放到消息队列中慢慢交给数据库去调整。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); //1.预减库存进行优化 /*********************************优化1开始*************************************/ long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } /*********************************优化1结束*************************************/ //2.判断是否已经秒杀到了 MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } /*********************************优化2开始*************************************/ //3.进入消息队列 MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message); /*********************************优化2结束*************************************/ return Result.success(0);//排队中 } |
在消息队列中对消息进行消化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@RabbitListener(queues = MQConfig.MIAOSHA_QUEUE) public void receive(String message){ log.info("receive message:{}",message); MiaoshaMessage msg = RedisService.stringToBean(message,MiaoshaMessage.class); MiaoshaUser user = msg.getUser(); long goodsId = msg.getGoodsId(); //判断数据库库存是否真的足够 GoodsVo goodsVo = goodsService.getGoodsVoByGoodsId(goodsId); if(goodsVo.getStockCount() <= 0){ return; } //判断是否已经秒杀到了 MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return; } //减库存、下订单、写入秒杀订单,需要在一个事务中执行 OrderInfo orderInfo = miaoshaService.miaosha(user,goodsVo); } |
对于controller中的优化1:redis预减库存。那么需要在系统启动的时候将秒杀商品的库存先添加到redis中:
1 |
public class MiaoshaController implements InitializingBean |
重写afterPropertiesSet()方法:
1 2 3 4 5 6 7 8 9 10 11 |
@Override public void afterPropertiesSet() throws Exception { //将秒杀商品的库存全部先存储到redis中 List<GoodsVo> goodsVoList = goodsService.getGoodsVoList(); if(goodsVoList == null){ return; } for(GoodsVo goods:goodsVoList){ redisService.set(GoodsKey.getMiaoshaGoodsStock,""+goods.getId(),goods.getStockCount()); } } |
对于前端,这时也要进行修改了,因为点击秒杀商品按键后,这里考虑三种情况:排队等待、失败、成功。那么这里规定-1为失败,0为排队,1为秒杀成功已经写入数据库。
原来的detail.htm中秒杀事件函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
function doMiaosha(){ $.ajax({ url:"/miaosha/do_miaosha", type:"POST", data:{ goodsId:$("#goodsId").val(), }, success:function(data){ if(data.code == 0){ window.location.href="/order_detail.htm?orderId="+data.data.id; }else{ layer.msg(data.msg); } }, error:function(){ layer.msg("客户端请求有误"); } }); } |
秒杀到商品就直接返回,现在后端改为消息队列,所以需要增加函数进行判断,必要时需要轮询:
1 2 3 4 5 |
if(data.code == 0){ window.location.href="/order_detail.htm?orderId="+data.data.id; }else{ layer.msg(data.msg); } |
所以将其改为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
//其他的部分省略 ... if(data.code == 0){ //window.location.href="/order_detail.htm?orderId="+data.data.id; //秒杀到商品的时候,这个时候不是直接返回成功,后端是进入消息队列,所以前端是轮询结果,显示排队中 getMiaoshaResult($("#goodsId").val()); }else{ layer.msg(data.msg); } ... function getMiaoshaResult(goodsId) { g_showLoading(); $.ajax({ url:"/miaosha/result", type:"GET", data:{ goodsId:$("#goodsId").val(), }, success:function(data){ if(data.code == 0){ var result = data.data; //失败--- -1 if(result <= 0){ layer.msg("对不起,秒杀失败!"); } //排队等待,轮询--- 0 else if(result == 0){//继续轮询 setTimeout(function () { getMiaoshaResult(goodsId); },50); } //成功---- 1 else { layer.msg("恭喜你,秒杀成功,查看订单?",{btn:["确定","取消"]}, function () { window.location.href="/order_detail.htm?orderId="+result; }, function () { layer.closeAll(); } ); } }else{ layer.msg(data.msg); } }, error:function(){ layer.msg("客户端请求有误"); } }); } |
那么相应地,后台也要增加一个方法:result
1 2 3 4 5 6 7 8 9 |
@RequestMapping(value = "/result",method = RequestMethod.GET) @ResponseBody public Result<Long> result(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); long result = miaoshaService.getMiaoshaResult(user.getId(),goodsId); return Result.success(result); } |
那么如何标记状态呢?这就是getMiaoshaResult方法所做的事情。
对于成功的状态判断,很简单,从数据库查,能查到就说明已经秒杀成功,否则就是两种情况:失败或者正在等待生成订单。
对于这两种状态,我们需要用redis来实现,思路是:在系统初始化的时候,redis中设置秒杀商品是否卖完的状态为false—即未卖完;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public long getMiaoshaResult(Long userId, long goodsId) { MiaoshaOrder orderInfo = orderService.getMiaoshaOrderByUserIdGoodsId(userId,goodsId); if(orderInfo != null){ return orderInfo.getId(); }else{ boolean isOver = getGoodsOver(goodsId); if(isOver){ //库存已经没了 return -1; }else{ //表示还没入库,继续等待结果 return 0; } } } |
在MiaoshaService中的Miaosha方法:数据库减库存失败的话,说明数据库的库存已经小于0了,那么这个时候,立即将redis初始设置的秒杀商品是否卖完的状态为true,表示商品已经全部卖完,返回秒杀失败。否则就是要前端等待等待。
1 2 3 4 5 6 7 8 9 10 11 |
@Transactional public OrderInfo miaosha(MiaoshaUser user, GoodsVo goods) { //减库存、下订单、写入秒杀订单 boolean success =goodsService.reduceStock(goods); if(success){ return orderService.createOrder(user,goods); }else{ setGoodsOver(goods.getId()); return null; } } |
对于两个小方法getGoodsOver和setGoodsOver:
1 2 3 4 5 6 7 |
private void setGoodsOver(long goodId){ redisService.set(MiaoshaKey.isGoodsOver,""+goodId,true); } private boolean getGoodsOver(long goodsId) { return redisService.exists(MiaoshaKey.isGoodsOver,""+goodsId); } |
那么redis预减库存,然后消息队列来进行创建订单就实现了。
当然,对于redis预减库存这一点,还有要优化的地方,就是现在的do_miaosha接口是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); //1.预减库存进行优化 /*********************************优化1开始*************************************/ long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } /*********************************优化1结束*************************************/ //2.判断是否已经秒杀到了 MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } /*********************************优化2开始*************************************/ //3.进入消息队列 MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message); /*********************************优化2结束*************************************/ return Result.success(0);//排队中 } |
但是,当秒杀商品已经没得时候,就没有必要再去redis中进行判断了,毕竟查询redis也是需要网络开销的,解决思路是:在内存中进行判断,如果redisService.decr得到的stock少于零的时候,直接将内存中的一个标志改变一下,那么下次再进入do_miaosha接口,先判断内存这个标记,如果库存已经小于0了,就不再访问redis,而是直接返回秒杀商品已经卖完。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); /***************************对redis预减库存再优化*************************** //内存标记,减少不必要的redis的访问 boolean over = localOverMap.get(goodsId); if(over){ return Result.error(CodeMsg.MIAO_SHA_OVER); } *******************************************************************/ //预减库存进行优化 long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ return Result.error(CodeMsg.MIAO_SHA_OVER); } //判断是否已经秒杀到了 MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ localOverMap.put(goodsId,true); return Result.error(CodeMsg.REPEATE_MIAOSHA); } //进入消息队列 MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message); return Result.success(0);//排队中 } |
声明一个map:
1 |
private Map<Long,Boolean> localOverMap = new HashMap<>(); |
那么在afterPropertiesSet这个系统加载的初始化方法中对这个map进行初始化,goodsId–stock:
1 |
localOverMap.put(goods.getId(),false); |
在原来的redis预减库存初,发现库存小于0 ,就改为true:
1 2 3 4 |
if(stock < 0){ localOverMap.put(goodsId,true); return Result.error(CodeMsg.MIAO_SHA_OVER); } |
最后do_miaosha接口变为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
@RequestMapping(value = "/do_miaosha",method = RequestMethod.POST) @ResponseBody public Result<Integer> do_miaosha(Model model, MiaoshaUser user, @RequestParam("goodsId") long goodsId){ if(user == null) return Result.error(CodeMsg.SESSION_ERROR); //内存标记,减少不必要的redis的访问 boolean over = localOverMap.get(goodsId); if(over){ return Result.error(CodeMsg.MIAO_SHA_OVER); } //预减库存进行优化 long stock = redisService.decr(GoodsKey.getMiaoshaGoodsStock,""+goodsId); if(stock < 0){ localOverMap.put(goodsId,true); return Result.error(CodeMsg.MIAO_SHA_OVER); } //判断是否已经秒杀到了 MiaoshaOrder miaoshaOrder = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(),goodsId); if(miaoshaOrder != null){ return Result.error(CodeMsg.REPEATE_MIAOSHA); } //进入消息队列 MiaoshaMessage message = new MiaoshaMessage(); message.setUser(user); message.setGoodsId(goodsId); sender.sendMiaoshaMessage(message); return Result.success(0);//排队中 } |
ok,整个关于redis预减库存和rabbitMQ创建订单这个优化已经基本完成了。