Browse Source

kafka读取数据修改

sl 3 months ago
parent
commit
91b52f32b9

+ 54 - 8
module_ems/src/main/java/org/jeecg/modules/kafka/consumer/TestConsumer.java

@@ -1,7 +1,8 @@
 package org.jeecg.modules.kafka.consumer;
-
+import com.alibaba.fastjson.parser.JSONToken;
 import lombok.extern.slf4j.Slf4j;
 import net.sf.json.JSONObject;
+import net.sf.json.util.JSONTokener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.jeecg.common.util.DateUtils;
 import org.jeecg.modules.collectdata.dao.CollectdataRepository;
@@ -84,25 +85,70 @@ public class TestConsumer {
      * @param record kafka记录
      */
     private void saveData(ConsumerRecord<?, ?> record, String collectionName) {
-        JSONObject cd = JSONObject.fromObject(record.value());
+//        JSONObject cd = JSONObject.fromObject(record.value());
+//        String cleanedValue = ((String) record.value()).replaceAll("[\\r\\n\\t]", "");
+        String cleanedValue = record.value().toString().replaceAll("[\\r\\n\\t]", "");
+        JSONObject cd = JSONObject.fromObject(cleanedValue);
 
         String equipmentcode = record.topic().replace("thing___", "").replace("___property", "");
         Map<String,Object> map = new HashMap<>();
-        map.put("logid", cd.get("id"));
-        map.put("logtime", new Date(record.timestamp()));
-        map.put("version", cd.get("version"));
+        if(cd.containsKey("id")){
+            map.put("logid", cd.get("id"));
+        }else{
+            map.put("logid", cd.get("Id"));
+        }
+//        map.put("logtime", new Date(record.timestamp()));
+        if(cd.containsKey("version")){
+            map.put("version", cd.get("version"));
+        }else{
+            map.put("version", cd.get("Version"));
+        }
         map.put("equipmentcode", equipmentcode);
-        map.put("method", cd.get("method"));
-
-        JSONObject jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("properties");
+        if(cd.containsKey("method")){
+            map.put("method", cd.get("method"));
+        }else{
+            map.put("method", cd.get("Method"));
+        }
+        JSONObject jsonObject;
+        if(cd.containsKey("properties")){
+            jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("properties");
+        }else{
+            jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("Properties");
+        }
+//        JSONObject jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("Properties");
         Map<String,Object> map1 = new HashMap<>();
         for (Object key : jsonObject.keySet()) {
+            if(key.toString().equals("time") || key.toString().equals("Time")){
+                map.put("logtime", new Date(Long.parseLong(jsonObject.get(key).toString())));
+            }
             map1.put(key.toString(), jsonObject.get(key));
         }
         map.put("properties", map1);
 
         mongoTemplate.insert(map, collectionName);
     }
+//    private void saveData(ConsumerRecord<?, ?> record, String collectionName) {
+////        JSONObject cd = JSONObject.fromObject(record.value());
+//        String cleanedValue = ((String) record.value()).replaceAll("[\\r\\n\\t]", "");
+//        JSONObject cd = JSONObject.fromObject(cleanedValue);
+//
+//        String equipmentcode = record.topic().replace("thing___", "").replace("___property", "");
+//        Map<String,Object> map = new HashMap<>();
+//        map.put("logid", cd.get("id"));
+//        map.put("logtime", new Date(record.timestamp()));
+//        map.put("version", cd.get("version"));
+//        map.put("equipmentcode", equipmentcode);
+//        map.put("method", cd.get("method"));
+//
+//        JSONObject jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("Properties");
+//        Map<String,Object> map1 = new HashMap<>();
+//        for (Object key : jsonObject.keySet()) {
+//            map1.put(key.toString(), jsonObject.get(key));
+//        }
+//        map.put("properties", map1);
+//
+//        mongoTemplate.insert(map, collectionName);
+//    }
 
     /**
      * 保存数据(通过Repository保存)