|
@@ -1,11 +1,24 @@
|
|
package org.jeecg.modules.kafka.consumer;
|
|
package org.jeecg.modules.kafka.consumer;
|
|
|
|
|
|
|
|
+import net.sf.json.JSONObject;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
+import org.jeecg.modules.collectdata.dao.CollectdataRepository;
|
|
|
|
+import org.jeecg.modules.collectdata.entity.Collectdata;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.data.mongodb.core.MongoTemplate;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
+import org.springframework.kafka.support.Acknowledgment;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
|
+
|
|
@Component
|
|
@Component
|
|
public class TestConsumer {
|
|
public class TestConsumer {
|
|
|
|
+ @Autowired
|
|
|
|
+ private MongoTemplate mongoTemplate;
|
|
|
|
+ @Autowired
|
|
|
|
+ private CollectdataRepository repository;
|
|
|
|
|
|
/**
|
|
/**
|
|
* 指定一个消费者组,一个主题主题。
|
|
* 指定一个消费者组,一个主题主题。
|
|
@@ -13,8 +26,7 @@ public class TestConsumer {
|
|
*/
|
|
*/
|
|
// @KafkaListener(topicPattern = "thing___.*.___property")
|
|
// @KafkaListener(topicPattern = "thing___.*.___property")
|
|
@KafkaListener(topicPattern = "#{'${spring.kafka.topic-patterns}'}")
|
|
@KafkaListener(topicPattern = "#{'${spring.kafka.topic-patterns}'}")
|
|
- public void simpleConsumer(ConsumerRecord<?, ?> record) {
|
|
|
|
- System.out.println("进入simpleConsumer方法");
|
|
|
|
|
|
+ public void simpleConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
|
|
System.out.printf("topic2.* = %s, 偏移量 = %d, key = %s, 内容 = %s, 创建消息的时间戳 =%d%n \n",
|
|
System.out.printf("topic2.* = %s, 偏移量 = %d, key = %s, 内容 = %s, 创建消息的时间戳 =%d%n \n",
|
|
record.topic(),
|
|
record.topic(),
|
|
record.offset(),
|
|
record.offset(),
|
|
@@ -22,5 +34,59 @@ public class TestConsumer {
|
|
record.value(),
|
|
record.value(),
|
|
record.timestamp()
|
|
record.timestamp()
|
|
);
|
|
);
|
|
|
|
+
|
|
|
|
+ // 提交(用来标记一条消息已经消费完成,即将从消息队列里移除。)
|
|
|
|
+ ack.acknowledge();
|
|
|
|
+ saveDataR(record);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 保存数据(通过mongoTemplate保存)
|
|
|
|
+ * @param record kafka记录
|
|
|
|
+ */
|
|
|
|
+ private void saveData(ConsumerRecord<?, ?> record) {
|
|
|
|
+ JSONObject cd = JSONObject.fromObject(record.value());
|
|
|
|
+
|
|
|
|
+ String equipmentcode = record.topic().replace("thing___", "").replace("___property", "");
|
|
|
|
+ Map<String,Object> map = new HashMap<>();
|
|
|
|
+ map.put("logid", cd.get("id"));
|
|
|
|
+ map.put("logtime", record.timestamp());
|
|
|
|
+ map.put("version", cd.get("version"));
|
|
|
|
+ map.put("equipmentcode", equipmentcode);
|
|
|
|
+ map.put("method", cd.get("method"));
|
|
|
|
+
|
|
|
|
+ JSONObject jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("properties");
|
|
|
|
+ Map<String,Object> map1 = new HashMap<>();
|
|
|
|
+ for (Object key : jsonObject.keySet()) {
|
|
|
|
+ map1.put(key.toString(), jsonObject.get(key));
|
|
|
|
+ }
|
|
|
|
+ map.put("properties", map1);
|
|
|
|
+
|
|
|
|
+ mongoTemplate.insert(map, "tpmcollectdata");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 保存数据(通过Repository保存)
|
|
|
|
+ * @param record kafka记录
|
|
|
|
+ */
|
|
|
|
+ private void saveDataR(ConsumerRecord<?, ?> record) {
|
|
|
|
+ JSONObject cd = JSONObject.fromObject(record.value());
|
|
|
|
+
|
|
|
|
+ String equipmentcode = record.topic().replace("thing___", "").replace("___property", "");
|
|
|
|
+ Collectdata obj = new Collectdata();
|
|
|
|
+ obj.setLogid(Long.parseLong(cd.get("id").toString()));
|
|
|
|
+ obj.setLogtime(new Date(record.timestamp()));
|
|
|
|
+ obj.setVersion(cd.get("version").toString());
|
|
|
|
+ obj.setEquipmentcode(equipmentcode);
|
|
|
|
+ obj.setMethod(cd.get("method").toString());
|
|
|
|
+
|
|
|
|
+ JSONObject jsonObject = (JSONObject) JSONObject.fromObject(record.value()).get("properties");
|
|
|
|
+ Map<String,Object> map1 = new HashMap<>();
|
|
|
|
+ for (Object key : jsonObject.keySet()) {
|
|
|
|
+ map1.put(key.toString(), jsonObject.get(key));
|
|
|
|
+ }
|
|
|
|
+ obj.setProperties(map1);
|
|
|
|
+
|
|
|
|
+ repository.save(obj);
|
|
}
|
|
}
|
|
}
|
|
}
|