1 2 3 4 5 6 7 |
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/ https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
#rabbitmq rabbitmq: host: 10.0.0.2 port: 5672 username: springboot password: password publisher-confirms: true publisher-returns: true template: mandatory: true #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java listener: concurrency: 2 #最小消息监听线程数 max-concurrency: 2 #最大消息监听线程数 mybatis: mapper-locations: classpath:mapping/*.xml ··· |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; //错误的 import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输, * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。 * * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来. * * Routing Key:路由关键字, * * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。 * * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序. * Channel:消息通道,在客户端的每个连接里,可建立多个channel. * * */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String QUEUE_A = "QUEUE_A"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * * 针对消费者配置 * * 1. 设置交换机类型 * * 2. 将队列绑定到交换机 * * FanoutExchange: * 将消息分发到所有的绑定队列,无routingkey的概念 * * HeadersExchange :通过添加属性key-value匹配 * DirectExchange:按照routingkey分发到指定队列 * TopicExchange:多关键字匹配 * * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。 常用交换器类型如下: Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。 Topic(TopicExchange):按规则转发消息(最灵活)。 Headers(HeadersExchange):设置header attribute参数类型的交换机。 Fanout(FanoutExchange):转发消息到所有绑定队列。 * */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 获取队列A * * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); // 队列持久 } /** * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。 * * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
package com.redis; import java.util.UUID; import org.springframework.amqp.core.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; @Component public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入 private RabbitTemplate rabbitTemplate; /** * 构造方法注入rabbitTemplate */ @Autowired public MsgProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); // rabbitTemplate如果为单例的话,那回调就是最后设置的内容 /** * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。 * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。 */ rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); System.out.println("消息主体 message : "+message); System.out.println("消息主体 message : "+replyCode); System.out.println("描述:"+replyText); System.out.println("消息使用的交换器 exchange : "+exchange); System.out.println("消息使用的路由键 routing : "+routingKey); } /** * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。 * @param content */ public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId); String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString(); System.out.println("结束发送消息c : " + content.toLowerCase()); System.out.println("消费者响应c : " + response + " 消息处理完成"); //logger.info(" 发送消息TO A:" + content); // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId); } /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回调id:" + correlationData); if (ack) { logger.info("消息成功消费"); } else { logger.info("消息消费失败:" + cause); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MsgReceiver { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitHandler public void process(String content) { System.out.println("接收处理队列A当中的消息: " + content); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import java.util.Date; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.redis.MsgProducer; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitMqTest { @Autowired private MsgProducer sender; @Test public void sendTest() throws Exception { //while(true){ String msg = new Date().toString(); sender.sendMsg(msg); Thread.sleep(6000); //} } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
package com.test.ratelimit; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.RateLimiter; public class ComplexDemo { private static RateLimiter rateLimiter = RateLimiter.create(10); private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0); public static void main(String[] args) { // TODO Auto-generated method stub List<Runnable> tasks = new ArrayList<Runnable>(); for (int i = 0; i < 100; i++) { tasks.add(new UserRequest(i)); } ExecutorService threadPool = Executors.newCachedThreadPool(); for (Runnable runnable : tasks) { threadPool.execute(runnable); } } private static boolean startGo(int i) { //基于令牌桶算法的限流实现类 /** * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。 * */ /** * tryAcquire(long timeout, TimeUnit unit) * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) */ //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序 if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("暂时无法获取令牌, 排队失败" + i); fail.getAndIncrement(); System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get()); return false; } if (update() > 0) { System.out.println("成功" + i); suc.getAndIncrement(); System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get()); return true; } System.out.println("数据不足,失败"); return false; } private static int update() { return 1; } private static class UserRequest implements Runnable { private int id; public UserRequest(int id) { this.id = id; } public void run() { startGo(id) ; } } } 测试结果: ... 成功8 FAIL/SUC=89/10 成功7 FAIL/SUC=89/11 |
总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。
作者:DONG999
链接:https://www.jianshu.com/p/f621b47c80c3
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。