소스 검색

集成kafka获取采集数据

丁治程 9 달 전
부모
커밋
1e8aac8f75

+ 5 - 0
jeecg-boot-base-core/pom.xml

@@ -11,6 +11,11 @@
 	<artifactId>jeecg-boot-base-core</artifactId>
 
 	<dependencies>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka</artifactId>
+			<!-- NOTE(review): this version is hard-coded here and again in
+			     jeecg-system-start/pom.xml; manage it once in the parent's
+			     dependencyManagement so the two copies cannot drift apart -->
+			<version>2.8.0</version>
+		</dependency>
 		<!--jeecg-tools-->
 		<dependency>
 			<groupId>org.jeecgframework.boot</groupId>

+ 75 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/kafka/consumer/TestConsumer.java

@@ -0,0 +1,75 @@
+package org.jeecg.modules.kafka.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
+import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+@Component
+@Slf4j
+public class TestConsumer {
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IIotedgeCollectDataService dataService;
+
+    /**
+     * Consumes collected-data messages from every topic matching the configured
+     * pattern (one consumer group, manual-immediate ack mode) and persists them.
+     * NOTE: topicPattern is resolved once at startup, so topics created while
+     * this application is running are not picked up until a restart.
+     */
+//    @KafkaListener(topicPattern = "thing___.*.___property")
+    @KafkaListener(topicPattern = "#{'${spring.kafka.topic-patterns}'}")
+    public void simpleConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
+        try {
+            saveDataR(record);
+        } catch (RuntimeException e) {
+            // Log and fall through to acknowledge: re-throwing would turn a
+            // malformed payload into a poison pill that is redelivered forever.
+            log.error("Failed to process record from topic {}: {}", record.topic(), record.value(), e);
+        }
+        // Acknowledge only AFTER processing. The original acknowledged first,
+        // which silently drops the message if the process dies before saving.
+        ack.acknowledge();
+    }
+
+
+    /**
+     * Parses one collected-data message and batch-saves one row per property.
+     * Expected payload shape (JSON): { g_id, p_id, d_id, ts,
+     *   services: { <service>: { properties: { <property>: { value } } } } }
+     * -- inferred from the field accesses below; confirm against the producer.
+     *
+     * @param record kafka record whose value is the JSON payload above
+     */
+    private void saveDataR(ConsumerRecord<?, ?> record) {
+        log.debug("received record: {}", record);
+        List<IotedgeCollectData> list = new ArrayList<>();
+        String value = (String) record.value();
+        JSONObject jsonObject = JSONObject.parseObject(value);
+        Map<String, Object> services = (Map<String, Object>) jsonObject.get("services");
+        for (Map.Entry<String, Object> service : services.entrySet()) {
+            Map<String, Object> tag = (Map<String, Object>) service.getValue();
+            Map<String, Object> tagmap = (Map<String, Object>) tag.get("properties");
+            for (Map.Entry<String, Object> property : tagmap.entrySet()) {
+                Map<String, Object> valuemap = (Map<String, Object>) property.getValue();
+                // Objects.toString covers every JSON scalar type; the original
+                // handled only Integer and String and threw ClassCastException
+                // on Double/Long/Boolean property values.
+                String value1 = Objects.toString(valuemap.get("value"), null);
+
+                IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
+                // getString() yields null for a missing key instead of the
+                // NullPointerException that .get(...).toString() threw.
+                iotedgeCollectData.setGroupid(jsonObject.getString("g_id"));
+                iotedgeCollectData.setPid(jsonObject.getString("p_id"));
+                iotedgeCollectData.setDevice(jsonObject.getString("d_id"));
+                iotedgeCollectData.setTime(jsonObject.getString("ts"));
+                iotedgeCollectData.setService(service.getKey());
+                iotedgeCollectData.setProperty(property.getKey());
+                iotedgeCollectData.setValue(value1);
+
+                list.add(iotedgeCollectData);
+            }
+        }
+        dataService.saveBatch(list);
+    }
+}

+ 27 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/kafka/controller/TestController.java

@@ -0,0 +1,27 @@
+package org.jeecg.modules.kafka.controller;
+
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+
+import org.jeecg.modules.kafka.producer.TestProducer;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+@Api(tags="kafka")
+@RestController
+@RequestMapping("/kafka/test")
+public class TestController {
+
+    @Resource
+    private TestProducer testProducer;
+
+    // Fire-and-forget: delegates to TestProducer.send() and returns no body,
+    // so the HTTP client gets no indication of whether the send succeeded.
+    // NOTE(review): a GET endpoint with a side effect (publishing a message)
+    // is unconventional; consider POST and returning a Result payload.
+    @ApiOperation(value="发送消息", notes="发送消息")
+    @GetMapping("/send")
+    public void sendMsg(String topicname, String obj){
+        testProducer.send(topicname, obj);
+    }
+}

+ 44 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/kafka/producer/TestProducer.java

@@ -0,0 +1,44 @@
+package org.jeecg.modules.kafka.producer;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+import javax.annotation.Resource;
+
+@Component
+@Slf4j
+public class TestProducer {
+
+    @Resource
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    //自定义topic
+//    public static final String TOPIC_TEST = "visible";
+//    public static final String GROUP_TEST = "grout.test";
+//
+    public void send(String topicname, Object obj) {
+        String obj2String = JSON.toJSONString(obj);
+        log.info("准备发送消息为:{}", obj2String);
+        //发送消息
+        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicname, obj);
+        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
+            @Override
+            public void onFailure(@NotNull Throwable throwable) {
+                //发送失败的处理
+                log.info(topicname + " - 生产者 发送消息失败:" + throwable.getMessage());
+            }
+
+            @Override
+            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
+                //成功的处理
+                log.info(topicname + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
+            }
+        });
+    }
+}

+ 6 - 0
jeecg-module-system/jeecg-system-start/pom.xml

@@ -19,6 +19,12 @@
             <version>${jeecgboot.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+
         <!-- SYSTEM 系统管理模块 -->
         <dependency>
             <groupId>org.jeecgframework.boot</groupId>

+ 39 - 0
jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml

@@ -35,6 +35,45 @@ spring:
           starttls:
             enable: true
             required: true
+  ## kafka配置
+  kafka:
+    bootstrap-servers: 127.0.0.1:9092
+    producer:
+      # 发生错误后,消息重发的次数。
+      retries: 1
+      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+      batch-size: 16384
+      # 设置生产者内存缓冲区的大小。
+      buffer-memory: 33554432
+      # 键的序列化方式
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # 值的序列化方式
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+      # NOTE(review): with acks=0 the broker never responds, so the
+      # "retries: 1" setting above can never take effect and failed sends are
+      # dropped silently. Use acks=1 (or all) if message loss matters.
+      acks: 0
+    consumer:
+      group-id: thing___property
+      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+      auto-commit-interval: 1S
+      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+      auto-offset-reset: latest
+      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+      enable-auto-commit: false
+      # 键的反序列化方式
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      # 值的反序列化方式
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    listener:
+      # 在侦听器容器中运行的线程数。
+      concurrency: 5
+      #listner负责ack,每调用一次,就立即commit
+      ack-mode: manual_immediate
+      missing-topics-fatal: false
+    topic-patterns: ".*."  # regex of topics to subscribe to; NOTE(review): ".*." is likely a typo for ".*" — the trailing "." only adds a redundant non-empty-name requirement
   ## quartz定时任务,采用数据库方式
   quartz:
     job-store-type: jdbc