丁治程 2 місяців тому
батько
коміт
2a5de6462c

+ 40 - 9
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/component/MqttPushClient.java

@@ -1,12 +1,17 @@
 package org.jeecg.modules.mqtt.component;
 
+import com.alibaba.fastjson.JSONObject;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
+import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
+import org.jeecg.modules.iotedgeCollectData.service.impl.IotedgeCollectDataServiceImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -81,15 +86,7 @@ public class MqttPushClient {
                         logger.info("接收消息主题 : " + topic);
                         logger.info("接收消息Qos : " + mqttMessage.getQos());
                         logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
-
-                //            JSONObject jsonObject = deviceDataService.analysisMessage(new String(mqttMessage.getPayload()));
-                        Map<String, Object> map = new HashMap<>();
-                //            map.put("map", jsonObject);
-                //            com.example.emqtt.service.WebsocketServer.sendInfo(map);
-                //            System.out.println(map);
-                //        }catch (Exception ex){
-                //            System.out.println(ex.getMessage());
-                //        }
+                        saveDataInfo(new String(mqttMessage.getPayload()));
                     }
 
                     @Override
@@ -169,5 +166,39 @@ public class MqttPushClient {
             e.printStackTrace();
         }
     }
+
+
+
+    private void saveDataInfo(String info) {
+        ArrayList<IotedgeCollectData> list = new ArrayList<>();
+        JSONObject jsonObject = JSONObject.parseObject(info);
+        Map<String,Object> services = (Map<String, Object>) jsonObject.get("services");
+        for (String key:services.keySet()) {
+            Map<String,Object> tag = (Map<String,Object>) services.get(key);
+            Map<String,Object> tagmap = (Map<String, Object>) tag.get("properties");
+            for (String item:tagmap.keySet()) {
+                Map<String,Object> valuemap = (Map<String, Object>) tagmap.get(item);
+                String value1;
+                if (valuemap.get("value") instanceof Integer){
+                    value1 = String.valueOf(valuemap.get("value"));
+                }else {
+                    value1 =  (String) valuemap.get("value");
+                }
+
+                IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
+                iotedgeCollectData.setGroupid(jsonObject.get("g_id").toString());
+                iotedgeCollectData.setPid(jsonObject.get("p_id").toString());
+                iotedgeCollectData.setDevice(jsonObject.get("d_id").toString());
+                iotedgeCollectData.setTime(jsonObject.get("ts").toString());
+                iotedgeCollectData.setService(key);
+                iotedgeCollectData.setProperty(item);
+                iotedgeCollectData.setValue(value1);
+
+                list.add(iotedgeCollectData);
+            }
+        }
+        IIotedgeCollectDataService dataService = new IotedgeCollectDataServiceImpl();
+        dataService.saveBatch(list);
+    }
 }
 

+ 38 - 10
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/component/PushCallback.java

@@ -1,15 +1,20 @@
 package org.jeecg.modules.mqtt.component;
 
+import com.alibaba.fastjson.JSONObject;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
+import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
+import org.jeecg.modules.iotedgeCollectData.service.impl.IotedgeCollectDataServiceImpl;
 import org.jeecg.modules.mqtt.config.MqttConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,24 +47,47 @@ public class PushCallback implements MqttCallback {
 
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-//        try {
-        // subscribe后得到的消息会执行到这里面
         logger.info("接收消息主题 : " + topic);
         logger.info("接收消息Qos : " + mqttMessage.getQos());
         logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
-//            JSONObject jsonObject = deviceDataService.analysisMessage(new String(mqttMessage.getPayload()));
-        Map<String, Object> map = new HashMap<>();
-//            map.put("map", jsonObject);
-//            com.example.emqtt.service.WebsocketServer.sendInfo(map);
-//            System.out.println(map);
-//        }catch (Exception ex){
-//            System.out.println(ex.getMessage());
-//        }
+        saveDataInfo(new String(mqttMessage.getPayload()));
     }
 
     @Override
     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
         logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
     }
+
+    private void saveDataInfo(String info) {
+        ArrayList<IotedgeCollectData> list = new ArrayList<>();
+        JSONObject jsonObject = JSONObject.parseObject(info);
+        Map<String,Object> services = (Map<String, Object>) jsonObject.get("services");
+        for (String key:services.keySet()) {
+            Map<String,Object> tag = (Map<String,Object>) services.get(key);
+            Map<String,Object> tagmap = (Map<String, Object>) tag.get("properties");
+            for (String item:tagmap.keySet()) {
+                Map<String,Object> valuemap = (Map<String, Object>) tagmap.get(item);
+                String value1;
+                if (valuemap.get("value") instanceof Integer){
+                    value1 = String.valueOf(valuemap.get("value"));
+                }else {
+                    value1 =  (String) valuemap.get("value");
+                }
+
+                IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
+                iotedgeCollectData.setGroupid(jsonObject.get("g_id").toString());
+                iotedgeCollectData.setPid(jsonObject.get("p_id").toString());
+                iotedgeCollectData.setDevice(jsonObject.get("d_id").toString());
+                iotedgeCollectData.setTime(jsonObject.get("ts").toString());
+                iotedgeCollectData.setService(key);
+                iotedgeCollectData.setProperty(item);
+                iotedgeCollectData.setValue(value1);
+
+                list.add(iotedgeCollectData);
+            }
+        }
+        IIotedgeCollectDataService dataService = new IotedgeCollectDataServiceImpl();
+        dataService.saveBatch(list);
+    }
 }
 

+ 14 - 1
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java

@@ -54,6 +54,11 @@ public class MqttConfig {
      */
     private int keepalive;
 
+    /**
+     * 主题
+     */
+    private String topic;
+
     public void setMqttPushClient(MqttPushClient mqttPushClient) {
         this.mqttPushClient = mqttPushClient;
     }
@@ -114,11 +119,19 @@ public class MqttConfig {
         this.keepalive = keepalive;
     }
 
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
     @Bean
     public MqttPushClient getMqttPushClient() {
         mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
         // 以/#结尾表示订阅所有以test开头的主题
-        mqttPushClient.subscribe("data/1", 1);
+        mqttPushClient.subscribe(topic, 1);
 //        mqttPushClient.subscribe("$SYS/brokers/+/clients/#", 1);
         return mqttPushClient;
     }

+ 1 - 0
jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml

@@ -49,6 +49,7 @@ spring:
     completionTimeout: 3000
     timeout: 1000
     keepalive: 20
+    topic: device/#
   ## kafka配置
 #  kafka:
 #    bootstrap-servers: 127.0.0.1:9092