Browse Source

添加mqtt

sl 2 months ago
parent
commit
ba889eb155

+ 15 - 1
jeecg-boot-base-core/pom.xml

@@ -16,6 +16,20 @@
 			<artifactId>spring-kafka</artifactId>
 			<version>2.8.0</version>
 		</dependency>
+
+		<!--mqtt -->
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-integration</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.integration</groupId>
+			<artifactId>spring-integration-stream</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.integration</groupId>
+			<artifactId>spring-integration-mqtt</artifactId>
+		</dependency>
 		<!--jeecg-tools-->
 		<dependency>
 			<groupId>org.jeecgframework.boot</groupId>
@@ -216,4 +230,4 @@
 		</dependency>
 	</dependencies>
 
-</project>
+</project>

+ 19 - 0
jeecg-module-interlock/pom.xml

@@ -60,6 +60,25 @@
             <artifactId>jeecg-system-biz</artifactId>
         </dependency>
 
+        <!--mqtt -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-stream</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+
     </dependencies>
 
 </project>

+ 76 - 76
jeecg-module-interlock/src/main/java/org/jeecg/modules/kafka/consumer/TestConsumer.java

@@ -1,76 +1,76 @@
-package org.jeecg.modules.kafka.consumer;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
-import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.Acknowledgment;
-import org.springframework.stereotype.Component;
-
-import java.util.*;
-
-@Component
-@Slf4j
-public class TestConsumer {
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IIotedgeCollectDataService dataService;
-
-    /**
-     * 指定一个消费者组,一个主题主题。
-     * topicPattern有个问题,如果在本程序运行过程中新增了topic,则监听不到,需要重新启动本程序才可以
-     */
-//    @KafkaListener(topicPattern = "thing___.*.___property")
-    @KafkaListener(topicPattern = "#{'${spring.kafka.topic-patterns}'}")
-    public void simpleConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
-        // 提交(用来标记一条消息已经消费完成,即将从消息队列里移除。)
-        ack.acknowledge();
-        saveDataR(record);
-    }
-
-
-    /**
-     * 保存数据
-     * @param record kafka记录
-     */
-    private void saveDataR(ConsumerRecord<?, ?> record) {
-        log.info("保存数据:"+record.value());
-        //System.out.println(record);
-        //System.out.println(record.value());
-        ArrayList<IotedgeCollectData> list = new ArrayList<>();
-        String value = (String) record.value();
-        JSONObject jsonObject = JSONObject.parseObject(value);
-        Map<String,Object> services = (Map<String, Object>) jsonObject.get("services");
-        for (String key:services.keySet()) {
-            Map<String,Object> tag = (Map<String,Object>) services.get(key);
-            Map<String,Object> tagmap = (Map<String, Object>) tag.get("properties");
-            for (String item:tagmap.keySet()) {
-                Map<String,Object> valuemap = (Map<String, Object>) tagmap.get(item);
-                String value1;
-                if (valuemap.get("value") instanceof Integer){
-                    value1 = String.valueOf(valuemap.get("value"));
-                }else {
-                    value1 =  (String) valuemap.get("value");
-                }
-
-                IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
-                iotedgeCollectData.setGroupid(jsonObject.get("g_id").toString());
-                iotedgeCollectData.setPid(jsonObject.get("p_id").toString());
-                iotedgeCollectData.setDevice(jsonObject.get("d_id").toString());
-                iotedgeCollectData.setTime(jsonObject.get("ts").toString());
-                iotedgeCollectData.setService(key);
-                iotedgeCollectData.setProperty(item);
-                iotedgeCollectData.setValue(value1);
-
-                list.add(iotedgeCollectData);
-            }
-        }
-        dataService.saveBatch(list);
-    }
-}
+//package org.jeecg.modules.kafka.consumer;
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONObject;
+//import com.alibaba.fastjson.TypeReference;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.kafka.clients.consumer.ConsumerRecord;
+//import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
+//import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.kafka.support.Acknowledgment;
+//import org.springframework.stereotype.Component;
+//
+//import java.util.*;
+//
+//@Component
+//@Slf4j
+//public class TestConsumer {
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IIotedgeCollectDataService dataService;
+//
+//    /**
+//     * 指定一个消费者组,一个主题主题。
+//     * topicPattern有个问题,如果在本程序运行过程中新增了topic,则监听不到,需要重新启动本程序才可以
+//     */
+////    @KafkaListener(topicPattern = "thing___.*.___property")
+//    @KafkaListener(topicPattern = "#{'${spring.kafka.topic-patterns}'}")
+//    public void simpleConsumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
+//        // 提交(用来标记一条消息已经消费完成,即将从消息队列里移除。)
+//        ack.acknowledge();
+//        saveDataR(record);
+//    }
+//
+//
+//    /**
+//     * 保存数据
+//     * @param record kafka记录
+//     */
+//    private void saveDataR(ConsumerRecord<?, ?> record) {
+//        log.info("保存数据:"+record.value());
+//        //System.out.println(record);
+//        //System.out.println(record.value());
+//        ArrayList<IotedgeCollectData> list = new ArrayList<>();
+//        String value = (String) record.value();
+//        JSONObject jsonObject = JSONObject.parseObject(value);
+//        Map<String,Object> services = (Map<String, Object>) jsonObject.get("services");
+//        for (String key:services.keySet()) {
+//            Map<String,Object> tag = (Map<String,Object>) services.get(key);
+//            Map<String,Object> tagmap = (Map<String, Object>) tag.get("properties");
+//            for (String item:tagmap.keySet()) {
+//                Map<String,Object> valuemap = (Map<String, Object>) tagmap.get(item);
+//                String value1;
+//                if (valuemap.get("value") instanceof Integer){
+//                    value1 = String.valueOf(valuemap.get("value"));
+//                }else {
+//                    value1 =  (String) valuemap.get("value");
+//                }
+//
+//                IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
+//                iotedgeCollectData.setGroupid(jsonObject.get("g_id").toString());
+//                iotedgeCollectData.setPid(jsonObject.get("p_id").toString());
+//                iotedgeCollectData.setDevice(jsonObject.get("d_id").toString());
+//                iotedgeCollectData.setTime(jsonObject.get("ts").toString());
+//                iotedgeCollectData.setService(key);
+//                iotedgeCollectData.setProperty(item);
+//                iotedgeCollectData.setValue(value1);
+//
+//                list.add(iotedgeCollectData);
+//            }
+//        }
+//        dataService.saveBatch(list);
+//    }
+//}

