spring 官方说明
https://docs.spring.io/spring-integration/reference/html/mqtt.html#mqtt
@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "lhw_test");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message> message) throws MessagingException {System.out.println(message.getPayload());log.info("--------------------------------------------");log.info("handleMessage:收到的消息为:{}", message);log.info("--------------------------------------------");}};}
/**
 * Paho client factory used by the outbound handler. It carries the broker URI
 * and the login credentials.
 *
 * NOTE(review): user name and password are hard-coded here for the demo;
 * move them to external configuration before using this outside a test setup.
 */
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { "tcp://localhost:1883" });
    options.setUserName("admin");
    options.setPassword("123456".toCharArray());
    factory.setConnectionOptions(options);
    return factory;
}

/**
 * Outbound handler: publishes every message arriving on "mqttOutboundChannel"
 * as client "testClient002". Messages are published to "lhw_test" by default.
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler("testClient002", mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("lhw_test");
    return messageHandler;
}

/**
 * Channel that feeds {@link #mqttOutbound()}.
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}
package com.it.test_mqtt.component;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;@Slf4j
@Configuration
public class MqttConfig {
// ---------------------------mqtt 接受服务端 消息,相关配置 start-------------------------------@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", "lhw_test");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message> message) throws MessagingException {System.out.println(message.getPayload());log.info("--------------------------------------------");log.info("handleMessage:收到的消息为:{}", message);log.info("--------------------------------------------");}};}// ---------------------------mqtt 接受服务端 消息,相关配置 end-------------------------------// ---------------------------mqtt 向 服务端 发送 消息,相关配置 start-------------------------------@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] { "tcp://localhost:1883" });options.setUserName("admin");options.setPassword("123456".toCharArray());factory.setConnectionOptions(options);return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("testClient002", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic("lhw_test");return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}// ---------------------------mqtt 向 服务端 发送 消息,相关配置 end-------------------------------}
package com.it.test_mqtt.component;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttOutputApi {// 定义重载方法,用于消息发送void sendToMqtt(String payload);// 指定topic进行消息发送void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
package com.it.test_mqtt.controller;import com.it.test_mqtt.component.MqttOutputApi;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/msg")
public class SendMessageController {

    /** Gateway used to hand messages over to the MQTT outbound flow. */
    @Autowired
    private MqttOutputApi mqttOutputApi;

    /**
     * Handles GET /msg/send by passing the given text to
     * {@link MqttOutputApi#sendToMqtt(String)}.
     *
     * @param text message body to publish
     */
    @GetMapping("/send")
    public void sendMsg(String text) {
        mqttOutputApi.sendToMqtt(text);
    }
}
package com.it.test_mqtt.component;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.*;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MqttEventListener {

    /**
     * Logs a notice when the MQTT connection fails.
     *
     * @param event the connection-failed event
     */
    @EventListener(MqttConnectionFailedEvent.class)
    public void listenerAction(MqttConnectionFailedEvent event) {
        log.info("连接失败的事件通知");
    }

    /**
     * Logs a notice when a message has been sent.
     *
     * @param event the message-sent event
     */
    @EventListener(MqttMessageSentEvent.class)
    public void listenerAction(MqttMessageSentEvent event) {
        log.info("已发送的事件通知");
    }

    /**
     * Logs a notice when delivery of a message has completed.
     * <ol>
     *   <li>QoS 0: fired right after sending, because no acknowledgement is awaited.</li>
     *   <li>QoS 1: fired once the ACK for the message has been received.</li>
     *   <li>QoS 2: the sender waits for PUBREC; fired once PUBCOMP has been received.</li>
     * </ol>
     *
     * @param event the message-delivered event
     */
    @EventListener(MqttMessageDeliveredEvent.class)
    public void listenerAction(MqttMessageDeliveredEvent event) {
        log.info("已传输完成的事件通知");
    }

    /**
     * Logs a notice when a topic subscription has been made.
     *
     * @param event the subscribed event
     */
    @EventListener(MqttSubscribedEvent.class)
    public void listenerAction(MqttSubscribedEvent event) {
        log.info("消息订阅的事件通知");
    }
}
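注意:要接收 MqttMessageSentEvent、MqttMessageDeliveredEvent 这类发送相关的 MQTT 事件,需要修改出站处理器(mqttOutbound)的配置:在 messageHandler.setAsync(true) 的基础上,再调用 messageHandler.setAsyncEvents(true) 开启异步事件。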
@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("testClient002", mqttClientFactory());//当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。messageHandler.setAsync(true);//当 async 和 async-events 都为 true 时,// 会发出 MqttMessageSentEvent(请参阅事件)。// 它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。// 当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。// 它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。// 任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。// 请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。messageHandler.setAsyncEvents(true);messageHandler.setDefaultTopic("lhw_test");return messageHandler;}
上一篇:多线程打印
下一篇:商业网络中数字孪生的未开发潜力