MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。
发布订阅
🟣 MQTT 协议
服务端使用 mosquitto(版本2.0.14)
下载页面:https://mosquitto.org/download/
下载页面:https://mqttx.app/zh#download
下载链接:http://www.jensd.de/apps/mqttfx/1.7.1/mqttfx-1.7.1-windows-x64.exe
MQTTfx官网:http://mqttfx.org
太极创客下载地址:http://www.taichi-maker.com/homepage/download/#mqtt
备份的蓝奏云下载链接:https://ioufev.lanzout.com/irlNC064nc4f
https://github.com/eclipse/paho.mqtt.java
paho是eclipse提供MQTT客户端开源库,Java代码集成这个客户端用来收发消息。
代码:https://gitee.com/ioufev/mqtt-springboot-demo
pom.xml
1 2 3 4 5 |
<!-- MQTT --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> |
spring中集成框架,有消息入站通道(用来接收消息)和出站通道(用来发送消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration public class MqttConfig { // 消费消息 /** * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。 * @return factory */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); // 设置代理端的URL地址,可以是多个 options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"}); factory.setConnectionOptions(options); return factory; } /** * 入站通道 */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 入站 */ @Bean public MessageProducer inbound() { // Paho客户端消息驱动通道适配器,主要用来订阅主题 MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho", mqttClientFactory(), "boat", "collector", "battery", "+/sensor"); adapter.setCompletionTimeout(5000); // Paho消息转换器 DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); // 按字节接收消息 // defaultPahoMessageConverter.setPayloadAsBytes(true); adapter.setConverter(defaultPahoMessageConverter); adapter.setQos(1); // 设置QoS adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean // ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。 @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return message -> { String payload = message.getPayload().toString(); // byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式 String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 根据主题分别进行消息处理。 if (topic.matches(".+/sensor")) { // 匹配:1/sensor String sensorSn = topic.split("/")[0]; System.out.println("传感器" + sensorSn + ": 的消息: " + payload); } else if (topic.equals("collector")) { System.out.println("采集器的消息:" + payload); } else { System.out.println("丢弃消息:主题[" + topic + "],负载:" + payload); } }; } // 发送消息 /** * 出站通道 */ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * 出站 */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler outbound() { // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory()); messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。 messageHandler.setDefaultTopic("command"); messageHandler.setDefaultQos(1); // 设置默认QoS // Paho消息转换器 DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); // defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息 messageHandler.setConverter(defaultPahoMessageConverter); return messageHandler; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { // 定义重载方法,用于消息发送 void sendToMqtt(String payload); // 指定topic进行消息发送 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); } |
测试方式:使用接口工具,给接口发送消息,从而调用MQTT客户端发布消息
类MqttController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import com.ioufev.mqtt.domain.MyMessage; import com.ioufev.mqtt.mqtt.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class MqttController { @Resource private MqttGateway mqttGateway; @PostMapping("/send") public String send(@RequestBody MyMessage myMessage) { // 发送消息到指定主题 mqttGateway.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent()); return "send topic: " + myMessage.getTopic()+ ", message : " + myMessage.getContent(); } } |
类MyMessage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class MyMessage { private String topic; private String content; public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } } |
作者:在山的那边是海
链接:https://www.jianshu.com/p/16c752812d48
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。