+ 27 - 27
jeecg-module-interlock/src/main/java/org/jeecg/modules/kafka/controller/TestController.java

@@ -1,27 +1,27 @@
-package org.jeecg.modules.kafka.controller;
-
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-
-import org.jeecg.modules.kafka.producer.TestProducer;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import javax.annotation.Resource;
-
-@Api(tags="kafka")
-@RestController
-@RequestMapping("/kafka/test")
-public class TestController {
-
-    @Resource
-    private TestProducer testProducer;
-
-    @ApiOperation(value="发送消息", notes="发送消息")
-    @GetMapping("/send")
-    public void sendMsg(String topicname, String obj){
-        testProducer.send(topicname, obj);
-    }
-}
+//package org.jeecg.modules.kafka.controller;
+//
+//
+//import io.swagger.annotations.Api;
+//import io.swagger.annotations.ApiOperation;
+//
+//import org.jeecg.modules.kafka.producer.TestProducer;
+//import org.springframework.web.bind.annotation.GetMapping;
+//import org.springframework.web.bind.annotation.RequestMapping;
+//import org.springframework.web.bind.annotation.RestController;
+//
+//import javax.annotation.Resource;
+//
+//@Api(tags="kafka")
+//@RestController
+//@RequestMapping("/kafka/test")
+//public class TestController {
+//
+//    @Resource
+//    private TestProducer testProducer;
+//
+//    @ApiOperation(value="发送消息", notes="发送消息")
+//    @GetMapping("/send")
+//    public void sendMsg(String topicname, String obj){
+//        testProducer.send(topicname, obj);
+//    }
+//}

