diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/RealtimeDataRequestObj.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/RealtimeDataRequestObj.java new file mode 100644 index 0000000..419a8d9 --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/RealtimeDataRequestObj.java @@ -0,0 +1,61 @@ +package com.hxjt.dataupload.jobhandler.mqtt.job.user.domain; + +public class RealtimeDataRequestObj { + + private String code; + private Long time; + private String value; + private String extand; + private String type; + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public String getExtand() { + return extand; + } + + public void setExtand(String extand) { + this.extand = extand; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public String toString() { + return "RealtimeDataRequestObj{" + + "code='" + code + '\'' + + ", time=" + time + + ", value='" + value + '\'' + + ", extand='" + extand + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/RealtimeDataTopicMqttJobHandler.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/RealtimeDataTopicMqttJobHandler.java new file mode 100644 index 0000000..dca7f1e --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/RealtimeDataTopicMqttJobHandler.java @@ -0,0 +1,98 @@ +package com.hxjt.dataupload.jobhandler.mqtt.job.user.jobhandler; + + +import com.alibaba.fastjson2.JSON; +import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.ApiResponse; +import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.RealtimeDataRequestObj; +import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.SensorDataRequestObj; +import com.hxjt.dataupload.mqtt.MqttClient; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.Resource; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.hxjt.dataupload.utils.ExcelDataExtraction.extractData; + +/* +* 测点实时信息推送 +* */ +@Component +public class RealtimeDataTopicMqttJobHandler { + + private Logger log = LoggerFactory.getLogger(RealtimeDataTopicMqttJobHandler.class); + @Resource + MqttClient mqttClient; + + @Value("${xxl.mqtt.realtimeDataTopic:}") + private String realtimeDataTopic; + + @Resource + RestTemplate restTemplate; + + + @XxlJob("RealtimeDataTopicMqttJobHandler") + public void execute(){ + + XxlJobHelper.log("RealtimeDataTopicMqttJobHandler Beginning."); + + String filePath = "F://Users/liwenxuan/Desktop/环保上传园区/数据位号2025.3.4-1.xlsx"; // 请替换为实际的Excel文件路径 + List dataList = extractData(filePath); + ArrayList sArr = new ArrayList<>() ; + dataList.forEach(id -> { + if(id.equals("DK$SO2_ZS_PV")||id.equals("DK$DLGL_GL_GL_DUST_ZS_PV")||id.equals("DK$DLGL_NOX_ZS_PV")||id.equals("DK$DLGL_SFLOW_PV")||id.equals("DK$DLGL_O2_PV")||id.equals("DK$TEMP_PV")||id.equals("DK$DLGL_HUM_PV")||id.equals("DK$DLGL_VELOCITY_PV")||id.equals("DK$DLGL_PRESS_PV")){ + //System.out.println(id); + //请求地址 + String url = "http://172.20.5.120:29912/open/app/api/po/send/SD5UvGh2BGxEaaTh76ybq3Ml1ClvF8j6?id={id}"; + + //请求参数 + Map uriVariables = new HashMap<>(); + uriVariables.put("id", id); + + + //发起请求,直接返回对象(带参数请求) + + ApiResponse apiResponse = restTemplate.getForObject(url, ApiResponse.class, uriVariables); + Map recordMap = apiResponse.getData().getRecords().get(0); + System.out.println(apiResponse.getData().getRecords().get(0)); + RealtimeDataRequestObj obj = new RealtimeDataRequestObj(); + + + obj.setCode((String) recordMap.get("code")); + long timestamp = Instant.now().toEpochMilli(); // 毫秒级 + obj.setTime(timestamp); + obj.setValue((String)recordMap.get("value")); + obj.setType((String) recordMap.get("kpiType")); + obj.setExtand(""); + //System.out.println(sensorDataRequestObj); + sArr.add(obj); + } + }); + + String json = JSON.toJSONString(sArr); + System.out.println(json); + XxlJobHelper.log("mqtt push data : : " + json); + + + mqttClient.publish(0, false, realtimeDataTopic, json); + XxlJobHelper.log(">>>>>>>>>>>>>>>Alarm info mqtt push finished!"); + } + + private void init() { + log.info("init 方法调用成功"); + } + + private void destroy() { + log.info("destroy 方法调用成功"); + } + +} diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java index eeac467..c434f74 100644 --- a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java @@ -29,12 +29,8 @@ import java.util.*; import static com.hxjt.dataupload.utils.ExcelDataExtraction.extractData; /* - * @description: 报警信息推送 - * @author: ZhangRY - * @date: 2025/4/9 10:51 - * @param: null - * @return: null - **/ +* 测点基本信息推送 +* */ @Component public class SensorDataTopicMqttJobHandler { diff --git a/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java b/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java index 9c2d0c1..008e565 100644 --- a/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java +++ b/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java @@ -52,7 +52,7 @@ public class MqttClient { client.setCallback(pushCallback); //连接到服务器 client.connect(options); - System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&"); + System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&---连接成功---&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&"); } catch (Exception e) { e.printStackTrace(); } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index a71da85..359b30e 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -88,8 +88,8 @@ xxl: mqtt: username: 10@ningyanghuagongchany20250422172632 password: 9fabff0fe3d049af92329e29dd9af7e4 - #hostUrl: tcp://112.245.55.112:7379 - hostUrl: tcp://172.20.2.57:1883 + hostUrl: tcp://112.245.55.112:18183 + #hostUrl: tcp://172.20.2.57:1883 clientId: 10@ningyanghuagongchany20250422172632 sensorDataTopic: /iot/10@ningyanghuagongchany20250422172632/thirdParty/sensorData timeout: 100