From 5091968248b1df75d9ea3ee79fe0743b0855c572 Mon Sep 17 00:00:00 2001 From: liwenxuan <1298531568@qq.com> Date: Tue, 4 Mar 2025 10:44:47 +0800 Subject: [PATCH] =?UTF-8?q?xxl-job=20=E8=B0=83=E5=BA=A6=20demo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 6 + .../hxjt/dataupload/config/XxlJobConfig.java | 59 ++++++ .../doubleprevent/MyJobHandler.java | 45 +++++ .../jobhandler/mqtt/AlarmInfo2MqttJob.java | 123 +++++++++++++ .../mqtt/AreaPersonInfo2MqttJob.java | 118 ++++++++++++ .../jobhandler/mqtt/DeviceInfo2MqttJob.java | 79 ++++++++ .../jobhandler/mqtt/UserBaseInfo2MqttJob.java | 171 ++++++++++++++++++ .../com/hxjt/dataupload/mqtt/MqttClient.java | 4 +- src/main/resources/application-dev.yml | 21 +++ 9 files changed, 624 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/hxjt/dataupload/config/XxlJobConfig.java create mode 100644 src/main/java/com/hxjt/dataupload/jobhandler/doubleprevent/MyJobHandler.java create mode 100644 src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AlarmInfo2MqttJob.java create mode 100644 src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AreaPersonInfo2MqttJob.java create mode 100644 src/main/java/com/hxjt/dataupload/jobhandler/mqtt/DeviceInfo2MqttJob.java create mode 100644 src/main/java/com/hxjt/dataupload/jobhandler/mqtt/UserBaseInfo2MqttJob.java diff --git a/pom.xml b/pom.xml index f1303d3..5fc815f 100644 --- a/pom.xml +++ b/pom.xml @@ -161,6 +161,12 @@ hutool-all 5.8.22 + + + com.xuxueli + xxl-job-core + 2.4.1 + diff --git a/src/main/java/com/hxjt/dataupload/config/XxlJobConfig.java b/src/main/java/com/hxjt/dataupload/config/XxlJobConfig.java new file mode 100644 index 0000000..f23ba59 --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/config/XxlJobConfig.java @@ -0,0 +1,59 @@ +package com.hxjt.dataupload.config; + +import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 小滴课堂,愿景:让技术不再难学 + * + * @Description + * @Author 二当家小D,微信:xdclass6 + * @Remark 有问题直接联系我,源码-笔记-技术交流群 + * @Version 1.0 + **/ +@Configuration +public class XxlJobConfig { + private Logger log = LoggerFactory.getLogger(XxlJobConfig.class); + + @Value("${xxl.job.admin.addresses}") + private String adminAddresses; + + @Value("${xxl.job.executor.appname}") + private String appName; + + @Value("${xxl.job.executor.ip}") + private String ip; + + @Value("${xxl.job.executor.port}") + private int port; + + @Value("${xxl.job.accessToken}") + private String accessToken; + + @Value("${xxl.job.executor.logpath}") + private String logPath; + + @Value("${xxl.job.executor.logretentiondays}") + private int logRetentionDays; + + //旧版的有bug + //@Bean(initMethod = "start", destroyMethod = "destroy") + @Bean + public XxlJobSpringExecutor xxlJobExecutor() { + log.info(">>>>>>>>>>> xxl-job config init."); + XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); + xxlJobSpringExecutor.setAdminAddresses(adminAddresses); + xxlJobSpringExecutor.setAppname(appName); + xxlJobSpringExecutor.setIp(ip); + xxlJobSpringExecutor.setPort(port); + xxlJobSpringExecutor.setAccessToken(accessToken); + xxlJobSpringExecutor.setLogPath(logPath); + xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); + + return xxlJobSpringExecutor; + } +} \ No newline at end of file diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/doubleprevent/MyJobHandler.java b/src/main/java/com/hxjt/dataupload/jobhandler/doubleprevent/MyJobHandler.java new file mode 100644 index 0000000..79d5cb0 --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/doubleprevent/MyJobHandler.java @@ -0,0 +1,45 @@ +package com.hxjt.dataupload.jobhandler.doubleprevent; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + + +/** + * 小滴课堂,愿景:让技术不再难学 + * + * @Description + * @Author 二当家小D,微信:xdclass6 + * @Remark 有问题直接联系我,源码-笔记-技术交流群 + * @Version 1.0 + **/ + +@Component +public class MyJobHandler { + + private Logger log = LoggerFactory.getLogger(MyJobHandler.class); + + @XxlJob(value = "demoJobHandler",init = "init",destroy = "destroy") + public ReturnT execute(String param){ + + + log.info("小滴课堂 execute 任务触发成功:"+LocalDateTime.now()); + + + return ReturnT.SUCCESS; + } + + + private void init(){ + log.info("init 方法调用成功"); + } + + private void destroy(){ + log.info("destroy 方法调用成功"); + } + +} diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AlarmInfo2MqttJob.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AlarmInfo2MqttJob.java new file mode 100644 index 0000000..129aff7 --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AlarmInfo2MqttJob.java @@ -0,0 +1,123 @@ +package com.hxjt.dataupload.jobhandler.mqtt; + +import cn.hutool.json.JSONUtil; + +import com.hxjt.dataupload.enums.AlarmType; +import com.hxjt.dataupload.model.entity.rydw.AlarmInfoHx; +import com.hxjt.dataupload.model.entity.rydw.AlarmInfoIot; +import com.hxjt.dataupload.mqtt.MqttClient; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; + +import groovy.util.logging.Log4j2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.Resource; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +@Component + +public class AlarmInfo2MqttJob { + + @Resource + MqttClient mqttClient; + + @Value("${spring.mqtt.deviceTopic:}") + private String deviceTopic; + + @Value("${user.token.containerId:}") + private String containerId; + + @Resource + RestTemplate restTemplate; + + @Value("${alarm.real.url:}") + private String alarmUrl; + + @Value("${alarm.real.code:}") + private String codeAlarm; + + public static final Logger log = LoggerFactory.getLogger(Log4j2.class); + + @XxlJob("alarmInfo2MqttJobHandler") + public void userBaseInfo2MqttJobHandler() throws Exception { + try { + XxlJobHelper.log("Alarm info To Mqtt Job Handler Beginning."); + + ResponseEntity forEntity = restTemplate.getForEntity(alarmUrl, Map.class); + Map body = forEntity.getBody(); + List> data = (List>)body.get("data"); + log.info(">>>>>>>>>>>>>>>>>>data="+data.size()); + data.forEach(alarmMap -> { + try { + AlarmInfoIot alarmInfoIot = new AlarmInfoIot(); + AlarmInfoHx alarmInfoHx = new AlarmInfoHx(); + Object id = alarmMap.get("id"); + alarmInfoHx.setUniqueId(id != null ? containerId + id.toString() : ""); + alarmInfoHx.setContainerId(containerId); + Object deviceNo = alarmMap.get("deviceNo"); + alarmInfoHx.setDeviceCode(deviceNo != null ? deviceNo.toString() : ""); + Object alarmType = alarmMap.get("warningType"); + String code = alarmType.toString(); + if(codeAlarm.equals(code)||"2".equals(code)||"3".equals(code)||"8".equals(code)||"15".equals(code)||"16".equals(code)) { + log.info(">>>>>>>>>>>>>>>>>>>>>>>>>alarm occur"); + if (alarmType != null) { + String contentByCode = AlarmType.getContentByCode(code); + alarmInfoHx.setAlarmType(contentByCode); + alarmInfoHx.setAlarmContent(contentByCode); + } + Object layer = alarmMap.get("factory"); + alarmInfoHx.setLayerId(layer != null ? layer.toString() : ""); + Object area = alarmMap.get("area"); + alarmInfoHx.setFirAreaId(area != null ? area.toString() : ""); + Object areaName = alarmMap.get("areaName"); + alarmInfoHx.setFirAreaName(areaName != null ? areaName.toString() : ""); + Object latitude = alarmMap.get("latitude"); + alarmInfoHx.setX(latitude != null ? new Double(latitude.toString()) : 0); + Object longitude = alarmMap.get("longitude"); + alarmInfoHx.setY(longitude != null ? new Double(longitude.toString()) : 0); + Object dateTime = alarmMap.get("warningtime"); + log.info(">>>>>>>>>>>>>>>>>>>>>>>>>dateTime="+dateTime); + if (dateTime != null) { + try { + Date date = new Date(Long.parseLong(dateTime.toString())); + SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + sf.setTimeZone(TimeZone.getTimeZone("UTC")); + String dateStr = sf.format(date); + alarmInfoHx.setDateTime(dateStr); + } catch (NumberFormatException e) { + log.info(">>>>>>>>>>>>>>>>>>>>>>>>>exception occur"+e.getMessage()); + alarmInfoHx.setDateTime(dateTime.toString()); + } + } + Object isHandle = alarmMap.get("isHandle"); + alarmInfoHx.setDealStatus(isHandle != null ? ((boolean) isHandle == false ? "0" : "1") : ""); + alarmInfoIot.setData(alarmInfoHx); + alarmInfoIot.setDataType("alarm"); + String jsonData = JSONUtil.toJsonStr(alarmInfoIot); + log.info(jsonData); + mqttClient.publish(0, false, deviceTopic, jsonData); + } + } catch (Exception ex) { + ex.printStackTrace(); + log.info(">>>>>>>>>>>>>>>>>>>>>>>>>exception occur"+ex.getMessage()); + } + }); + + } catch (Exception e) { + e.printStackTrace(); + } + log.info(">>>>>>>>>>>>>>>Alarm info mqtt push finished!"); + } + + +} diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AreaPersonInfo2MqttJob.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AreaPersonInfo2MqttJob.java new file mode 100644 index 0000000..2f9471b --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/AreaPersonInfo2MqttJob.java @@ -0,0 +1,118 @@ +package com.hxjt.dataupload.jobhandler.mqtt; + +import cn.hutool.json.JSONUtil; +import com.hxjt.dataupload.enums.UserType; +import com.hxjt.dataupload.model.entity.rydw.AreaUserInfoHx; +import com.hxjt.dataupload.model.entity.rydw.AreaUserInfoIot; +import com.hxjt.dataupload.model.entity.rydw.UserBaseInfo; + +import com.hxjt.dataupload.mqtt.MqttClient; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import groovy.util.logging.Log4j2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Component +public class AreaPersonInfo2MqttJob { + + @Resource + MqttClient mqttClient; + + @Value("${spring.mqtt.areaTopic:}") + private String areaTopic; + + @Value("${user.token.containerId:}") + private String containerId; + + @Resource + RestTemplate restTemplate; + + @Value("${area.data.url:}") + private String areaPersonUrl; + + public static final Logger log = LoggerFactory.getLogger(Log4j2.class); + + @XxlJob("areaInfo2MqttJobHandler") + public void userBaseInfo2MqttJobHandler() throws Exception { + try { + XxlJobHelper.log("Area person info To Mqtt Job Handler Beginning."); + + ResponseEntity forEntity = restTemplate.getForEntity(areaPersonUrl, Map.class); + Map body = forEntity.getBody(); + List> data = (List>)body.get("data"); + log.info(">>>>>>>>>>>>>>>>>>>>>>area data size:"+data.size()); + data.forEach(alarmMap -> { + try { + AreaUserInfoIot areaUserInfoIot = new AreaUserInfoIot(); + AreaUserInfoHx areaUserInfoHx = new AreaUserInfoHx(); + Object areaId = alarmMap.get("areaId"); + areaUserInfoHx.setUniqueId(containerId + areaId); + areaUserInfoHx.setContainerId(containerId); + areaUserInfoHx.setOrgCode("91370921328482832M"); + areaUserInfoHx.setOrgName("山东恒信高科能源有限公司"); + areaUserInfoHx.setAreaId(areaId != null ? areaId.toString() : ""); + Object areaName = alarmMap.get("areaName"); + areaUserInfoHx.setAreaName(areaName != null ? areaName.toString() : ""); + Object upperLimit = alarmMap.get("upperLimit"); + areaUserInfoHx.setAreaPid(upperLimit != null ? upperLimit.toString() : ""); + Object workLevel = alarmMap.get("workLevel"); + areaUserInfoHx.setAreaPname(workLevel != null ? workLevel.toString() : ""); + List> personList = (List>)alarmMap.get("personList"); + log.info(">>>>>>>>>>>>>>>>>>>>>>person data size:"+personList.size()); + List users = new ArrayList<>(); + if(personList != null && personList.size()>0){ + areaUserInfoHx.setPersonNum(personList.size() + ""); + personList.forEach(p -> { + try { + UserBaseInfo userBaseInfo = new UserBaseInfo(); + Object empNo = p.get("empNo"); + userBaseInfo.setUserCode(empNo != null ? empNo.toString() : ""); + Object empName = p.get("empName"); + userBaseInfo.setUserName(empName != null ? empName.toString() : ""); + Object deviceNo = p.get("deviceNo"); + userBaseInfo.setCardId(deviceNo != null ? deviceNo.toString() : ""); + userBaseInfo.setUserOrgCode("91370921328482832M"); + userBaseInfo.setUserOrgName("山东恒信高科能源有限公司"); + Object specifictype = p.get("specifictype"); + String contentByCode = UserType.getContentByCode(specifictype != null ? specifictype.toString() : ""); + userBaseInfo.setUserType(contentByCode != null ? contentByCode : ""); + users.add(userBaseInfo); + } catch (Exception e) { + log.error(">>>>>>>>>>>>>>>>>>>>>>occur exception!"+e.getMessage()); + } + }); + } else { + areaUserInfoHx.setPersonNum("0"); + } + areaUserInfoHx.setPersonList(users); + areaUserInfoIot.setData(areaUserInfoHx); + areaUserInfoIot.setDataType("areaPersonAlarm"); + String jsonData = JSONUtil.toJsonStr(areaUserInfoIot); + log.info(jsonData); + mqttClient.publish(0, false,areaTopic, jsonData); + } catch (Exception e) { + e.printStackTrace(); + log.error(">>>>>>>>>>>>>>>>>>>>>>occur exception!"+e.getMessage()); + } + }); + + } catch (Exception e) { + e.printStackTrace(); + log.info(">>>>>>>>>>>>>>>>>>>>>>occur exception!"+e.getMessage()); + } + log.info(">>>>>>>>>>>>>>>Area person info mqtt push finished!"); + } + + +} diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/DeviceInfo2MqttJob.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/DeviceInfo2MqttJob.java new file mode 100644 index 0000000..d1c432c --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/DeviceInfo2MqttJob.java @@ -0,0 +1,79 @@ +package com.hxjt.dataupload.jobhandler.mqtt; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.json.JSONUtil; + +import com.hxjt.dataupload.model.entity.rydw.DeviceInfoHx; +import com.hxjt.dataupload.model.entity.rydw.DeviceInfoIot; +import com.hxjt.dataupload.mqtt.MqttClient; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import groovy.util.logging.Log4j2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +@Component + +public class DeviceInfo2MqttJob { + + @Resource + MqttClient mqttClient; + + @Value("${spring.mqtt.deviceTopic:}") + private String deviceTopic; + + @Value("${user.token.containerId:}") + private String containerId; + + @Resource + RestTemplate restTemplate; + + @Value("${device.real.url:}") + private String deviceUrl; + + public static final Logger log = LoggerFactory.getLogger(Log4j2.class); + + @XxlJob("deviceInfo2MqttJobHandler") + public void userBaseInfo2MqttJobHandler() throws Exception { + try { + XxlJobHelper.log(" Device base info To Mqtt Job Handler Beginning."); + + ResponseEntity forEntity = restTemplate.getForEntity(deviceUrl, Map.class); + Map body = forEntity.getBody(); + List> data = (List>)body.get("data"); + data.forEach(deviceMap -> { + DeviceInfoIot deviceInfoIot = new DeviceInfoIot(); + DeviceInfoHx deviceInfo = new DeviceInfoHx(); + deviceInfo.setContainerId(containerId); + Object deviceNo = deviceMap.get("deviceNo"); + deviceInfo.setDeviceCode(deviceNo != null ? deviceNo.toString() : ""); + Object layer = deviceMap.get("layer"); + deviceInfo.setLayerId(layer != null ? layer.toString() : ""); + Object crossX = deviceMap.get("longitude"); + deviceInfo.setX(crossX != null ? new Double(crossX.toString()) : 0.0); + Object crossY = deviceMap.get("latitude"); + deviceInfo.setY(crossY != null ? new Double(crossY.toString()) : 0.0); + Object dateTime = deviceMap.get("dateTime"); + deviceInfo.setDateTime(dateTime != null ? dateTime.toString() : ""); + String str = IdUtil.getSnowflakeNextIdStr(); + deviceInfoIot.setData(deviceInfo); + deviceInfoIot.setDataType("position"); + String jsonData = JSONUtil.toJsonStr(deviceInfoIot); + log.info(jsonData); + mqttClient.publish(0, false, deviceTopic, jsonData); + }); + + } catch (Exception e) { + e.printStackTrace(); + } + log.info(">>>>>>>>>>>>>>>Device base info mqtt push finished!"); + } +} diff --git a/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/UserBaseInfo2MqttJob.java b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/UserBaseInfo2MqttJob.java new file mode 100644 index 0000000..6dd53f0 --- /dev/null +++ b/src/main/java/com/hxjt/dataupload/jobhandler/mqtt/UserBaseInfo2MqttJob.java @@ -0,0 +1,171 @@ +package com.hxjt.dataupload.jobhandler.mqtt; + +import cn.hutool.crypto.SecureUtil; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; + +import com.hxjt.dataupload.model.entity.rydw.UserBaseInfo; +import com.hxjt.dataupload.model.entity.rydw.UserBaseInfoIot; +import com.hxjt.dataupload.mqtt.MqttClient; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import groovy.util.logging.Log4j2; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component + +public class UserBaseInfo2MqttJob { + + @Resource + MqttClient mqttClient; + + @Value("${spring.mqtt.realTimeTopic:}") + private String realTimeTopic; + + @Value("${user.token.url:}") + private String tokenUrl; + + @Value("${user.token.username:}") + private String username; + + @Value("${user.token.password:}") + private String password; + + @Value("${user.token.orgin:}") + private String origin; + + @Value("${user.token.verCode:}") + private String verCode; + + @Value("${user.token.urlPerson:}") + private String urlPerson; + + @Value("${user.token.containerId:}") + private String containerId; + + public static final Logger log = LoggerFactory.getLogger(Log4j2.class); + + @XxlJob("userBaseInfo2MqttJobHandler") + public void userBaseInfo2MqttJobHandler() throws Exception { + try { + XxlJobHelper.log(" User base info To Mqtt Job Handler Beginning."); + + // 创建httpclient对象 + CloseableHttpClient httpClient = HttpClients.createDefault(); + + //创建请求对象 + HttpPost httpPost = new HttpPost(tokenUrl); + + JSONObject jsonObj = new JSONObject(); + jsonObj.putOnce("username", username); + jsonObj.putOnce("password", password); + + StringEntity entity = new StringEntity(jsonObj.toString()); + //指定请求编码方式 + entity.setContentEncoding("utf-8"); + //数据格式 + entity.setContentType("application/json"); + httpPost.setEntity(entity); + httpPost.setHeader("Origin", origin); + httpPost.setHeader("User-Agent", "sdhxgk"); + + //发送请求 + CloseableHttpResponse response = httpClient.execute(httpPost); + //解析返回结果 + int statusCode = response.getStatusLine().getStatusCode(); + log.info("-----------查询token响应码为:" + statusCode); + if (200 == statusCode) { + org.apache.http.HttpEntity entityRes = response.getEntity(); + String body = EntityUtils.toString(entityRes); + + log.info("-----------查询token响应数据为:" + statusCode); + + JSONObject jsonObject = JSONUtil.parseObj(body); + JSONObject data = JSONUtil.parseObj(jsonObject.get("data")); + String token = data.get("token").toString(); + System.out.println(token); + String number = data.get("number").toString(); + System.out.println(number); + String secNumber = SecureUtil.md5(number); + String secverCode = SecureUtil.md5(verCode); + String twoSec = SecureUtil.md5(secNumber + secverCode); + String threeSec = SecureUtil.md5(twoSec + verCode); + + HttpPost httpPostPerson = new HttpPost(urlPerson); + + + StringEntity entityEmpty = new StringEntity(""); + //指定请求编码方式 + entityEmpty.setContentEncoding("utf-8"); + //数据格式 + entityEmpty.setContentType("application/json"); + httpPostPerson.setEntity(entityEmpty); + httpPostPerson.setHeader("Origin", origin); + httpPostPerson.setHeader("User-Agent", "sdhxgk"); + httpPostPerson.setHeader("token", token); + httpPostPerson.setHeader("number", threeSec); + + //发送请求 + CloseableHttpResponse resPerson = httpClient.execute(httpPostPerson); + //解析返回结果 + int staPersonCode = resPerson.getStatusLine().getStatusCode(); + log.info("-----------查询人员响应码为:" + staPersonCode); + + if (200 == staPersonCode) { + org.apache.http.HttpEntity entPersonRes = resPerson.getEntity(); + String personBody = EntityUtils.toString(entPersonRes); + log.info("-----------查询人员响应数据为:" + personBody); + resPerson.close(); + JSONObject jsonObjPerson = JSONUtil.parseObj(personBody); + System.out.println(jsonObjPerson); + + JSONArray parseArray = JSONUtil.parseArray(jsonObjPerson.get("data")); + parseArray.forEach(obj -> { + JSONObject person = JSONUtil.parseObj(obj); + Object deviceNo = person.get("cardld"); + Object personName = person.get("empName"); + Object empNo = person.get("userCode"); + Object specifictype = person.get("userType"); + Object workUnit = person.get("userOrgName"); + Object userPhone = person.get("userPhone"); + Object workUnitCode = person.get("userOrgCode"); + + UserBaseInfoIot userBaseInfoIot = new UserBaseInfoIot(); + UserBaseInfo userBaseInfo = new UserBaseInfo(); + userBaseInfo.setContainerId(containerId); + userBaseInfo.setCardId(deviceNo != null ? deviceNo.toString() : null); + userBaseInfo.setUserPhone(userPhone != null ? userPhone.toString() : null); + userBaseInfo.setUserName(personName != null ? personName.toString() : null); + userBaseInfo.setUserCode(empNo != null ? empNo.toString() : null); + userBaseInfo.setUserType(specifictype != null ? specifictype.toString() : null); + userBaseInfo.setUserOrgName(workUnit != null ? workUnit.toString() : null); + userBaseInfo.setUserOrgCode(workUnitCode != null ? workUnitCode.toString() : null); + userBaseInfoIot.setData(userBaseInfo); + userBaseInfoIot.setDataType("baseInfo"); + String jsonData = JSONUtil.toJsonStr(userBaseInfoIot); + mqttClient.publish(0, false, realTimeTopic, jsonData); + }); + } + } + //关闭资源 + response.close(); + httpClient.close(); + } catch (Exception e) { + e.printStackTrace(); + } + log.info(">>>>>>>>>>>>>>>mqtt push finished!"); + } +} diff --git a/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java b/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java index bbdd249..b7ce3bf 100644 --- a/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java +++ b/src/main/java/com/hxjt/dataupload/mqtt/MqttClient.java @@ -11,7 +11,7 @@ import org.springframework.stereotype.Component; @Component public class MqttClient { private static final Logger logger = LoggerFactory.getLogger(com.hxjt.dataupload.mqtt.MqttClient.class); - + private MqttCallback pushCallback; @Autowired public void setPushCallback(MqttCallback pushCallback) { @@ -42,7 +42,7 @@ public class MqttClient { options.setPassword(password.toCharArray()); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); - com.hxjt.dataupload.mqtt.MqttClient.setClient(client); + com.hxjt.dataupload.mqtt.MqttClient.setClient(client);// try { //设置消息到达的回调 client.setCallback(pushCallback); diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 592724d..6463d22 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -50,3 +50,24 @@ spring: primary: syncplant #设置严格模式,默认false不启动. 启动后在未匹配到指定数据源时候回抛出异常,不启动会使用默认数据源. strict: false +xxl: + job: + admin: + #调度中心部署地址,多个配置逗号分隔 "http://address01,http://address02" + addresses: http://127.0.0.1:8080/xxl-job-admin + #执行器token,非空时启用 xxl-job, access token + accessToken: data_upload + executor: + # 执行器app名称,和控制台那边配置一样的名称,不然注册不上去 + appname: data-upload + # [选填]执行器注册:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。 + #从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。 + address: + #[选填]执行器IP :默认为空表示自动获取IP(即springboot容器的ip和端口,可以自动获取,也可以指定),多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务", + ip: + # [选填]执行器端口号:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口; + port: 9999 + #执行器日志文件存储路径,需要对该路径拥有读写权限;为空则使用默认路径 + logpath: ./data/logs/xxl-job/executor + #执行器日志保存天数 + logretentiondays: 30