소스 검색

修改mqtt重连问题

sl 2 달 전
부모
커밋
2535e3381a

+ 71 - 37
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/component/MqttPushClient.java

@@ -12,7 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -48,15 +47,16 @@ public class MqttPushClient {
      * @param password  密码
      * @param timeout   超时时间
      * @param keepalive 保留数
+     * @param topic 订阅话题
      */
-    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
-        MqttClient client;
+    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive, String topic) {
+//        MqttClient client;
         try {
             String clientID1 = clientID + "_msg" + UUID.randomUUID();
-            System.out.println("当前客户的mqttId是:" +  clientID1);
+            logger.info("当前客户的mqttId是:" +  clientID1);
             client = new MqttClient(host, clientID1, new MemoryPersistence());
             MqttConnectOptions options = new MqttConnectOptions();
-            options.setAutomaticReconnect(true);
+//            options.setAutomaticReconnect(true);
             options.setCleanSession(true);
             options.setUserName(username);
             options.setPassword(password.toCharArray());
@@ -64,42 +64,76 @@ public class MqttPushClient {
             options.setKeepAliveInterval(keepalive);
             MqttPushClient.setClient(client);
             try {
-                client.setCallback(new MqttCallback() {
-                    @Override
-                    public void connectionLost(Throwable throwable) {
-                        // 连接丢失后,一般在这里面进行重连
-                        logger.info("连接断开,可以做重连");
-                        try {
-                            String clientID2 = clientID + "_msg" + UUID.randomUUID();
-                            System.out.println("当前客户的mqttId是:" +  clientID2);
-                            MqttClient client2 = new MqttClient(host, clientID, new MemoryPersistence());
-                            MqttPushClient.setClient(client2);
-                            client2.connect(options);
-                        } catch (MqttException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-
-                    @Override
-                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-                        // subscribe后得到的消息会执行到这里面
-                        logger.info("接收消息主题 : " + topic);
-                        logger.info("接收消息Qos : " + mqttMessage.getQos());
-                        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
-                        saveDataInfo(new String(mqttMessage.getPayload()));
-                    }
-
-                    @Override
-                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-                        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
-                    }
-                });
+//                client.setCallback(new MqttCallbackExtended() {
+//                    @Override
+//                    public void connectComplete(boolean b, String s) {
+////                        logger.info("丢失重连成功");
+//                    }
+//
+//                    @Override
+//                    public void connectionLost(Throwable throwable) {
+//                        // 连接丢失后,一般在这里面进行重连
+//                        logger.info("连接断开,准备进行重连");
+////                        while (true){
+////                            logger.info("正在尝试重连");
+////                            try {//如果没有发生异常说明连接成功,如果发生异常,则死循环
+////                                for(int i=0; i<20; i++){
+////                                    if ( Thread.interrupted() ) {
+////                                        break;
+////                                    }
+////                                }
+////
+////                                Thread.sleep(10000);//10秒
+////                                //当设置 options.setAutomaticReconnect(true);不需要写重连方法,设置false需要写重连方法
+////                                //pushCallback.clientMQTT.start();
+////                                logger.info("MQTT:连接成功");
+////                                break;
+////                            }catch (Exception e){
+////                                e.printStackTrace();
+////                            }
+////                        }
+//                        while(true){
+//                            try {
+//                                logger.info("重连中。。。");
+////                                String clientID1 = clientID + "_msg" + UUID.randomUUID();
+////                                logger.info("当前客户的mqttId是:" +  clientID1);
+////                                client = new MqttClient(host, clientID1, new MemoryPersistence());
+////                                client.subscribe("data/1");
+//                                client.reconnect();
+//                                Thread.sleep(10000);
+//                                if(client.isConnected()){
+//                                    logger.info("mqtt连接成功");
+//                                    break;
+//                                }
+//                            } catch (Exception e){
+//                                logger.error(String.valueOf(e));
+//                            }
+//                        }
+//                    }
+//
+//                    @Override
+//                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+//                        // subscribe后得到的消息会执行到这里面
+//                        logger.info("接收消息主题 : " + topic);
+//                        logger.info("接收消息Qos : " + mqttMessage.getQos());
+//                        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
+//                        saveDataInfo(new String(mqttMessage.getPayload()));
+//                    }
+//
+//                    @Override
+//                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+//                        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
+//                    }
+//                });
+                client.setCallback(new PushCallback(host, clientID, username, password, timeout, keepalive, topic));
                 client.connect(options);
             } catch (Exception e) {
-                e.printStackTrace();
+//                e.printStackTrace();
+                logger.error(String.valueOf(e));
             }
         } catch (Exception e) {
-            e.printStackTrace();
+//            e.printStackTrace();
+            logger.error(String.valueOf(e));
         }
     }
 //    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {

+ 101 - 35
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/component/PushCallback.java

@@ -1,10 +1,8 @@
 package org.jeecg.modules.mqtt.component;
 
 import com.alibaba.fastjson.JSONObject;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
 import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
 import org.jeecg.modules.iotedgeCollectData.service.impl.IotedgeCollectDataServiceImpl;
@@ -12,11 +10,13 @@ import org.jeecg.modules.mqtt.config.MqttConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * ClassName: PushCallback
@@ -27,22 +27,57 @@ import java.util.Map;
  * @Create 2024/8/23 9:22
  * @Version 1.0
  */
-@Component
+//@Component
 public class PushCallback implements MqttCallback {
     private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
 
-    @Autowired
-    private MqttConfig mqttConfig;
+//    @Autowired
+//    private MqttConfig mqttConfig;
 
-    private static MqttClient client;
+    private MqttClient client;
+    private String host;
+    private String clientID;
+    private String username;
+    private String password;
+    private int timeout;
+    private int keepalive;
+    private String topic;
+
+//    @Override
+//    public void connectComplete(boolean b, String s) {
+//        logger.info("丢失重连成功");
+//    }
+
+    public PushCallback(){
+
+    }
+    public PushCallback(String host, String clientID, String username, String password, int timeout, int keepalive, String topic){
+        this.host = host;
+        this.clientID = clientID;
+        this.username = username;
+        this.password = password;
+        this.timeout = timeout;
+        this.keepalive = keepalive;
+        this.topic = topic;
+    }
 
     @Override
     public void connectionLost(Throwable throwable) {
         // 连接丢失后,一般在这里面进行重连
         logger.info("连接断开,可以做重连");
-        if (null != client) {
-            mqttConfig.getMqttPushClient();
+        while (true){
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            reconnectMqttClient();
+            if(this.client.isConnected()){
+                logger.info("重连成功");
+                break;
+            }
         }
+
     }
 
     @Override
@@ -58,36 +93,67 @@ public class PushCallback implements MqttCallback {
         logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
     }
 
+    public void reconnectMqttClient(){
+        String clientID1 = clientID + "_msg" + UUID.randomUUID();
+        logger.info("当前客户的mqttId是:" +  clientID1);
+        try {
+            this.client = new MqttClient(host, clientID1, new MemoryPersistence());
+        } catch (Exception e) {
+//                e.printStackTrace();
+            logger.error(String.valueOf(e));
+        }
+        MqttConnectOptions options = new MqttConnectOptions();
+//            options.setAutomaticReconnect(true);
+        options.setCleanSession(true);
+        options.setUserName(username);
+        options.setPassword(password.toCharArray());
+        options.setConnectionTimeout(timeout);
+        options.setKeepAliveInterval(keepalive);
+        this.client.setCallback(this);
+        try {
+            this.client.connect(options);
+            logger.info("重新订阅主题" + topic);
+            this.client.subscribe(topic);
+        } catch (Exception e) {
+//                e.printStackTrace();
+            logger.error(String.valueOf(e));
+        }
+    }
+
     private void saveDataInfo(String info) {
-        ArrayList<IotedgeCollectData> list = new ArrayList<>();
-        JSONObject jsonObject = JSONObject.parseObject(info);
-        Map<String,Object> services = (Map<String, Object>) jsonObject.get("services");
-        for (String key:services.keySet()) {
-            Map<String,Object> tag = (Map<String,Object>) services.get(key);
-            Map<String,Object> tagmap = (Map<String, Object>) tag.get("properties");
-            for (String item:tagmap.keySet()) {
-                Map<String,Object> valuemap = (Map<String, Object>) tagmap.get(item);
-                String value1;
-                if (valuemap.get("value") instanceof Integer){
-                    value1 = String.valueOf(valuemap.get("value"));
-                }else {
-                    value1 =  (String) valuemap.get("value");
-                }
+        try {
+            ArrayList<IotedgeCollectData> list = new ArrayList<>();
+            JSONObject jsonObject = JSONObject.parseObject(info);
+            Map<String, Object> services = (Map<String, Object>) jsonObject.get("services");
+            for (String key : services.keySet()) {
+                Map<String, Object> tag = (Map<String, Object>) services.get(key);
+                Map<String, Object> tagmap = (Map<String, Object>) tag.get("properties");
+                for (String item : tagmap.keySet()) {
+                    Map<String, Object> valuemap = (Map<String, Object>) tagmap.get(item);
+                    String value1;
+                    if (valuemap.get("value") instanceof Integer) {
+                        value1 = String.valueOf(valuemap.get("value"));
+                    } else {
+                        value1 = (String) valuemap.get("value");
+                    }
 
-                IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
-                iotedgeCollectData.setGroupid(jsonObject.get("g_id").toString());
-                iotedgeCollectData.setPid(jsonObject.get("p_id").toString());
-                iotedgeCollectData.setDevice(jsonObject.get("d_id").toString());
-                iotedgeCollectData.setTime(jsonObject.get("ts").toString());
-                iotedgeCollectData.setService(key);
-                iotedgeCollectData.setProperty(item);
-                iotedgeCollectData.setValue(value1);
+                    IotedgeCollectData iotedgeCollectData = new IotedgeCollectData();
+                    iotedgeCollectData.setGroupid(jsonObject.get("g_id").toString());
+                    iotedgeCollectData.setPid(jsonObject.get("p_id").toString());
+                    iotedgeCollectData.setDevice(jsonObject.get("d_id").toString());
+                    iotedgeCollectData.setTime(jsonObject.get("ts").toString());
+                    iotedgeCollectData.setService(key);
+                    iotedgeCollectData.setProperty(item);
+                    iotedgeCollectData.setValue(value1);
 
-                list.add(iotedgeCollectData);
+                    list.add(iotedgeCollectData);
+                }
             }
+            IIotedgeCollectDataService dataService = new IotedgeCollectDataServiceImpl();
+            dataService.saveBatch(list);
+        } catch (Exception e){
+            logger.error(String.valueOf(e));
         }
-        IIotedgeCollectDataService dataService = new IotedgeCollectDataServiceImpl();
-        dataService.saveBatch(list);
     }
 }
 

+ 1 - 1
jeecg-module-interlock/src/main/java/org/jeecg/modules/mqtt/config/MqttConfig.java

@@ -129,7 +129,7 @@ public class MqttConfig {
 
     @Bean
     public MqttPushClient getMqttPushClient() {
-        mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
+        mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive, topic);
         // 以/#结尾表示订阅所有以test开头的主题
         mqttPushClient.subscribe(topic, 1);
 //        mqttPushClient.subscribe("$SYS/brokers/+/clients/#", 1);