diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/DataPage.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/DataPage.java index b8de8bd..f19df6a 100644 --- a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/DataPage.java +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/DataPage.java @@ -10,7 +10,7 @@ public class DataPage { private Integer total; private Integer pages; private Integer size; - private List records; + private List> records; public Integer getCurrent() { return current; @@ -44,11 +44,11 @@ public class DataPage { this.size = size; } - public List getRecords() { + public List> getRecords() { return records; } - public void setRecords(List records) { + public void setRecords(List> records) { this.records = records; } diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/SensorDataRequestObj.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/SensorDataRequestObj.java index e03484c..d88a254 100644 --- a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/SensorDataRequestObj.java +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/SensorDataRequestObj.java @@ -93,4 +93,17 @@ public class SensorDataRequestObj { public void setUnit(String unit) { this.unit = unit; } + + @Override + public String toString() { + return "SensorDataRequestObj{" + + "descript='" + descript + '\'' + + ", name='" + name + '\'' + + ", code='" + code + '\'' + + ", max='" + max + '\'' + + ", min='" + min + '\'' + + ", type='" + type + '\'' + + ", unit='" + unit + '\'' + + '}'; + } } diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java index 14e3b40..eeac467 100644 --- a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java @@ -5,8 +5,10 @@ package com.hxjt.dataupload.jobhandler.mqtt.job.user.jobhandler; import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.ApiResponse; +import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.SensorDataRequestObj; import com.hxjt.dataupload.mqtt.MqttClient; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; @@ -21,6 +23,7 @@ import org.springframework.web.client.RestTemplate; import javax.annotation.Resource; +import java.lang.reflect.Array; import java.util.*; import static com.hxjt.dataupload.utils.ExcelDataExtraction.extractData; @@ -39,7 +42,7 @@ public class SensorDataTopicMqttJobHandler { @Resource MqttClient mqttClient; - @Value("${spring.mqtt.sensorDataTopic:}") + @Value("${xxl.mqtt.sensorDataTopic:}") private String sensorDataTopic; @Resource @@ -53,9 +56,10 @@ public class SensorDataTopicMqttJobHandler { String filePath = "F://Users/liwenxuan/Desktop/环保上传园区/数据位号2025.3.4-1.xlsx"; // 请替换为实际的Excel文件路径 List dataList = extractData(filePath); + ArrayList sArr = new ArrayList<>() ; dataList.forEach(id -> { if(id.equals("DK$SO2_ZS_PV")||id.equals("DK$DLGL_GL_GL_DUST_ZS_PV")||id.equals("DK$DLGL_NOX_ZS_PV")||id.equals("DK$DLGL_SFLOW_PV")||id.equals("DK$DLGL_O2_PV")||id.equals("DK$TEMP_PV")||id.equals("DK$DLGL_HUM_PV")||id.equals("DK$DLGL_VELOCITY_PV")||id.equals("DK$DLGL_PRESS_PV")){ - System.out.println(id); + //System.out.println(id); //请求地址 String url = "http://172.20.5.120:29912/open/app/api/po/send/MkZSTLpZ7vHYcz2clyDMBuUnuqHX9A49?id={id}"; @@ -67,16 +71,29 @@ public class SensorDataTopicMqttJobHandler { //发起请求,直接返回对象(带参数请求) ApiResponse apiResponse = restTemplate.getForObject(url, ApiResponse.class, uriVariables); - HashMap recordMap = apiResponse.getData().getRecords().get(0); - System.out.println(apiResponse.getData().getRecords().get(0)); + Map recordMap = apiResponse.getData().getRecords().get(0); + //System.out.println(apiResponse.getData().getRecords().get(0)); + SensorDataRequestObj sensorDataRequestObj = new SensorDataRequestObj(); + sensorDataRequestObj.setDescript((String) recordMap.get("name")); + sensorDataRequestObj.setName((String) recordMap.get("name")); + sensorDataRequestObj.setCode((String) recordMap.get("id")); + sensorDataRequestObj.setType((String) recordMap.get("kpiTypeBak")); + sensorDataRequestObj.setMax((String)recordMap.get("heighLimit")); + sensorDataRequestObj.setMin((String)recordMap.get("lowLimit")); + //System.out.println(sensorDataRequestObj); + sArr.add(sensorDataRequestObj); } }); /*String jsonData = JSONUtil.toJsonStr(alarmInfoIot); log.info(jsonData); XxlJobHelper.log("mqtt push data : : " + jsonData); mqttClient.publish(0, false, deviceTopic, jsonData);*/ + String json = JSON.toJSONString(sArr); + System.out.println(json); + XxlJobHelper.log("mqtt push data : : " + json); + System.out.println(mqttClient); - + mqttClient.publish(0, false, sensorDataTopic, json); XxlJobHelper.log(">>>>>>>>>>>>>>>Alarm info mqtt push finished!"); } diff --git a/src/main/java/com/hxjt/dataupload/mqtt/MqttCallback.java b/src/main/java/com/hxjt/dataupload/mqtt/MqttCallback.java index 0c483d5..a554cae 100644 --- a/src/main/java/com/hxjt/dataupload/mqtt/MqttCallback.java +++ b/src/main/java/com/hxjt/dataupload/mqtt/MqttCallback.java @@ -23,11 +23,11 @@ public class MqttCallback implements org.eclipse.paho.client.mqttv3.MqttCallback @Override public void connectionLost(Throwable throwable) { - /*// 连接丢失后,一般在这里面进行重连 + // 连接丢失后,一般在这里面进行重连 logger.info("连接断开,可以做重连"); if (client == null || !client.isConnected()) { mqttConfig.getMqttPushClient(); - }*/ + } } @Override diff --git a/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java b/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java index 7b9d0c6..9c2d0c1 100644 --- a/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java +++ b/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java @@ -52,6 +52,7 @@ public class MqttClient { client.setCallback(pushCallback); //连接到服务器 client.connect(options); + System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&"); } catch (Exception e) { e.printStackTrace(); } @@ -70,7 +71,8 @@ public class MqttClient { message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); - MqttTopic mTopic = MqttClient.getClient().getTopic(topic); + org.eclipse.paho.client.mqttv3.MqttClient client = MqttClient.getClient(); + MqttTopic mTopic = client.getTopic(topic); if (null == mTopic) { logger.error("topic not exist"); } diff --git a/src/main/java/com/hxjt/dataupload/mqtt/MqttConfig.java b/src/main/java/com/hxjt/dataupload/mqtt/MqttConfig.java index 8f04256..39f0b4f 100644 --- a/src/main/java/com/hxjt/dataupload/mqtt/MqttConfig.java +++ b/src/main/java/com/hxjt/dataupload/mqtt/MqttConfig.java @@ -7,7 +7,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component -@ConfigurationProperties("spring.mqtt") +@ConfigurationProperties("xxl.mqtt") @Data public class MqttConfig { @Autowired diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 7a0d0aa..a71da85 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -88,7 +88,8 @@ xxl: mqtt: username: 10@ningyanghuagongchany20250422172632 password: 9fabff0fe3d049af92329e29dd9af7e4 - hostUrl: tcp://112.245.55.112:7379 + #hostUrl: tcp://112.245.55.112:7379 + hostUrl: tcp://172.20.2.57:1883 clientId: 10@ningyanghuagongchany20250422172632 sensorDataTopic: /iot/10@ningyanghuagongchany20250422172632/thirdParty/sensorData timeout: 100