|
|
@ -1,26 +1,32 @@ |
|
|
|
package org.dromara.sample.configuration; |
|
|
|
|
|
|
|
import cn.hutool.core.bean.BeanUtil; |
|
|
|
import cn.hutool.core.util.ObjectUtil; |
|
|
|
import cn.hutool.json.JSONObject; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|
|
|
import lombok.RequiredArgsConstructor; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.dromara.common.redis.config.RedisConst; |
|
|
|
import org.dromara.common.redis.utils.RedisOpsUtils; |
|
|
|
import org.dromara.common.websocket.dto.BizCodeEnum; |
|
|
|
import org.dromara.sample.manage.domain.ManageDevicePayloadCustom; |
|
|
|
import org.dromara.sample.manage.mapper.IDevicePayloadMapper; |
|
|
|
import org.dromara.sample.manage.model.dto.DevicePayloadDTO; |
|
|
|
import org.dromara.sample.manage.mapper.ManageDevicePayloadCustomMapper; |
|
|
|
import org.dromara.sample.manage.model.dto.TelemetryDTO; |
|
|
|
import org.dromara.sample.manage.model.entity.DevicePayloadEntity; |
|
|
|
import org.dromara.sample.manage.model.enums.UserTypeEnum; |
|
|
|
import org.dromara.sample.manage.service.IDevicePayloadService; |
|
|
|
import org.dromara.sample.wayline.domain.WaylineJobAtmosphere; |
|
|
|
import org.dromara.sample.manage.service.IDeviceRedisService; |
|
|
|
import org.dromara.sample.wayline.domain.bo.WaylineJobAtmosphereBo; |
|
|
|
import org.dromara.sample.wayline.model.dto.WaylineJobDTO; |
|
|
|
import org.dromara.sample.wayline.model.entity.WaylineJobEntity; |
|
|
|
import org.dromara.sample.wayline.service.IWaylineJobAtmosphereService; |
|
|
|
import org.dromara.sample.wayline.service.IWaylineJobService; |
|
|
|
import org.dromara.sample.websocket.config.WebSocketSubscriptionManager; |
|
|
|
import org.dromara.sample.websocket.service.IWebSocketMessageService; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
|
|
|
|
@Slf4j |
|
|
@ -31,49 +37,91 @@ public class LingxiuMqttMessageProcessorImpl implements LingxiuMqttMessageProces |
|
|
|
private final IWaylineJobService waylineJobService; |
|
|
|
private final IDevicePayloadService devicePayloadService; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private IDeviceRedisService deviceRedisService; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private final IDevicePayloadMapper devicePayloadMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private final ManageDevicePayloadCustomMapper manageDevicePayloadCustomMapper; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private IWebSocketMessageService webSocketMessageService; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private WebSocketSubscriptionManager webSocketSubscriptionManager; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
public void process(String topic, String payload) { |
|
|
|
log.info("处理灵嗅MQTT数据 topic={}, payload={}", topic, payload); |
|
|
|
// {"serial":"37c12953","sequence":12,"sateNum":0,"hdop":0,"utcTime":"2025-06-17-01-37-22","latitude":34.270486,"longitude":117.149302,"altitude":0.03,"temperature":29.61,"humidity":21.37,"pressure":100071.42,"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":14,"PM2.5(ug/m3)":35,"PM10(ug/m3)":46}}
|
|
|
|
|
|
|
|
// if(1==1){ //关闭数据
|
|
|
|
// return;
|
|
|
|
// }
|
|
|
|
// System.out.println("灵嗅"+topic);
|
|
|
|
log.info("处理灵嗅MQTT数据 topic={}, payload={}", topic, payload); |
|
|
|
// {"serial":"37c12953","sequence":12,"sateNum":0,"hdop":0,"utcTime":"2025-06-17-01-37-22","latitude":34.270486,"longitude":117.149302,"altitude":0.03,"temperature":29.61,"humidity":21.37,"pressure":100071.42,"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":14,"PM2.5(ug/m3)":35,"PM10(ug/m3)":46}}
|
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
JSONObject json = JSONUtil.parseObj(payload); |
|
|
|
System.out.println("json打印"+json); |
|
|
|
JSONObject airData = json.getJSONObject("airData"); |
|
|
|
JSONObject airData = json.getJSONObject("airData"); //"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":16,"PM2.5(ug/m3)":39,"PM10(ug/m3)":51}
|
|
|
|
|
|
|
|
/** 数据处理-简化对象内容 {"SO2":0,"NO2":0,"Ox":0.082399,"PM1_0":20,"PM2_5":39,"PM10":48} */ |
|
|
|
JSONObject simplified = new JSONObject(); |
|
|
|
for (String key : airData.keySet()) { |
|
|
|
Object value = airData.get(key); |
|
|
|
String newKey = key.replaceAll("\\(.*?\\)", ""); // 去掉括号及其内容:"SO2(ppm)" -> "SO2"
|
|
|
|
newKey = newKey.replace(".", "_"); // 替换 . 为 _,例如 "PM2.5" -> "PM2_5"
|
|
|
|
newKey = newKey.replaceAll("(?i)point", "_"); //point去除:PM1point0 or PM2POINT5 → PM1_0, PM2_5
|
|
|
|
newKey = newKey.trim(); // 去除空格
|
|
|
|
simplified.put(newKey, value); |
|
|
|
} |
|
|
|
|
|
|
|
String utcTimeStr = json.getStr("utcTime"); |
|
|
|
Boolean missionOpenStatus = json.getBool("missionOpenStatus"); |
|
|
|
Float relativeAltitude = json.getFloat("relativeAltitude"); |
|
|
|
Boolean dataRecoveryFlag = json.getBool("dataRecoveryFlag"); |
|
|
|
|
|
|
|
|
|
|
|
//"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":16,"PM2.5(ug/m3)":39,"PM10(ug/m3)":51}
|
|
|
|
//找默认数据
|
|
|
|
WaylineJobAtmosphereBo jobAtmosphere = JSONUtil.toBean(json, WaylineJobAtmosphereBo.class); |
|
|
|
jobAtmosphere.setAirData(JSONUtil.toJsonStr(airData)); |
|
|
|
jobAtmosphere.setAirData(JSONUtil.toJsonStr(simplified)); |
|
|
|
jobAtmosphere.setUtcTimeStr(utcTimeStr); |
|
|
|
jobAtmosphere.setMissionOpenStatus(missionOpenStatus); |
|
|
|
jobAtmosphere.setRelativeAltitude(relativeAltitude); |
|
|
|
jobAtmosphere.setDataRecoveryFlag(dataRecoveryFlag); |
|
|
|
|
|
|
|
LambdaQueryWrapper<DevicePayloadEntity> queryWrapper = new LambdaQueryWrapper<>(); |
|
|
|
queryWrapper.eq(DevicePayloadEntity::getPayloadSn,jobAtmosphere.getSerial()); |
|
|
|
DevicePayloadEntity devicePayloadEntity = devicePayloadMapper.selectOne(queryWrapper); |
|
|
|
String deviceSn = devicePayloadEntity.getDeviceSn(); |
|
|
|
System.out.println(deviceSn); |
|
|
|
// List<DevicePayloadDTO> devicePayloadEntitiesByDeviceSn = devicePayloadService.getDevicePayloadEntitiesByDeviceSn("");
|
|
|
|
|
|
|
|
//找第三方挂载
|
|
|
|
LambdaQueryWrapper<ManageDevicePayloadCustom> queryWrapper = new LambdaQueryWrapper<>(); |
|
|
|
queryWrapper.eq(ManageDevicePayloadCustom::getPayloadSn, jobAtmosphere.getSerial()); |
|
|
|
ManageDevicePayloadCustom paloadCustom = manageDevicePayloadCustomMapper.selectOne(queryWrapper); |
|
|
|
if (paloadCustom == null) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
String deviceSn = paloadCustom.getDockSn();//8UUXN2B00A00SK
|
|
|
|
String payloadType = paloadCustom.getPayloadType(); |
|
|
|
jobAtmosphere.setPayloadType(payloadType); |
|
|
|
// List<DevicePayloadDTO> devicePayloadEntitiesByDeviceSn = devicePayloadService.getDevicePayloadEntitiesByDeviceSn("");
|
|
|
|
|
|
|
|
// String sn = json.getStr("8UUXN3100A01WS");//8UUXN3100A01WS 8UUXN2T00A01R8
|
|
|
|
/** 更新设备灵嗅状态上线 **/ |
|
|
|
// 查找无人机/wayline信息
|
|
|
|
WaylineJobDTO waylineJobDTO = waylineJobService.getJobByDockSn(deviceSn); |
|
|
|
// waylineJobService.getid
|
|
|
|
// Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobByJobInternalId(sn);
|
|
|
|
// if (jobOpt.isPresent()) {
|
|
|
|
// waylineJobAtmosphereService.handleAdd(jobOpt.get(), json);
|
|
|
|
// } else {
|
|
|
|
// log.warn("找不到对应无人机任务:sn={}", sn);
|
|
|
|
WaylineJobDTO waylineJobDTO = waylineJobService.getJobByDockSn(deviceSn); //飞行中+倒序》找第一个
|
|
|
|
|
|
|
|
// if(ObjectUtil.isEmpty(waylineJobDTO.getId())){
|
|
|
|
// //测试-找一个临时备用的
|
|
|
|
// Optional<WaylineJobDTO> jobByJobId = waylineJobService.getJobByJobId("e3dea0f5-37f2-4d79-ae58-490af3228070","8fc5e69e-2f8d-4a8f-9bdd-0c59eab39d74");
|
|
|
|
// if (jobByJobId.isPresent()) {
|
|
|
|
// waylineJobDTO = jobByJobId.get();
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// "{\"SO2(ppm)\":0,\"NO2(ppm)\":0,\"Ox(ppm)\":0,\"PM1.0(ug/m3)\":16,\"PM2.5(ug/m3)\":38,\"PM10(ug/m3)\":48}"
|
|
|
|
|
|
|
|
if(ObjectUtil.isEmpty(waylineJobDTO.getId())){ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
if(ObjectUtil.isNotEmpty(waylineJobDTO)){ |
|
|
|
jobAtmosphere.setWaylineJobId((long)waylineJobDTO.getId()); |
|
|
@ -83,8 +131,42 @@ public class LingxiuMqttMessageProcessorImpl implements LingxiuMqttMessageProces |
|
|
|
jobAtmosphere.setFileId(waylineJobDTO.getFileId()); |
|
|
|
jobAtmosphere.setWorkspaceId(waylineJobDTO.getWorkspaceId()); |
|
|
|
jobAtmosphere.setDockSn(waylineJobDTO.getDockSn()); |
|
|
|
// BeanUtil.copyProperties();
|
|
|
|
waylineJobAtmosphereService.insertByBo(jobAtmosphere); |
|
|
|
waylineJobAtmosphereService.insertByBo(jobAtmosphere);//存到库里
|
|
|
|
|
|
|
|
//拼接数据到redis ,负责push 对象到灵嗅记录列表-正常
|
|
|
|
// deviceRedisService.pushDeviceSpiritSmellList(deviceSn, jobAtmosphere);
|
|
|
|
// RedisOpsUtils.setWithExpire(RedisConst.DEVICE_SPIRITSMELL_PREFIX + deviceSn, data, 30);
|
|
|
|
|
|
|
|
//动态赋值给dockOsd的redis缓存。
|
|
|
|
jobAtmosphere.setAirDataJson(JSONUtil.parseObj(jobAtmosphere.getAirData())); |
|
|
|
|
|
|
|
// 存到缓存-单独缓存
|
|
|
|
deviceRedisService.setDeviceSpiritSmellOnline(deviceSn,jobAtmosphere); |
|
|
|
// 存到缓存-扩展OSD数据
|
|
|
|
// Object osdRedis = RedisOpsUtils.get(RedisConst.OSD_PREFIX + deviceSn);
|
|
|
|
// if(Optional.ofNullable(osdRedis).isPresent()){
|
|
|
|
//// OsdDockDrone
|
|
|
|
// JSONObject entries = JSONUtil.parseObj(osdRedis);
|
|
|
|
// entries.set("spiritsmell", jobAtmosphere);
|
|
|
|
// RedisOpsUtils.setWithExpire(RedisConst.OSD_PREFIX + deviceSn, entries, RedisConst.ONLINE_REDIS_USER_ALIVE_SECOND);
|
|
|
|
//
|
|
|
|
// }
|
|
|
|
// deviceRedisService.getDeviceSpiritSmellOnline(deviceSn);
|
|
|
|
|
|
|
|
//socket发送-正常
|
|
|
|
Optional<WaylineJobAtmosphereBo> deviceOpt = deviceRedisService.getDeviceSpiritSmellOnline(deviceSn); |
|
|
|
if (deviceOpt.isEmpty()) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
String bizCode = BizCodeEnum.SPIRIT_SMELL.getCode(); |
|
|
|
String dockSn = deviceSn; // 设备码
|
|
|
|
String workspaceId = deviceOpt.get().getWorkspaceId(); |
|
|
|
|
|
|
|
// Collection<MyConcurrentWebSocketSession> subs = webSocketSubscriptionManager.getSessionsForDock(dockSn);
|
|
|
|
webSocketMessageService.sendBatch(workspaceId, UserTypeEnum.WEB.getVal(),bizCode, TelemetryDTO.builder().sn(deviceSn).host(jobAtmosphere).build()); |
|
|
|
// Collection<MyConcurrentWebSocketSession> subs = webSocketSubscriptionManager.getSessionsForDock(dockSn);
|
|
|
|
// webSocketMessageService.sendBatchBySessions(subs, bizCode, TelemetryDTO.builder().sn(deviceSn).host(jobAtmosphere).build());
|
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("处理灵嗅MQTT消息异常", e); |
|
|
|