From f78ba0fe94701370fee11877602f56227dad1944 Mon Sep 17 00:00:00 2001 From: shizisheng Date: Tue, 17 Jun 2025 15:13:22 +0800 Subject: [PATCH] =?UTF-8?q?=E7=81=B5=E5=97=85mqtt=E5=8D=95=E4=B8=80?= =?UTF-8?q?=E9=9B=86=E6=88=90-Lingxiu=20waylinejobDTO=E6=89=A9=E5=B1=95?= =?UTF-8?q?=E6=8E=A5=E6=94=B6id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../LingxiuMqttMessageProcessor.java | 5 + .../LingxiuMqttMessageProcessorImpl.java | 93 +++++++++++++++++++ .../LingxiuMqttStartupRunner.java | 22 +++++ .../configuration/MqttLingxiuConfig.java | 45 +++++++++ .../configuration/MqttLingxiuSubscriber.java | 45 +++++++++ .../wayline/model/dto/WaylineJobDTO.java | 2 + .../service/impl/WaylineJobServiceImpl.java | 1 + 7 files changed, 213 insertions(+) create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessor.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessorImpl.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttStartupRunner.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuConfig.java create mode 100644 dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuSubscriber.java diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessor.java b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessor.java new file mode 100644 index 0000000..b993f16 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessor.java @@ -0,0 +1,5 @@ +package org.dromara.sample.configuration; + +public interface LingxiuMqttMessageProcessor { + void process(String topic, String payload); +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessorImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessorImpl.java new file mode 100644 index 0000000..edbf028 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttMessageProcessorImpl.java @@ -0,0 +1,93 @@ +package org.dromara.sample.configuration; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.sample.manage.mapper.IDevicePayloadMapper; +import org.dromara.sample.manage.model.dto.DevicePayloadDTO; +import org.dromara.sample.manage.model.entity.DevicePayloadEntity; +import org.dromara.sample.manage.service.IDevicePayloadService; +import org.dromara.sample.wayline.domain.WaylineJobAtmosphere; +import org.dromara.sample.wayline.domain.bo.WaylineJobAtmosphereBo; +import org.dromara.sample.wayline.model.dto.WaylineJobDTO; +import org.dromara.sample.wayline.model.entity.WaylineJobEntity; +import org.dromara.sample.wayline.service.IWaylineJobAtmosphereService; +import org.dromara.sample.wayline.service.IWaylineJobService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; + +@Slf4j +@Component +@RequiredArgsConstructor +public class LingxiuMqttMessageProcessorImpl implements LingxiuMqttMessageProcessor{ + private final IWaylineJobAtmosphereService waylineJobAtmosphereService; + private final IWaylineJobService waylineJobService; + private final IDevicePayloadService devicePayloadService; + + @Autowired + private final IDevicePayloadMapper devicePayloadMapper; + + @Override + public void process(String topic, String payload) { + log.info("处理灵嗅MQTT数据 topic={}, payload={}", topic, payload); + // {"serial":"37c12953","sequence":12,"sateNum":0,"hdop":0,"utcTime":"2025-06-17-01-37-22","latitude":34.270486,"longitude":117.149302,"altitude":0.03,"temperature":29.61,"humidity":21.37,"pressure":100071.42,"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":14,"PM2.5(ug/m3)":35,"PM10(ug/m3)":46}} + +// if(1==1){ //关闭数据 +// return; +// } + try { + JSONObject json = JSONUtil.parseObj(payload); + System.out.println("json打印"+json); + JSONObject airData = json.getJSONObject("airData"); + String utcTimeStr = json.getStr("utcTime"); + + + //"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":16,"PM2.5(ug/m3)":39,"PM10(ug/m3)":51} + //找默认数据 + WaylineJobAtmosphereBo jobAtmosphere = JSONUtil.toBean(json, WaylineJobAtmosphereBo.class); + jobAtmosphere.setAirData(JSONUtil.toJsonStr(airData)); + jobAtmosphere.setUtcTimeStr(utcTimeStr); + + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(DevicePayloadEntity::getPayloadSn,jobAtmosphere.getSerial()); + DevicePayloadEntity devicePayloadEntity = devicePayloadMapper.selectOne(queryWrapper); + String deviceSn = devicePayloadEntity.getDeviceSn(); + System.out.println(deviceSn); +// List devicePayloadEntitiesByDeviceSn = devicePayloadService.getDevicePayloadEntitiesByDeviceSn(""); + + +// String sn = json.getStr("8UUXN3100A01WS");//8UUXN3100A01WS 8UUXN2T00A01R8 + // 查找无人机/wayline信息 + WaylineJobDTO waylineJobDTO = waylineJobService.getJobByDockSn(deviceSn); +// waylineJobService.getid +// Optional jobOpt = waylineJobService.getJobByJobInternalId(sn); +// if (jobOpt.isPresent()) { +// waylineJobAtmosphereService.handleAdd(jobOpt.get(), json); +// } else { +// log.warn("找不到对应无人机任务:sn={}", sn); +// } +// "{\"SO2(ppm)\":0,\"NO2(ppm)\":0,\"Ox(ppm)\":0,\"PM1.0(ug/m3)\":16,\"PM2.5(ug/m3)\":38,\"PM10(ug/m3)\":48}" + + if(ObjectUtil.isNotEmpty(waylineJobDTO)){ + jobAtmosphere.setWaylineJobId((long)waylineJobDTO.getId()); + jobAtmosphere.setWaylineName(waylineJobDTO.getWaylineName()); + jobAtmosphere.setJobId(waylineJobDTO.getJobId()); + jobAtmosphere.setName(waylineJobDTO.getJobName()); + jobAtmosphere.setFileId(waylineJobDTO.getFileId()); + jobAtmosphere.setWorkspaceId(waylineJobDTO.getWorkspaceId()); + jobAtmosphere.setDockSn(waylineJobDTO.getDockSn()); +// BeanUtil.copyProperties(); + waylineJobAtmosphereService.insertByBo(jobAtmosphere); + } + } catch (Exception e) { + log.error("处理灵嗅MQTT消息异常", e); + } + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttStartupRunner.java b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttStartupRunner.java new file mode 100644 index 0000000..68dd5da --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/LingxiuMqttStartupRunner.java @@ -0,0 +1,22 @@ +package org.dromara.sample.configuration; + +import lombok.RequiredArgsConstructor; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class LingxiuMqttStartupRunner implements ApplicationRunner { + private final MqttLingxiuSubscriber subscriber; + + @Override + public void run(ApplicationArguments args) { + try { + subscriber.subscribe(); + } catch (Exception e) { + System.err.println("启动灵嗅 MQTT 订阅失败"+ e); + } + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuConfig.java b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuConfig.java new file mode 100644 index 0000000..72dbcf3 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuConfig.java @@ -0,0 +1,45 @@ +package org.dromara.sample.configuration; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MqttLingxiuConfig { + + @Value("${lingxiu.mqtt.broker}") + private String brokerUrl; + + @Value("${lingxiu.mqtt.clientId}") + private String clientId; + + @Value("${lingxiu.mqtt.username}") + private String username; + + @Value("${lingxiu.mqtt.password}") + private String password; + + @Bean + public MqttClient lingxiuMqttClient() throws MqttException { + MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setAutomaticReconnect(true); + options.setCleanSession(true); + + client.connect(options); + return client; + } + + @Bean + public MqttLingxiuSubscriber mqttLingxiuSubscriber(MqttClient client, + LingxiuMqttMessageProcessor processor) { + return new MqttLingxiuSubscriber(client, processor); + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuSubscriber.java b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuSubscriber.java new file mode 100644 index 0000000..9ee2d30 --- /dev/null +++ b/dk-modules/sample/src/main/java/org/dromara/sample/configuration/MqttLingxiuSubscriber.java @@ -0,0 +1,45 @@ +package org.dromara.sample.configuration; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +@Slf4j +public class MqttLingxiuSubscriber { + + private final MqttClient client; + private final LingxiuMqttMessageProcessor processor; + + public MqttLingxiuSubscriber(MqttClient client, LingxiuMqttMessageProcessor processor) { + this.client = client; + this.processor = processor; + } + + public void subscribe() throws MqttException { + String[] topics = new String[]{"/topic/#"}; + + client.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + log.warn("灵嗅MQTT连接断开", cause); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + log.info("MQTT 收到消息:{} -> {}", topic, payload); + + processor.process(topic, payload); // 业务分发 + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) {} + }); + + for (String topic : topics) { + client.subscribe(topic, 1); + } + } +} diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/model/dto/WaylineJobDTO.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/model/dto/WaylineJobDTO.java index a702779..7d97e73 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/model/dto/WaylineJobDTO.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/model/dto/WaylineJobDTO.java @@ -23,6 +23,8 @@ import java.util.List; @Builder public class WaylineJobDTO { + private Integer id; + private String jobId; private String jobName; diff --git a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineJobServiceImpl.java b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineJobServiceImpl.java index 82790e3..f3786e7 100644 --- a/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineJobServiceImpl.java +++ b/dk-modules/sample/src/main/java/org/dromara/sample/wayline/service/impl/WaylineJobServiceImpl.java @@ -293,6 +293,7 @@ public class WaylineJobServiceImpl implements IWaylineJobService { } WaylineJobDTO.WaylineJobDTOBuilder builder = WaylineJobDTO.builder() + .id(entity.getId()) .jobId(entity.getJobId()) .jobName(entity.getName()) .fileId(entity.getFileId())