7 changed files with 213 additions and 0 deletions
@ -0,0 +1,5 @@ |
|||||
|
package org.dromara.sample.configuration; |
||||
|
|
||||
|
public interface LingxiuMqttMessageProcessor { |
||||
|
void process(String topic, String payload); |
||||
|
} |
@ -0,0 +1,93 @@ |
|||||
|
package org.dromara.sample.configuration; |
||||
|
|
||||
|
import cn.hutool.core.bean.BeanUtil; |
||||
|
import cn.hutool.core.util.ObjectUtil; |
||||
|
import cn.hutool.json.JSONObject; |
||||
|
import cn.hutool.json.JSONUtil; |
||||
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.dromara.sample.manage.mapper.IDevicePayloadMapper; |
||||
|
import org.dromara.sample.manage.model.dto.DevicePayloadDTO; |
||||
|
import org.dromara.sample.manage.model.entity.DevicePayloadEntity; |
||||
|
import org.dromara.sample.manage.service.IDevicePayloadService; |
||||
|
import org.dromara.sample.wayline.domain.WaylineJobAtmosphere; |
||||
|
import org.dromara.sample.wayline.domain.bo.WaylineJobAtmosphereBo; |
||||
|
import org.dromara.sample.wayline.model.dto.WaylineJobDTO; |
||||
|
import org.dromara.sample.wayline.model.entity.WaylineJobEntity; |
||||
|
import org.dromara.sample.wayline.service.IWaylineJobAtmosphereService; |
||||
|
import org.dromara.sample.wayline.service.IWaylineJobService; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.util.List; |
||||
|
import java.util.Optional; |
||||
|
|
||||
|
@Slf4j |
||||
|
@Component |
||||
|
@RequiredArgsConstructor |
||||
|
public class LingxiuMqttMessageProcessorImpl implements LingxiuMqttMessageProcessor{ |
||||
|
private final IWaylineJobAtmosphereService waylineJobAtmosphereService; |
||||
|
private final IWaylineJobService waylineJobService; |
||||
|
private final IDevicePayloadService devicePayloadService; |
||||
|
|
||||
|
@Autowired |
||||
|
private final IDevicePayloadMapper devicePayloadMapper; |
||||
|
|
||||
|
@Override |
||||
|
public void process(String topic, String payload) { |
||||
|
log.info("处理灵嗅MQTT数据 topic={}, payload={}", topic, payload); |
||||
|
// {"serial":"37c12953","sequence":12,"sateNum":0,"hdop":0,"utcTime":"2025-06-17-01-37-22","latitude":34.270486,"longitude":117.149302,"altitude":0.03,"temperature":29.61,"humidity":21.37,"pressure":100071.42,"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":14,"PM2.5(ug/m3)":35,"PM10(ug/m3)":46}}
|
||||
|
|
||||
|
// if(1==1){ //关闭数据
|
||||
|
// return;
|
||||
|
// }
|
||||
|
try { |
||||
|
JSONObject json = JSONUtil.parseObj(payload); |
||||
|
System.out.println("json打印"+json); |
||||
|
JSONObject airData = json.getJSONObject("airData"); |
||||
|
String utcTimeStr = json.getStr("utcTime"); |
||||
|
|
||||
|
|
||||
|
//"airData":{"SO2(ppm)":0,"NO2(ppm)":0,"Ox(ppm)":0,"PM1.0(ug/m3)":16,"PM2.5(ug/m3)":39,"PM10(ug/m3)":51}
|
||||
|
//找默认数据
|
||||
|
WaylineJobAtmosphereBo jobAtmosphere = JSONUtil.toBean(json, WaylineJobAtmosphereBo.class); |
||||
|
jobAtmosphere.setAirData(JSONUtil.toJsonStr(airData)); |
||||
|
jobAtmosphere.setUtcTimeStr(utcTimeStr); |
||||
|
|
||||
|
LambdaQueryWrapper<DevicePayloadEntity> queryWrapper = new LambdaQueryWrapper<>(); |
||||
|
queryWrapper.eq(DevicePayloadEntity::getPayloadSn,jobAtmosphere.getSerial()); |
||||
|
DevicePayloadEntity devicePayloadEntity = devicePayloadMapper.selectOne(queryWrapper); |
||||
|
String deviceSn = devicePayloadEntity.getDeviceSn(); |
||||
|
System.out.println(deviceSn); |
||||
|
// List<DevicePayloadDTO> devicePayloadEntitiesByDeviceSn = devicePayloadService.getDevicePayloadEntitiesByDeviceSn("");
|
||||
|
|
||||
|
|
||||
|
// String sn = json.getStr("8UUXN3100A01WS");//8UUXN3100A01WS 8UUXN2T00A01R8
|
||||
|
// 查找无人机/wayline信息
|
||||
|
WaylineJobDTO waylineJobDTO = waylineJobService.getJobByDockSn(deviceSn); |
||||
|
// waylineJobService.getid
|
||||
|
// Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobByJobInternalId(sn);
|
||||
|
// if (jobOpt.isPresent()) {
|
||||
|
// waylineJobAtmosphereService.handleAdd(jobOpt.get(), json);
|
||||
|
// } else {
|
||||
|
// log.warn("找不到对应无人机任务:sn={}", sn);
|
||||
|
// }
|
||||
|
// "{\"SO2(ppm)\":0,\"NO2(ppm)\":0,\"Ox(ppm)\":0,\"PM1.0(ug/m3)\":16,\"PM2.5(ug/m3)\":38,\"PM10(ug/m3)\":48}"
|
||||
|
|
||||
|
if(ObjectUtil.isNotEmpty(waylineJobDTO)){ |
||||
|
jobAtmosphere.setWaylineJobId((long)waylineJobDTO.getId()); |
||||
|
jobAtmosphere.setWaylineName(waylineJobDTO.getWaylineName()); |
||||
|
jobAtmosphere.setJobId(waylineJobDTO.getJobId()); |
||||
|
jobAtmosphere.setName(waylineJobDTO.getJobName()); |
||||
|
jobAtmosphere.setFileId(waylineJobDTO.getFileId()); |
||||
|
jobAtmosphere.setWorkspaceId(waylineJobDTO.getWorkspaceId()); |
||||
|
jobAtmosphere.setDockSn(waylineJobDTO.getDockSn()); |
||||
|
// BeanUtil.copyProperties();
|
||||
|
waylineJobAtmosphereService.insertByBo(jobAtmosphere); |
||||
|
} |
||||
|
} catch (Exception e) { |
||||
|
log.error("处理灵嗅MQTT消息异常", e); |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,22 @@ |
|||||
|
package org.dromara.sample.configuration; |
||||
|
|
||||
|
import lombok.RequiredArgsConstructor; |
||||
|
import org.springframework.boot.ApplicationArguments; |
||||
|
import org.springframework.boot.ApplicationRunner; |
||||
|
import org.springframework.boot.CommandLineRunner; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
@Component |
||||
|
@RequiredArgsConstructor |
||||
|
public class LingxiuMqttStartupRunner implements ApplicationRunner { |
||||
|
private final MqttLingxiuSubscriber subscriber; |
||||
|
|
||||
|
@Override |
||||
|
public void run(ApplicationArguments args) { |
||||
|
try { |
||||
|
subscriber.subscribe(); |
||||
|
} catch (Exception e) { |
||||
|
System.err.println("启动灵嗅 MQTT 订阅失败"+ e); |
||||
|
} |
||||
|
} |
||||
|
} |
@ -0,0 +1,45 @@ |
|||||
|
package org.dromara.sample.configuration; |
||||
|
|
||||
|
import org.eclipse.paho.client.mqttv3.MqttClient; |
||||
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
||||
|
import org.eclipse.paho.client.mqttv3.MqttException; |
||||
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
||||
|
import org.springframework.beans.factory.annotation.Value; |
||||
|
import org.springframework.context.annotation.Bean; |
||||
|
import org.springframework.context.annotation.Configuration; |
||||
|
|
||||
|
@Configuration |
||||
|
public class MqttLingxiuConfig { |
||||
|
|
||||
|
@Value("${lingxiu.mqtt.broker}") |
||||
|
private String brokerUrl; |
||||
|
|
||||
|
@Value("${lingxiu.mqtt.clientId}") |
||||
|
private String clientId; |
||||
|
|
||||
|
@Value("${lingxiu.mqtt.username}") |
||||
|
private String username; |
||||
|
|
||||
|
@Value("${lingxiu.mqtt.password}") |
||||
|
private String password; |
||||
|
|
||||
|
@Bean |
||||
|
public MqttClient lingxiuMqttClient() throws MqttException { |
||||
|
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); |
||||
|
|
||||
|
MqttConnectOptions options = new MqttConnectOptions(); |
||||
|
options.setUserName(username); |
||||
|
options.setPassword(password.toCharArray()); |
||||
|
options.setAutomaticReconnect(true); |
||||
|
options.setCleanSession(true); |
||||
|
|
||||
|
client.connect(options); |
||||
|
return client; |
||||
|
} |
||||
|
|
||||
|
@Bean |
||||
|
public MqttLingxiuSubscriber mqttLingxiuSubscriber(MqttClient client, |
||||
|
LingxiuMqttMessageProcessor processor) { |
||||
|
return new MqttLingxiuSubscriber(client, processor); |
||||
|
} |
||||
|
} |
@ -0,0 +1,45 @@ |
|||||
|
package org.dromara.sample.configuration; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.eclipse.paho.client.mqttv3.*; |
||||
|
|
||||
|
import java.nio.charset.StandardCharsets; |
||||
|
import java.util.Arrays; |
||||
|
|
||||
|
@Slf4j |
||||
|
public class MqttLingxiuSubscriber { |
||||
|
|
||||
|
private final MqttClient client; |
||||
|
private final LingxiuMqttMessageProcessor processor; |
||||
|
|
||||
|
public MqttLingxiuSubscriber(MqttClient client, LingxiuMqttMessageProcessor processor) { |
||||
|
this.client = client; |
||||
|
this.processor = processor; |
||||
|
} |
||||
|
|
||||
|
public void subscribe() throws MqttException { |
||||
|
String[] topics = new String[]{"/topic/#"}; |
||||
|
|
||||
|
client.setCallback(new MqttCallback() { |
||||
|
@Override |
||||
|
public void connectionLost(Throwable cause) { |
||||
|
log.warn("灵嗅MQTT连接断开", cause); |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void messageArrived(String topic, MqttMessage message) { |
||||
|
String payload = new String(message.getPayload(), StandardCharsets.UTF_8); |
||||
|
log.info("MQTT 收到消息:{} -> {}", topic, payload); |
||||
|
|
||||
|
processor.process(topic, payload); // 业务分发
|
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public void deliveryComplete(IMqttDeliveryToken token) {} |
||||
|
}); |
||||
|
|
||||
|
for (String topic : topics) { |
||||
|
client.subscribe(topic, 1); |
||||
|
} |
||||
|
} |
||||
|
} |
Loading…
Reference in new issue