Browse Source

环保点位传感器实时数据推送

master
liwenxuan 7 months ago
parent
commit
a473bae690
  1. 61
      src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/RealtimeDataRequestObj.java
  2. 98
      src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/RealtimeDataTopicMqttJobHandler.java
  3. 8
      src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java
  4. 2
      src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java
  5. 4
      src/main/resources/application-dev.yml

61
src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/domain/RealtimeDataRequestObj.java

@ -0,0 +1,61 @@
package com.hxjt.dataupload.jobhandler.mqtt.job.user.domain;
public class RealtimeDataRequestObj {
private String code;
private Long time;
private String value;
private String extand;
private String type;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getExtand() {
return extand;
}
public void setExtand(String extand) {
this.extand = extand;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "RealtimeDataRequestObj{" +
"code='" + code + '\'' +
", time=" + time +
", value='" + value + '\'' +
", extand='" + extand + '\'' +
", type='" + type + '\'' +
'}';
}
}

98
src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/RealtimeDataTopicMqttJobHandler.java

@ -0,0 +1,98 @@
package com.hxjt.dataupload.jobhandler.mqtt.job.user.jobhandler;
import com.alibaba.fastjson2.JSON;
import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.ApiResponse;
import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.RealtimeDataRequestObj;
import com.hxjt.dataupload.jobhandler.mqtt.job.user.domain.SensorDataRequestObj;
import com.hxjt.dataupload.mqtt.MqttClient;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.hxjt.dataupload.utils.ExcelDataExtraction.extractData;
/*
* 测点实时信息推送
* */
@Component
public class RealtimeDataTopicMqttJobHandler {
private Logger log = LoggerFactory.getLogger(RealtimeDataTopicMqttJobHandler.class);
@Resource
MqttClient mqttClient;
@Value("${xxl.mqtt.realtimeDataTopic:}")
private String realtimeDataTopic;
@Resource
RestTemplate restTemplate;
@XxlJob("RealtimeDataTopicMqttJobHandler")
public void execute(){
XxlJobHelper.log("RealtimeDataTopicMqttJobHandler Beginning.");
String filePath = "F://Users/liwenxuan/Desktop/环保上传园区/数据位号2025.3.4-1.xlsx"; // 请替换为实际的Excel文件路径
List<String> dataList = extractData(filePath);
ArrayList<RealtimeDataRequestObj> sArr = new ArrayList<>() ;
dataList.forEach(id -> {
if(id.equals("DK$SO2_ZS_PV")||id.equals("DK$DLGL_GL_GL_DUST_ZS_PV")||id.equals("DK$DLGL_NOX_ZS_PV")||id.equals("DK$DLGL_SFLOW_PV")||id.equals("DK$DLGL_O2_PV")||id.equals("DK$TEMP_PV")||id.equals("DK$DLGL_HUM_PV")||id.equals("DK$DLGL_VELOCITY_PV")||id.equals("DK$DLGL_PRESS_PV")){
//System.out.println(id);
//请求地址
String url = "http://172.20.5.120:29912/open/app/api/po/send/SD5UvGh2BGxEaaTh76ybq3Ml1ClvF8j6?id={id}";
//请求参数
Map<String, String> uriVariables = new HashMap<>();
uriVariables.put("id", id);
//发起请求,直接返回对象(带参数请求)
ApiResponse apiResponse = restTemplate.getForObject(url, ApiResponse.class, uriVariables);
Map recordMap = apiResponse.getData().getRecords().get(0);
System.out.println(apiResponse.getData().getRecords().get(0));
RealtimeDataRequestObj obj = new RealtimeDataRequestObj();
obj.setCode((String) recordMap.get("code"));
long timestamp = Instant.now().toEpochMilli(); // 毫秒级
obj.setTime(timestamp);
obj.setValue((String)recordMap.get("value"));
obj.setType((String) recordMap.get("kpiType"));
obj.setExtand("");
//System.out.println(sensorDataRequestObj);
sArr.add(obj);
}
});
String json = JSON.toJSONString(sArr);
System.out.println(json);
XxlJobHelper.log("mqtt push data : : " + json);
mqttClient.publish(0, false, realtimeDataTopic, json);
XxlJobHelper.log(">>>>>>>>>>>>>>>Alarm info mqtt push finished!");
}
private void init() {
log.info("init 方法调用成功");
}
private void destroy() {
log.info("destroy 方法调用成功");
}
}

8
src/main/java/com/hxjt/dataupload/jobhandler/mqtt/job/user/jobhandler/SensorDataTopicMqttJobHandler.java

@ -29,12 +29,8 @@ import java.util.*;
import static com.hxjt.dataupload.utils.ExcelDataExtraction.extractData;
/*
* @description: 报警信息推送
* @author: ZhangRY
* @date: 2025/4/9 10:51
* @param: null
* @return: null
**/
* 测点基本信息推送
* */
@Component
public class SensorDataTopicMqttJobHandler {

2
src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java

@ -52,7 +52,7 @@ public class MqttClient {
client.setCallback(pushCallback);
//连接到服务器
client.connect(options);
System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
System.out.println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&---连接成功---&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
} catch (Exception e) {
e.printStackTrace();
}

4
src/main/resources/application-dev.yml

@ -88,8 +88,8 @@ xxl:
mqtt:
username: 10@ningyanghuagongchany20250422172632
password: 9fabff0fe3d049af92329e29dd9af7e4
#hostUrl: tcp://112.245.55.112:7379
hostUrl: tcp://172.20.2.57:1883
hostUrl: tcp://112.245.55.112:18183
#hostUrl: tcp://172.20.2.57:1883
clientId: 10@ningyanghuagongchany20250422172632
sensorDataTopic: /iot/10@ningyanghuagongchany20250422172632/thirdParty/sensorData
timeout: 100

Loading…
Cancel
Save