+ 43 - 43
jeecg-module-interlock/src/main/java/org/jeecg/modules/kafka/producer/TestProducer.java

@@ -1,44 +1,44 @@
-package org.jeecg.modules.kafka.producer;
-
-import com.alibaba.fastjson.JSON;
-import lombok.extern.slf4j.Slf4j;
-import org.jetbrains.annotations.NotNull;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.stereotype.Component;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-import javax.annotation.Resource;
-
-@Component
-@Slf4j
-public class TestProducer {
-
-    @Resource
-    private KafkaTemplate<String, Object> kafkaTemplate;
-
-    //自定义topic
-//    public static final String TOPIC_TEST = "visible";
-//    public static final String GROUP_TEST = "grout.test";
+//package org.jeecg.modules.kafka.producer;
 //
-    public void send(String topicname, Object obj) {
-        String obj2String = JSON.toJSONString(obj);
-        log.info("准备发送消息为:{}", obj2String);
-        //发送消息
-        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicname, obj);
-        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
-            @Override
-            public void onFailure(@NotNull Throwable throwable) {
-                //发送失败的处理
-                log.info(topicname + " - 生产者 发送消息失败:" + throwable.getMessage());
-            }
-
-            @Override
-            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
-                //成功的处理
-                log.info(topicname + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
-            }
-        });
-    }
-}
+//import com.alibaba.fastjson.JSON;
+//import lombok.extern.slf4j.Slf4j;
+//import org.jetbrains.annotations.NotNull;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.kafka.support.SendResult;
+//import org.springframework.stereotype.Component;
+//import org.springframework.util.concurrent.ListenableFuture;
+//import org.springframework.util.concurrent.ListenableFutureCallback;
+//
+//import javax.annotation.Resource;
+//
+//@Component
+//@Slf4j
+//public class TestProducer {
+//
+//    @Resource
+//    private KafkaTemplate<String, Object> kafkaTemplate;
+//
+//    //自定义topic
+////    public static final String TOPIC_TEST = "visible";
+////    public static final String GROUP_TEST = "grout.test";
+////
+//    public void send(String topicname, Object obj) {
+//        String obj2String = JSON.toJSONString(obj);
+//        log.info("准备发送消息为:{}", obj2String);
+//        //发送消息
+//        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicname, obj);
+//        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
+//            @Override
+//            public void onFailure(@NotNull Throwable throwable) {
+//                //发送失败的处理
+//                log.info(topicname + " - 生产者 发送消息失败:" + throwable.getMessage());
+//            }
+//
+//            @Override
+//            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
+//                //成功的处理
+//                log.info(topicname + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
+//            }
+//        });
+//    }
+//}

+ 166 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/component/MqttPushClient.java

