前言在物联网开发中MQTT 已经成为事实上的通信标准。很多朋友刚开始接触时总觉得 MQTT 很复杂——Broker 是什么QoS 怎么选Spring Boot 怎么接入这篇文章不讲虚的从EMQX 服务端安装到Spring Boot 完整代码实现一步不落带你跑通全流程。一、MQTT 是什么一句话概括MQTT 是一种基于“发布/订阅”模式的轻量级消息传输协议。你可以把它理解成一个“中间人”发布者发送消息到某个主题Topic订阅者订阅感兴趣的主题接收消息Broker代理负责转发消息不像 HTTP 那样“你请求我响应”MQTT 更像是“谁关心这个消息就订阅它”。这种方式让发布者和订阅者完全解耦非常适合物联网设备通信。MQTT 核心概念速览概念说明Topic主题消息的“地址”用/分层如device/123/statusQoS服务质量0最多一次可能丢1至少一次可能重复2只有一次最可靠通配符匹配单层#匹配多层仅用于订阅二、环境准备安装 EMQXWindows2.1 下载访问 EMQX 官网下载页面选择Windows版本的 ZIP 安装包。官网下载地址⚠️注意下载emqx-5.x.x-windows-amd64.zip区分开源版和企业版我们选开源版即可。系统要求Windows 10/11 64位系统至少 2GB 可用内存磁盘空间 ≥ 200MB2.2 解压✅推荐解压到D:\MQTT\emqx或C:\emqx这样的纯英文路径解压后的目录结构textemqx/ ├── bin/ # 核心执行文件 ├── etc/ # 配置文件 ├── data/ # 运行数据 ├── log/ # 日志文件 └── releases/ # 版本信息2.3 启动 EMQX打开管理员权限的 PowerShell 或 CMD进入emqx/bin目录先安装emqx install 后启动emqx console2.4 验证启动启动成功后打开浏览器访问http://localhost:18083进入 EMQX Dashboard 管理控制台。默认登录账号密码用户名admin密码public登录后建议立即修改密码。2.5 端口说明EMQX 默认使用以下端口端口用途1883MQTT TCP 协议端口8883MQTT SSL/TLS 端口8083MQTT WebSocket 端口18083Dashboard 管理控制台三、Spring Boot 集成 MQTT3.1 项目依赖在pom.xml中添加 Spring Integration MQTT 依赖xmldependency groupIdorg.springframework.integration/groupId artifactIdspring-integration-mqtt/artifactId /dependency dependency groupIdorg.eclipse.paho/groupId artifactIdorg.eclipse.paho.client.mqttv3/artifactId version1.2.5/version /dependency3.2 配置文件application.ymlyamlspring: application: name: mqtt-demo mqtt: brokerUrl: tcp://localhost:1883 user: root password: 123456 clientId: bitstorm-server topics: - /x//notice - /x//device persistence: /var/mqtt/persistence completionTimeout: 5000 keepAlive: 60 connectionTimeout: 30 defaultQos: 1 autoReconnect: true cleanSession: true maxInflight: 100003.3 配置类MqttPropertiesjavaimport lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; ConfigurationProperties(prefix mqtt) Data Component public class MqttProperties { private String brokerUrl; private String user; private String password; private String clientId; private String[] topics; private String persistence; private int completionTimeout; private int keepAlive; private int connectionTimeout; private int defaultQos; private boolean autoReconnect; private boolean cleanSession; private int maxInflight; }3.4 核心配置类MqttIntegrationConfig这是整个集成的核心负责创建 MQTT 客户端工厂、消息入站适配器接收消息和出站适配器发送消息。import jakarta.annotation.Resource; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.UUID; import java.util.concurrent.*; Configuration IntegrationComponentScan(basePackages com.bitstorm.mqtt) EnableIntegration public class MqttIntegrationConfig { private static final Logger log LoggerFactory.getLogger(MqttIntegrationConfig.class); Resource private MqttProperties mqttProps; /** * MQTT 客户端工厂 */ Bean public DefaultMqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory new DefaultMqttPahoClientFactory(); MqttConnectOptions options getMqttConnectOptions(); factory.setPersistence(new MqttDefaultFilePersistence(mqttProps.getPersistence())); factory.setConnectionOptions(options); return factory; } private MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions options new MqttConnectOptions(); options.setServerURIs(new String[]{mqttProps.getBrokerUrl()}); options.setUserName(mqttProps.getUser()); options.setPassword(mqttProps.getPassword().toCharArray()); options.setKeepAliveInterval(mqttProps.getKeepAlive()); options.setConnectionTimeout(mqttProps.getConnectionTimeout()); options.setAutomaticReconnect(mqttProps.isAutoReconnect()); options.setCleanSession(mqttProps.isCleanSession()); options.setMaxInflight(mqttProps.getMaxInflight()); return options; } /** * 消息输入通道带线程池用于异步处理 */ Bean public MessageChannel mqttInputChannel() { ThreadPoolExecutor executor new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(mqttProps.getMaxInflight()), r - { Thread t new Thread(r, mqtt-inbound- Thread.currentThread().getName()); t.setDaemon(true); return t; }, new ThreadPoolExecutor.CallerRunsPolicy() ); return new ExecutorChannel(executor); } /** * 消息入站适配器订阅消息 */ Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter( DefaultMqttPahoClientFactory clientFactory, MessageChannel mqttInputChannel, MessageChannel mqttErrorChannel) { String clientId mqttProps.getClientId() -consume- UUID.randomUUID(); MqttPahoMessageDrivenChannelAdapter adapter new MqttPahoMessageDrivenChannelAdapter(clientId, clientFactory, mqttProps.getTopics()); adapter.setCompletionTimeout(mqttProps.getCompletionTimeout()); adapter.setQos(mqttProps.getDefaultQos()); adapter.setOutputChannel(mqttInputChannel); adapter.setErrorChannel(mqttErrorChannel); adapter.setManualAcks(true); // 开启手动确认 return adapter; } /** * 消息输出通道 */ Bean public MessageChannel mqttOutputChannel() { return new DirectChannel(); } /** * 消息出站处理器发布消息 */ Bean ServiceActivator(inputChannel mqttOutputChannel) public MessageHandler mqttOutboundHandler(DefaultMqttPahoClientFactory clientFactory) { String clientId mqttProps.getClientId() -production- UUID.randomUUID(); MqttPahoMessageHandler handler new MqttPahoMessageHandler(clientId, clientFactory); handler.setAsync(true); handler.setDefaultRetained(false); handler.setDefaultQos(mqttProps.getDefaultQos()); return handler; } /** * 全局错误通道 */ Bean public MessageChannel mqttErrorChannel() { return new DirectChannel(); } /** * 错误通道处理器 */ Bean ServiceActivator(inputChannel mqttErrorChannel) public MessageHandler mqttErrorHandler() { return message - { Throwable cause message.getPayload() instanceof Throwable ? (Throwable) message.getPayload() : new MessagingException(Unknown error, (Throwable) message.getPayload()); log.error(MQTT 错误发生消息头{}, message.getHeaders(), cause); }; } }3.5 MQTT 网关接口发送消息javaimport org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; MessagingGateway(defaultRequestChannel mqttOutputChannel) public interface MqttGateway { /** * 发送消息到指定主题使用默认 QoS */ void sendToMqtt(Header(MqttHeaders.TOPIC) String topic, String payload); /** * 发送消息到指定主题并指定 QoS */ void sendToMqtt(Header(MqttHeaders.TOPIC) String topic, Header(MqttHeaders.QOS) int qos, String payload); }3.6 消息接收处理器订阅消息javaimport jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; Component Slf4j public class MqttMessageHandler { Resource private MqttGateway mqttGateway; /** * 定时发送测试消息 */ Scheduled(fixedDelay 3000) public void sendTestMessage() { String payload String.format(设备信息: %d, 时间: %s, System.currentTimeMillis(), LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd HH:mm:ss))); mqttGateway.sendToMqtt(/x/identify/device, payload); log.info(测试消息已发送: {}, payload); } /** * 消息处理器 - 手动 ACK */ ServiceActivator(inputChannel mqttInputChannel) public void handleMessage(Message? message) { // 获取 ACK 回调 Object ackObj message.getHeaders().get(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK); AcknowledgmentCallback ack null; if (ackObj instanceof AcknowledgmentCallback) { ack (AcknowledgmentCallback) ackObj; } try { String topic message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class); String payload message.getPayload().toString(); log.info(收到消息主题{}内容{}, topic, payload); // TODO: 在这里编写你的业务逻辑 if (ack ! null) { ack.acknowledge(); // 手动确认消息 log.debug(消息已确认); } } catch (Exception e) { log.error(处理MQTT消息异常, e); // 根据业务决定是否确认不确认会触发重发 } } }四、测试验证4.1 启动 Spring Boot 应用启动后控制台会输出定时发送的测试消息日志。五、常见问题与踩坑指南5.1 EMQX 启动失败原因端口 1883 被占用解决bashnetstat -ano | findstr :1883找到占用进程并关闭或修改 EMQX 配置文件中的端口。5.2 连接断开MQTT 是长连接网络波动可能导致断开。配置中已开启autoReconnect: true会自动重连。5.3 Topic 设计不合理不要随意命名 Topic建议采用层级结构text✅ device/{deviceId}/status ✅ device/{deviceId}/command ❌ /a/b/c/d/e/f/g5.4 QoS 怎么选QoS适用场景0传感器高频上报的非关键数据丢一条无所谓1控制指令开关灯、设备控制2金融、航空等关键数据大多数场景QoS 1就够用了。5.5 消息重复消费使用 QoS 1 时可能收到重复消息。如果需要幂等处理可以在业务层根据消息 ID 去重。六、总结本文从零开始完成了✅ Windows 上安装 EMQX 并启动✅ Spring Boot 集成 Spring Integration MQTT✅ 消息发布定时发送测试消息✅ 消息订阅接收并手动 ACK✅ 使用 MQTTX 进行端到端测试整套代码可以直接复制到项目中运行。如果遇到问题欢迎在评论区留言交流源码地址https://gitee.com/byte1026/mqtt-case.git文中的所有代码均可直接使用记得根据实际环境修改application.yml中的连接配置。