@@ -0,0 +1,166 @@
+package org.jeecg.modules.mqtt.component;
+
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ClassName: MqttPushClient
+ * Package: org.jeecg.modules.mqtt.component
+ * Description:  mqtt推送客户端
+ *
+ * @Author sl
+ * @Create 2024/8/23 9:23
+ * @Version 1.0
+ */
+@Component
+public class MqttPushClient {
+    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
+//    @Autowired
+//    private PushCallback pushCallback;
+    private static MqttClient client;
+    private final boolean isNeedReconnect = false;
+    private static MqttClient getClient() {
+        return client;
+    }
+    private static void setClient(MqttClient client) {
+        MqttPushClient.client = client;
+    }
+
+    /**
+     * 客户端连接
+     *
+     * @param host      ip+端口
+     * @param clientID  客户端Id
+     * @param username  用户名
+     * @param password  密码
+     * @param timeout   超时时间
+     * @param keepalive 保留数
+     */
+    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
+        MqttClient client;
+        try {
+            client = new MqttClient(host, clientID, new MemoryPersistence());
+            MqttConnectOptions options = new MqttConnectOptions();
+            options.setAutomaticReconnect(true);
+            options.setCleanSession(true);
+            options.setUserName(username);
+            options.setPassword(password.toCharArray());
+            options.setConnectionTimeout(timeout);
+            options.setKeepAliveInterval(keepalive);
+            MqttPushClient.setClient(client);
+            try {
+                client.setCallback(new MqttCallback() {
+                    @Override
+                    public void connectionLost(Throwable throwable) {
+                        // 连接丢失后,一般在这里面进行重连
+                        logger.info("连接断开,可以做重连");
+//                        try {
+//                            client.connect(options);
+//                        } catch (MqttException e) {
+//                            throw new RuntimeException(e);
+//                        }
+                    }
+
+                    @Override
+                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+                        // subscribe后得到的消息会执行到这里面
+                        logger.info("接收消息主题 : " + topic);
+                        logger.info("接收消息Qos : " + mqttMessage.getQos());
+                        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
+
+                //            JSONObject jsonObject = deviceDataService.analysisMessage(new String(mqttMessage.getPayload()));
+                        Map<String, Object> map = new HashMap<>();
+                //            map.put("map", jsonObject);
+                //            com.example.emqtt.service.WebsocketServer.sendInfo(map);
+                //            System.out.println(map);
+                //        }catch (Exception ex){
+                //            System.out.println(ex.getMessage());
+                //        }
+                    }
+
+                    @Override
+                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+                        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
+                    }
+                });
+                client.connect(options);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+//    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
+//        MqttClient client;
+//        try {
+//            client = new MqttClient(host, clientID, new MemoryPersistence());
+//            MqttConnectOptions options = new MqttConnectOptions();
+//            options.setCleanSession(true);
+//            options.setUserName(username);
+//            options.setPassword(password.toCharArray());
+//            options.setConnectionTimeout(timeout);
+//            options.setKeepAliveInterval(keepalive);
+//            MqttPushClient.setClient(client);
+//            try {
+//                client.setCallback(pushCallback);
+//                client.connect(options);
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+
+    /**
+     * 发布
+     *
+     * @param qos         连接方式
+     * @param retained    是否保留
+     * @param topic       主题
+     * @param pushMessage 消息体
+     */
+    public void publish(int qos, boolean retained, String topic, String pushMessage) {
+        MqttMessage message = new MqttMessage();
+        message.setQos(qos);
+        message.setRetained(retained);
+        message.setPayload(pushMessage.getBytes());
+        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
+        if (null == mTopic) {
+            logger.error("topic not exist");
+        }
+        MqttDeliveryToken token;
+        try {
+            token = mTopic.publish(message);
+            token.waitForCompletion();
+        } catch (MqttPersistenceException e) {
+            e.printStackTrace();
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 订阅某个主题
+     *
+     * @param topic 主题
+     * @param qos   连接方式
+     */
+    public void subscribe(String topic, int qos) {
+        logger.info("开始订阅主题" + topic);
+        try {
+            MqttPushClient.getClient().subscribe(topic, qos);
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+}
+

+ 65 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/component/PushCallback.java

@@ -0,0 +1,65 @@
+package org.jeecg.modules.mqtt.component;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.jeecg.modules.mqtt.config.MqttConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ClassName: PushCallback
+ * Package: org.jeecg.modules.mqtt.component
+ * Description:
+ *
+ * @Author sl
+ * @Create 2024/8/23 9:22
+ * @Version 1.0
+ */
+@Component
+public class PushCallback implements MqttCallback {
+    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
+
+    @Autowired
+    private MqttConfig mqttConfig;
+
+    private static MqttClient client;
+
+    @Override
+    public void connectionLost(Throwable throwable) {
+        // 连接丢失后,一般在这里面进行重连
+        logger.info("连接断开,可以做重连");
+        if (null != client) {
+            mqttConfig.getMqttPushClient();
+        }
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+//        try {
+        // subscribe后得到的消息会执行到这里面
+        logger.info("接收消息主题 : " + topic);
+        logger.info("接收消息Qos : " + mqttMessage.getQos());
+        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
+//            JSONObject jsonObject = deviceDataService.analysisMessage(new String(mqttMessage.getPayload()));
+        Map<String, Object> map = new HashMap<>();
+//            map.put("map", jsonObject);
+//            com.example.emqtt.service.WebsocketServer.sendInfo(map);
+//            System.out.println(map);
+//        }catch (Exception ex){
+//            System.out.println(ex.getMessage());
+//        }
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
+    }
+}
+

+ 126 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java

@@ -0,0 +1,126 @@
+package org.jeecg.modules.mqtt.config;
+
+import lombok.Data;
+import org.jeecg.modules.mqtt.component.MqttPushClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+/**
+ * ClassName: MqttConfig
+ * Package: org.jeecg.modules.mqtt.config
+ * Description:    mqtt相关配置信息
+ *
+ * @Author sl
+ * @Create 2024/8/23 9:55
+ * @Version 1.0
+ */
+@Component
+@ConfigurationProperties("spring.mqtt")
+public class MqttConfig {
+    private static final Logger log = LoggerFactory.getLogger(MqttConfig.class);
+    @Autowired
+    private MqttPushClient mqttPushClient;
+
+    /**
+     * 用户名
+     */
+    private String username;
+    /**
+     * 密码
+     */
+    private String password;
+    /**
+     * 连接地址
+     */
+    private String hostUrl;
+    /**
+     * 客户Id
+     */
+    private String clientId;
+    /**
+     * 默认连接话题
+     */
+    private String defaultTopic;
+    /**
+     * 超时时间
+     */
+    private int timeout;
+    /**
+     * 保持连接数
+     */
+    private int keepalive;
+
+    public void setMqttPushClient(MqttPushClient mqttPushClient) {
+        this.mqttPushClient = mqttPushClient;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getHostUrl() {
+        return hostUrl;
+    }
+
+    public void setHostUrl(String hostUrl) {
+        this.hostUrl = hostUrl;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getDefaultTopic() {
+        return defaultTopic;
+    }
+
+    public void setDefaultTopic(String defaultTopic) {
+        this.defaultTopic = defaultTopic;
+    }
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getKeepalive() {
+        return keepalive;
+    }
+
+    public void setKeepalive(int keepalive) {
+        this.keepalive = keepalive;
+    }
+
+    @Bean
+    public MqttPushClient getMqttPushClient() {
+        mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
+        // 以/#结尾表示订阅所有以test开头的主题
+        mqttPushClient.subscribe("data/1", 1);
+//        mqttPushClient.subscribe("$SYS/brokers/+/clients/#", 1);
+        return mqttPushClient;
+    }
+
+}

+ 96 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/controller/MqttTestController.java

@@ -0,0 +1,96 @@
+package org.jeecg.modules.mqtt.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.jeecg.common.api.vo.Result;
+import org.jeecg.modules.mqtt.component.MqttPushClient;
+import org.jeecg.modules.mqtt.config.MqttConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+
+/**
+ * ClassName: MqttTestController
+ * Package: org.jeecg.modules.mqtt.controller
+ * Description:    测试mqtt
+ *
+ * @Author sl
+ * @Create 2024/8/23 9:42
+ * @Version 1.0
+ */
+@Api(tags="mqtt")
+@RestController
+@RequestMapping("/mqtt")
+public class MqttTestController {
+    /**
+     * 客户端
+     */
+    @Autowired
+    private MqttPushClient mqttPushClient;
+
+    @Autowired
+    private MqttConfig mqttConfig;
+
+//    /**
+//     * 创建主题
+//     * @param topicName
+//     * @return
+//     */
+//    @PostMapping("/createTopic")
+//    public Result createTopic(String user, String topicName){
+//        //直接将主题放在缓存中,用的时候从缓存中取出来
+//        redisUtil.set(user,topicName);
+//        return ResponseResult.success("创建成功,主题为:"+topicName);
+//    }
+//    /**
+//     * 根据用户获取主题
+//     * @param user
+//     * @return
+//     */
+//    @PostMapping("/getTopic")
+//    public ResponseResult getTopic(String user){
+//        String topicName = redisUtil.get(user).toString();
+//        return ResponseResult.success(topicName);
+//    }
+//    /**
+//     * 订阅主题
+//     */
+//    @PostMapping("/subscribeTopic")
+//    public ResponseResult subscribeTopic(String user){
+//        String topicName = redisUtil.get(user).toString();
+//        myMQTTClient.subscribe(topicName,1);
+//        return ResponseResult.success("订阅"+topicName+"主题成功");
+//    }
+//    /**
+//     * 取消订阅主题
+//     */
+//    @PostMapping("/cleanSubscribeTopic")
+//    public ResponseResult cleanSubscribeTopic(String user){
+//        String topicName = redisUtil.get(user).toString();
+//        myMQTTClient.cleanTopic(topicName);
+//        return ResponseResult.success("取消订阅"+topicName+"主题成功");
+//    }
+    /**
+     * 发送消息
+     */
+    @ApiOperation(value="mqtt发布消息", notes="mqtt发布消息")
+    @PostMapping("/sendMsg")
+    public Result<Object> sendMsg(){
+        int num = 0;
+        while(true){
+            num+=1;
+            if(num>1000){
+                System.out.println("发送了一千条消息,发送完毕");
+                return Result.OK("发送成功");
+            }
+            //发送消息
+            mqttPushClient.publish(1, false, "data/1", "当前消息。。。");
+        }
+
+
+    }
+
+
+
+}

+ 14 - 0
jeecg-module-system/jeecg-system-start/pom.xml

@@ -25,6 +25,20 @@
             <version>2.8.0</version>
         </dependency>
 
+        <!--mqtt -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-stream</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+
         <!-- SYSTEM 系统管理模块 -->
         <dependency>
             <groupId>org.jeecgframework.boot</groupId>

+ 48 - 38
jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml

@@ -39,45 +39,55 @@ spring:
           starttls:
             enable: true
             required: true
+  ##mqtt配置
+  mqtt:
+    username: admin
+    password: 302201
+    #    测试服务器mqtt
+    host-url: tcp://119.3.168.55:1883
+    clientId: mqttId
+    completionTimeout: 3000
+    timeout: 1000
+    keepalive: 20
   ## kafka配置
-  kafka:
-    bootstrap-servers: 127.0.0.1:9092
-    producer:
-      # 发生错误后,消息重发的次数。
-      retries: 1
-      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
-      batch-size: 16384
-      # 设置生产者内存缓冲区的大小。
-      buffer-memory: 33554432
-      # 键的序列化方式
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      # 值的序列化方式
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
-      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
-      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
-      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
-      acks: 0
-    consumer:
-      group-id: thing___property
-      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
-      auto-commit-interval: 1S
-      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
-      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
-      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
-      auto-offset-reset: latest
-      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
-      enable-auto-commit: false
-      # 键的反序列化方式
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      # 值的反序列化方式
-      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-    listener:
-      # 在侦听器容器中运行的线程数。
-      concurrency: 5
-      #listner负责ack,每调用一次,就立即commit
-      ack-mode: manual_immediate
-      missing-topics-fatal: false
-    topic-patterns: ".*."  # 这里使用通配符来匹配多个主题
+#  kafka:
+#    bootstrap-servers: 127.0.0.1:9092
+#    producer:
+#      # 发生错误后,消息重发的次数。
+#      retries: 1
+#      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+#      batch-size: 16384
+#      # 设置生产者内存缓冲区的大小。
+#      buffer-memory: 33554432
+#      # 键的序列化方式
+#      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+#      # 值的序列化方式
+#      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+#      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+#      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+#      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+#      acks: 0
+#    consumer:
+#      group-id: thing___property
+#      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+#      auto-commit-interval: 1S
+#      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+#      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+#      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+#      auto-offset-reset: latest
+#      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+#      enable-auto-commit: false
+#      # 键的反序列化方式
+#      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+#      # 值的反序列化方式
+#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+#    listener:
+#      # 在侦听器容器中运行的线程数。
+#      concurrency: 5
+#      #listner负责ack,每调用一次,就立即commit
+#      ack-mode: manual_immediate
+#      missing-topics-fatal: false
+#    topic-patterns: ".*."  # 这里使用通配符来匹配多个主题
   ## quartz定时任务,采用数据库方式
   quartz:
     job-store-type: jdbc