Переглянути джерело

通过PostgreSQL的通知机制(LISTEN/NOTIFY)实时判断各种状态,并且记录历史数据

丁治程 9 місяців тому
батько
коміт
b1bd775982

+ 7 - 0
jeecg-module-interlock/pom.xml

@@ -21,6 +21,13 @@
 
     <dependencies>
 
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.5.0</version>
+        </dependency>
+
         <dependency>
             <groupId>org.jeecgframework.boot</groupId>
             <artifactId>jeecg-boot-base-core</artifactId>

+ 396 - 363
jeecg-module-interlock/src/main/java/org/jeecg/modules/binlog/MysqlBinLogClient.java

@@ -1,363 +1,396 @@
-package org.jeecg.modules.binlog;
-
-import cn.hutool.core.util.ObjectUtil;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.event.EventData;
-import com.github.shyiko.mysql.binlog.event.TableMapEventData;
-import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
-import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.ObjectUtils;
-import org.jeecg.common.util.DateUtils;
-import org.jeecg.modules.detail.entity.InterlockDetail;
-import org.jeecg.modules.detail.service.IInterlockDetailService;
-import org.jeecg.modules.history.convert.InterlockDetailHistoryConvert;
-import org.jeecg.modules.history.convert.InterlockSummaryHistoryConvert;
-import org.jeecg.modules.history.entity.InterlockDetailHistory;
-import org.jeecg.modules.history.entity.InterlockSummaryHistory;
-import org.jeecg.modules.history.service.IInterlockDetailHistoryService;
-import org.jeecg.modules.history.service.IInterlockSummaryHistoryService;
-import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
-import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
-import org.jeecg.modules.summary.entity.InterlockSummary;
-import org.jeecg.modules.summary.service.IInterlockSummaryService;
-import org.jeecg.modules.tag.entity.InterlockTag;
-import org.jeecg.modules.tag.service.IInterlockTagService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.core.annotation.Order;
-import org.springframework.data.redis.core.BoundHashOperations;
-import org.springframework.data.redis.core.HashOperations;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.text.DecimalFormat;
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author dzc
- * @date 2024/5/30 9:57
- * @package org.jeecg.modules.binlog
- * @project interlock_server
- * @des  binlog监控 需要在 my.ini文件中进行配置
- */
-
-//此类可以监控MySQL库数据的增删改
-@Order
-@Component
-@Slf4j  //用于打印日志
-//在SpringBoot中,提供了一个接口:ApplicationRunner。
-//该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
-public class MysqlBinLogClient implements ApplicationRunner {
-
-
-
-    @Value("${binarylog.host}")
-    private String host;
-    @Value("${binarylog.port}")
-    private Integer port;
-    @Value("${binarylog.schema}")
-    private String schema;
-    @Value("${binarylog.username}")
-    private String username;
-    @Value("${binarylog.password}")
-    private String password;
-
-    private Long interlockdetailid = 1L; // 联锁详细信息
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IInterlockDetailService detailService;
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IInterlockSummaryService summaryService;
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IIotedgeCollectDataService iotedgeCollectDataService;
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IInterlockTagService tagService;
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IInterlockSummaryHistoryService summaryHistoryService;
-
-    @Autowired
-    @SuppressWarnings("all")
-    private IInterlockDetailHistoryService detailHistoryService;
-
-    @Async
-    @Override
-    @Order
-    public void run(ApplicationArguments args) throws Exception {
-        //项目启动完成连接bin-log
-        new Thread(() -> {
-            connectMysqlBinLog();
-        }).start();
-    }
-
-    /**
-     * 连接mysqlBinLog
-     */
-    public void connectMysqlBinLog() {
-        log.info("监控BinLog服务已启动");
-
-        //自己MySQL的信息。host,port,username,password
-        BinaryLogClient client = new BinaryLogClient(host, port, schema, username, password);
-        // 设置连接时间为20秒
-        client.setConnectTimeout(20 * 1000);
-
-        /**因为binlog不是以数据库为单位划分的,所以监控binglog不是监控的单个的数据库,而是整个     当前所设置连接的MySQL,
-         *其中任何一个库发生数据增删改,这里都能检测到,
-         *所以不用设置所监控的数据库的名字(我也不知道怎么设置,没发现有包含这个形参的构造函数)
-         *如果需要只监控指定的数据库,可以看后面代码,可以获取到当前发生变更的数据库名称。可以根据名称来决定是否监控
-         **/
-
-        client.setServerId(10); //和自己之前设置的server-id保持一致,但是我不知道为什么不一致也能成功
-        //下面直接照抄就行
-        client.registerEventListener(event -> {
-            EventData data = event.getData();
-            if (data instanceof TableMapEventData) {
-                //只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
-                TableMapEventData tableMapEventData = (TableMapEventData) data;
-                if (schema.equals(tableMapEventData.getDatabase())) {
-                    if ("interlock_detail".equals(tableMapEventData.getTable())) {
-                        interlockdetailid = tableMapEventData.getTableId();
-                    }
-                }
-            }
-
-            if (data instanceof UpdateRowsEventData) {
-                long tableId = ((UpdateRowsEventData) data).getTableId();
-                if (interlockdetailid == tableId){
-                    System.out.println("表中数据发生了更改"+data.toString());
-                    Object[] array = Arrays.stream(((UpdateRowsEventData) data).getRows().get(0).getValue()).toArray();
-
-                    String ybStatusTag = (String) array[13];     // 仪表状态点位 值
-                    String ybValueTag = (String) array[17];      // 仪表原始模拟量点位 值
-                    String plTag = (String) array[30];           // 旁路点位 值
-                    String inputStatusTag = (String) array[34];  // 输入卡件状态点位 值
-                    String outputStatusTag = (String) array[38]; // 输出卡件状态点位 值
-                    String mpStatusTag = (String) array[42];     // MP状态点位 值
-                    String summaryId = (String) array[1];  // 联锁总表ID
-                    String id = (String) array[0];  // 联锁条件ID
-                    String ybIfFs = (String) array[9];   // 仪表状态判断方式
-                    String upperLimit = ""; // 上限
-                    if (ObjectUtil.isNotNull(array[18])) {
-                        upperLimit = (String)array[18];
-                    }
-                    String lowerLimit = ""; // 下限
-                    if (ObjectUtil.isNotNull(array[19])){
-                        lowerLimit = (String)array[19];
-                    }
-                    String ifBypass = (String) array[29];  // 是否旁路  (若 "是" 则 bypass 等于 点位值;若 "否" 则 bypass 等于 输入值)
-
-                    // 仪表状态     (明细表)
-                    String ybStatus = "0"; // 0:正常 1:故障
-
-                    // 直接读取位号
-                    if ("0".equals(ybIfFs) && ObjectUtil.isNotNull(ybStatusTag)){
-                        // 仪表状态 等于点位值
-                        ybStatus = ybStatusTag;
-                    }
-
-                    // 高低限判断
-                    if ("1".equals(ybIfFs)){
-                        // 如果 仪表值 大于上限 或者 小于下限 则状态为 异常;两种情况不会同时满足
-                        if (Integer.parseInt(ybValueTag) >  Integer.parseInt(upperLimit) || Integer.parseInt(ybValueTag) < Integer.parseInt(lowerLimit)){
-                            ybStatus = "1";
-                        }
-                    }
-
-                    // 突变超限判断  规定时间内 (结束值-初始值)/初始值 * 100 百分比 是否大于 规定的阈值
-                    if ("2".equals(ybIfFs)){
-                        // 如果 采集的频率 大于 规定的时间
-                        // 例如 采集是一分钟采集一次 规定的时间的2s之内
-                        // 如果 采集的频率 小于等于 规定的时间
-                        // 例如 2S之内  去采集数据表中根据 设备id+模块名称+点位名称 查询2s之前的数据,但是表中的数据量大 时间?
-                        String yz = (String) array[20];  // 阈值
-                        String time = (String) array[21];  // 规定的时间
-                        String dw = (String) array[22];   // 时间单位
-                        if ("s".equals(dw)){
-                            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-                            String endDate = DateUtils.getDate("yyyy-MM-dd HH:mm:ss");  // 当前时间 规定时间的结束时间
-                            LocalDateTime endDateTime = LocalDateTime.parse(endDate, formatter);
-                            LocalDateTime beginDateTime = endDateTime.minus(Duration.ofSeconds(2));
-                            String beginDate = beginDateTime.format(formatter);  // 开始时间 规定时间的开始时间
-                            QueryWrapper<InterlockTag> tagQuery = new QueryWrapper<>();
-                            tagQuery.eq("interlock_condition_id",id).eq("parameter_type","8");
-                            InterlockTag interlockTag = tagService.getOne(tagQuery);
-                            IotedgeCollectData iotedgeData = iotedgeCollectDataService.getOneInfo(interlockTag.getDeviceId(),interlockTag.getModuleName(),interlockTag.getTagName(),beginDate);
-                            BigDecimal beginValue = BigDecimal.valueOf(Integer.parseInt(iotedgeData.getValue()));
-                            BigDecimal num = BigDecimal.valueOf(Integer.parseInt(ybValueTag)).subtract(beginValue);
-                            if ((num.divide(beginValue).compareTo(BigDecimal.valueOf(Integer.parseInt(yz)))) > 0){
-                                ybStatus = "1";
-                            }
-                        }
-                    }
-
-                    // todo 判断 仪表状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
-                    String ifYbStatus = "1";  // 判断仪表状态较之前是否发生了变化 0:未发生变化 1:变化
-                    String currYbStatus = (String) array[8];
-                    if (ObjectUtil.isNotNull(currYbStatus) && ybStatus.equals(currYbStatus)){
-                        ifYbStatus = "0";
-                    }
-
-
-                    // 控制系统状态  (明细表)
-                    String kzxtStatus = "0"; // 0:正常 1:非正常
-                    if (Integer.parseInt(inputStatusTag) == 1 || Integer.parseInt(outputStatusTag) == 1 || Integer.parseInt(mpStatusTag) == 1){
-                        kzxtStatus = "1";
-                    }
-
-                    // todo 判断 控制系统状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
-                    String ifKzxtStatus = "1"; // 判断控制系统状态较之前是否发生了变化 0:未发生变化 1:变化
-                    String currKzxtStatus = (String) array[23];
-                    if (ObjectUtil.isNotNull(currKzxtStatus) && kzxtStatus.equals(currKzxtStatus)){
-                        ifKzxtStatus = "0";
-                    }
-
-
-                    // 根据总表id 查询数所有的联锁条件
-                    String zdybzt = "0";   // 该联锁下 所有联锁条件的 (总)仪表状态
-                    String zdkzxtzt = "0"; // 该联锁下 所有联塑条件的 (总)控制系统状态
-                    int ybcount = 0;    // 该联锁下 所有联锁条件 仪表状态为 异常(1) 的数量
-                    int kzxtcount = 0;  // 该联锁下 所有联锁条件 控制系统状态为 异常(1) 的数量
-                    int plcount = 0;    // 该联锁下 所有联锁条件 旁路为 是(1) 的数量
-                    List<InterlockDetail> list = detailService.selectListBySummaryId(summaryId);
-                    for (InterlockDetail detail:list) {
-                        if ("1".equals(detail.getInstrumentStatus())){
-                            ybcount++;   // 如果仪表状态有一个是异常 则+1;
-                        }
-                        if (Integer.parseInt(detail.getInputStatus()) == 1 || Integer.parseInt(detail.getOutputStatus()) == 1 || Integer.parseInt(detail.getMpStatus()) == 1) {
-                            kzxtcount++; // 如果控制系统状态有一个是异常 则+1;
-                        }
-                        if ("1".equals(detail.getIfBypass()) && "1".equals(detail.getBypass())){
-                            plcount++;  // 如果旁路有一个为 是 则+1;
-                        }
-                    }
-
-                    // 根据总表id 查询对应的联锁
-                    QueryWrapper<InterlockSummary> summaryQuery = new QueryWrapper<>();
-                    summaryQuery.eq("id",summaryId);
-                    InterlockSummary interLock = summaryService.getOne(summaryQuery);
-
-                    // 联锁状态     (总表)
-                    String lsStatus = "1";   // 0:未投用 1:投用
-                    // 暂时不需要根据if_bypass进行判断,默认否的时候 手动输入值为“无旁路”  如果 if_bypass = 1 则根据点位进行判断
-                    if (plcount > 0){
-                        lsStatus = "0";   // 如果旁路有一个为 是 则联锁状态为 "未投用"
-                    }
-
-                    // todo 判断 联锁状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
-                    String ifLsStatus = "1"; // 判断联锁状态较之前是否发生了变化 0:未发生变化 1:变化
-                    if (ObjectUtil.isNotNull(interLock.getInterlockStatus()) && lsStatus.equals(interLock.getInterlockStatus())){
-                        ifLsStatus = "0";
-                    }
-
-                    // 回路健康级别  (总表)
-                    String hljkjbStatus = "A"; // 四级:A、B、C、D
-                    if (ybcount > 0){
-                        zdybzt = "1";   // 如果 该联锁下所有联锁条件的 仪表状态为异常的数量大于0 则 总的仪表状态为 异常
-                    }
-                    if (kzxtcount > 0){
-                        zdkzxtzt = "1"; // 如果 该联锁下所有联锁条件的 控制系统为异常的数量大于0 则 总的控制系统状态为 异常
-                    }
-                    ArrayList<String> sList = new ArrayList<>();
-                    sList.add(zdybzt);     // 仪表状态
-                    if("1".equals(lsStatus)){
-                        sList.add("0");  // 暂时认为  联锁状态 1投用 为 正常
-                    }else {
-                        sList.add("1");
-                    }
-                    sList.add(zdkzxtzt);   // 控制系统状态
-                    // 集合中 元素为 1 的数量
-                    long count = sList.stream().filter(e -> e.equals("1")).count();
-                    if (count == 0){
-                        hljkjbStatus = "A";   // 三个状态都正常 则回路健康级别 为 A
-                    }else if (count == 1){
-                        hljkjbStatus = "B";   // 有任何一个状态为异常 则回路健康级别为 B
-                    }else if (count == 2){
-                        hljkjbStatus = "C";   // 有任何两个状态为异常 则回路健康级别为 C
-                    }else {
-                        hljkjbStatus = "D";   // 三个状态都是异常 则回路健康级别为 D
-                    }
-
-                    // todo 判断 回路健康级别 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
-                    String ifHljkjbStatus = "1"; // 判断回路健康级别较之前是否发生了变化 0:未发生变化 1:变化
-                    if (ObjectUtil.isNotNull(interLock.getLoopHealthLevel()) && hljkjbStatus.equals(interLock.getLoopHealthLevel())){
-                        ifHljkjbStatus = "0";
-                    }
-
-                    // 只要有任意一个状态发生了变化 就将当前联锁、联锁条件记录到历史数据中  (可此时记录的状态确实是未改变之前的,但是点位值却是已经改变之后的)
-                    //if ("1".equals(ifYbStatus) || "1".equals(ifLsStatus) || "1".equals(ifKzxtStatus) || "1".equals(ifHljkjbStatus)){
-                    //    // 需要判断一下 该时间(点位时间)是否已经有了历史记录了 (有可能 多个联锁条件的相应的状态都发生了变化,只存一次即可)
-                    //    QueryWrapper<InterlockSummaryHistory> summaryHistoryQuery = new QueryWrapper<>();
-                    //    summaryHistoryQuery.eq("tag_time",array[52]);
-                    //    long count1 = summaryHistoryService.count(summaryHistoryQuery);
-                    //    if (count1 == 0){
-                    //        InterlockSummaryHistory interlockSummaryHistory = InterlockSummaryHistoryConvert.INSTANCE.toHistory(interLock);
-                    //        summaryHistoryService.save(interlockSummaryHistory);
-                    //        ArrayList<InterlockDetailHistory> historyList = new ArrayList<>();
-                    //        for (InterlockDetail item:list) {
-                    //            item.setSummaryid(interlockSummaryHistory.getId());
-                    //            historyList.add(InterlockDetailHistoryConvert.INSTANCE.toHistory(item));
-                    //        }
-                    //        detailHistoryService.saveBatch(historyList);
-                    //    }
-                    //}
-
-
-                    // 修改明细表中 该条联锁条件的 仪表状态、控制系统状态
-                    UpdateWrapper<InterlockDetail> upDetailQuery1 = new UpdateWrapper<>();
-                    upDetailQuery1.set("instrument_status",ybStatus).set("control_system_status",kzxtStatus);
-                    upDetailQuery1.eq("id",array[0]);
-                    detailService.update(upDetailQuery1);
-
-
-                    // 修改记录上一次值的表中每个点位值+状态
-                    //UpdateWrapper<InterlockDetail> upDetailQuery2 = new UpdateWrapper<>();
-                    //upDetailQuery2.set("control_system_status",kzxtStatus);
-                    //upDetailQuery2.eq("id",array[0]);
-                    //detailService.update(upDetailQuery2);
-
-                    // 修改总表中的联锁状态、回路健康级别
-                    UpdateWrapper<InterlockSummary> upSummaryQuery1 = new UpdateWrapper<>();
-                    upSummaryQuery1.set("interlock_status",lsStatus).set("loop_health_level",hljkjbStatus);
-                    upSummaryQuery1.eq("id",summaryId);
-                    summaryService.update(upSummaryQuery1);
-
-                    // 修改记录上一次值的表中每个点位值+状态
-                    //UpdateWrapper<InterlockSummary> upSummaryQuery2 = new UpdateWrapper<>();
-                    //upSummaryQuery2.set("loop_health_level",hljkjbStatus);
-                    //upSummaryQuery2.eq("id",summaryId);
-                    //summaryService.update(upSummaryQuery2);
-
-                }
-            }
-        });
-        try {
-            client.connect();
-        } catch (
-                IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-}
+//package org.jeecg.modules.binlog;
+//
+//import cn.hutool.core.util.ObjectUtil;
+//import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+//import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+//import com.github.shyiko.mysql.binlog.BinaryLogClient;
+//import com.github.shyiko.mysql.binlog.event.EventData;
+//import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+//import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+//import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.commons.lang3.ObjectUtils;
+//import org.jeecg.common.util.DateUtils;
+//import org.jeecg.modules.detail.entity.InterlockDetail;
+//import org.jeecg.modules.detail.service.IInterlockDetailService;
+//import org.jeecg.modules.history.convert.InterlockDetailHistoryConvert;
+//import org.jeecg.modules.history.convert.InterlockSummaryHistoryConvert;
+//import org.jeecg.modules.history.entity.InterlockDetailHistory;
+//import org.jeecg.modules.history.entity.InterlockSummaryHistory;
+//import org.jeecg.modules.history.service.IInterlockDetailHistoryService;
+//import org.jeecg.modules.history.service.IInterlockSummaryHistoryService;
+//import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
+//import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
+//import org.jeecg.modules.summary.entity.InterlockSummary;
+//import org.jeecg.modules.summary.service.IInterlockSummaryService;
+//import org.jeecg.modules.tag.entity.InterlockTag;
+//import org.jeecg.modules.tag.service.IInterlockTagService;
+//import org.jeecg.modules.temp.entity.InterlockDetailTemp;
+//import org.jeecg.modules.temp.entity.InterlockSummaryTemp;
+//import org.jeecg.modules.temp.service.IInterlockDetailTempService;
+//import org.jeecg.modules.temp.service.IInterlockSummaryTempService;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.boot.ApplicationArguments;
+//import org.springframework.boot.ApplicationRunner;
+//import org.springframework.core.annotation.Order;
+//import org.springframework.data.redis.core.BoundHashOperations;
+//import org.springframework.data.redis.core.HashOperations;
+//import org.springframework.data.redis.core.RedisTemplate;
+//import org.springframework.scheduling.annotation.Async;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.Resource;
+//import java.io.IOException;
+//import java.math.BigDecimal;
+//import java.text.DecimalFormat;
+//import java.time.Duration;
+//import java.time.LocalDateTime;
+//import java.time.format.DateTimeFormatter;
+//import java.util.*;
+//import java.util.concurrent.CompletableFuture;
+//import java.util.concurrent.TimeUnit;
+//
+///**
+// * @author dzc
+// * @date 2024/5/30 9:57
+// * @package org.jeecg.modules.binlog
+// * @project interlock_server
+// * @des  binlog监控 需要在 my.ini文件中进行配置    由于数据库更换为PostgreSQL,所以MySQL的binlog不起作用。
+// */
+//
+////此类可以监控MySQL库数据的增删改
+//@Order
+//@Component
+//@Slf4j  //用于打印日志
+////在SpringBoot中,提供了一个接口:ApplicationRunner。
+////该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
+//public class MysqlBinLogClient implements ApplicationRunner {
+//
+//
+//
+//    @Value("${binarylog.host}")
+//    private String host;
+//    @Value("${binarylog.port}")
+//    private Integer port;
+//    @Value("${binarylog.schema}")
+//    private String schema;
+//    @Value("${binarylog.username}")
+//    private String username;
+//    @Value("${binarylog.password}")
+//    private String password;
+//
+//    private Long interlockdetailid = 1L; // 联锁详细信息
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockDetailService detailService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockSummaryService summaryService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IIotedgeCollectDataService iotedgeCollectDataService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockTagService tagService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockSummaryHistoryService summaryHistoryService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockDetailHistoryService detailHistoryService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockDetailTempService detailTempService;
+//
+//    @Autowired
+//    @SuppressWarnings("all")
+//    private IInterlockSummaryTempService summaryTempService;
+//
+//    @Async
+//    @Override
+//    @Order
+//    public void run(ApplicationArguments args) throws Exception {
+//        //项目启动完成连接bin-log
+//        new Thread(() -> {
+//            connectMysqlBinLog();
+//        }).start();
+//    }
+//
+//    /**
+//     * 连接mysqlBinLog
+//     */
+//    public void connectMysqlBinLog() {
+//        log.info("监控BinLog服务已启动");
+//
+//        //自己MySQL的信息。host,port,username,password
+//        BinaryLogClient client = new BinaryLogClient(host, port, schema, username, password);
+//        // 设置连接时间为20秒
+//        client.setConnectTimeout(20 * 1000);
+//
+//        /**因为binlog不是以数据库为单位划分的,所以监控binglog不是监控的单个的数据库,而是整个     当前所设置连接的MySQL,
+//         *其中任何一个库发生数据增删改,这里都能检测到,
+//         *所以不用设置所监控的数据库的名字(我也不知道怎么设置,没发现有包含这个形参的构造函数)
+//         *如果需要只监控指定的数据库,可以看后面代码,可以获取到当前发生变更的数据库名称。可以根据名称来决定是否监控
+//         **/
+//
+//        client.setServerId(10); //和自己之前设置的server-id保持一致,但是我不知道为什么不一致也能成功
+//        //下面直接照抄就行
+//        client.registerEventListener(event -> {
+//            EventData data = event.getData();
+//            if (data instanceof TableMapEventData) {
+//                //只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
+//                TableMapEventData tableMapEventData = (TableMapEventData) data;
+//                if (schema.equals(tableMapEventData.getDatabase())) {
+//                    if ("interlock_detail".equals(tableMapEventData.getTable())) {
+//                        interlockdetailid = tableMapEventData.getTableId();
+//                    }
+//                }
+//            }
+//
+//            if (data instanceof UpdateRowsEventData) {
+//                long tableId = ((UpdateRowsEventData) data).getTableId();
+//                if (interlockdetailid == tableId){
+//                    System.out.println("表中数据发生了更改"+data.toString());
+//                    Object[] array = Arrays.stream(((UpdateRowsEventData) data).getRows().get(0).getValue()).toArray();
+//
+//                    String interlockCondition = (String) array[3];  // 联锁条件值
+//                    String curronValue = (String) array[25];     // 当前值
+//                    String ybStatusTag = (String) array[13];     // 仪表状态点位 值
+//                    String ybValueTag = (String) array[17];      // 仪表原始模拟量点位 值
+//                    String plTag = (String) array[30];           // 旁路点位 值
+//                    String inputStatusTag = (String) array[34];  // 输入卡件状态点位 值
+//                    String outputStatusTag = (String) array[38]; // 输出卡件状态点位 值
+//                    String mpStatusTag = (String) array[42];     // MP状态点位 值
+//                    String summaryId = (String) array[1];  // 联锁总表ID
+//                    String id = (String) array[0];  // 联锁条件ID
+//                    String ybIfFs = (String) array[9];   // 仪表状态判断方式
+//                    String upperLimit = ""; // 上限
+//                    if (ObjectUtil.isNotNull(array[18])) {
+//                        upperLimit = (String)array[18];
+//                    }
+//                    String lowerLimit = ""; // 下限
+//                    if (ObjectUtil.isNotNull(array[19])){
+//                        lowerLimit = (String)array[19];
+//                    }
+//                    String ifBypass = (String) array[29];  // 是否旁路  (若 "是" 则 bypass 等于 点位值;若 "否" 则 bypass 等于 输入值)
+//
+//                    String tagTime = (String) array[52];   // 点位时间
+//
+//                    // 仪表状态     (明细表)
+//                    String ybStatus = "0"; // 0:正常 1:故障
+//
+//                    // 直接读取位号
+//                    if ("0".equals(ybIfFs) && ObjectUtil.isNotNull(ybStatusTag)){
+//                        // 仪表状态 等于点位值
+//                        ybStatus = ybStatusTag;
+//                    }
+//
+//                    // 高低限判断
+//                    if ("1".equals(ybIfFs)){
+//                        // 如果 仪表值 大于上限 或者 小于下限 则状态为 异常;两种情况不会同时满足
+//                        if (Integer.parseInt(ybValueTag) >  Integer.parseInt(upperLimit) || Integer.parseInt(ybValueTag) < Integer.parseInt(lowerLimit)){
+//                            ybStatus = "1";
+//                        }
+//                    }
+//
+//                    // 突变超限判断  规定时间内 (结束值-初始值)/初始值 * 100 百分比 是否大于 规定的阈值
+//                    if ("2".equals(ybIfFs)){
+//                        // 如果 采集的频率 大于 规定的时间
+//                        // 例如 采集是一分钟采集一次 规定的时间的2s之内
+//                        // 如果 采集的频率 小于等于 规定的时间
+//                        // 例如 2S之内  去采集数据表中根据 设备id+模块名称+点位名称 查询2s之前的数据,但是表中的数据量大 时间?
+//                        String yz = (String) array[20];  // 阈值
+//                        String time = (String) array[21];  // 规定的时间
+//                        String dw = (String) array[22];   // 时间单位
+//                        if ("s".equals(dw)){
+//                            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+//                            String endDate = DateUtils.getDate("yyyy-MM-dd HH:mm:ss");  // 当前时间 规定时间的结束时间
+//                            LocalDateTime endDateTime = LocalDateTime.parse(endDate, formatter);
+//                            LocalDateTime beginDateTime = endDateTime.minus(Duration.ofSeconds(2));
+//                            String beginDate = beginDateTime.format(formatter);  // 开始时间 规定时间的开始时间
+//                            QueryWrapper<InterlockTag> tagQuery = new QueryWrapper<>();
+//                            tagQuery.eq("interlock_condition_id",id).eq("parameter_type","8");
+//                            InterlockTag interlockTag = tagService.getOne(tagQuery);
+//                            IotedgeCollectData iotedgeData = iotedgeCollectDataService.getOneInfo(interlockTag.getDeviceId(),interlockTag.getModuleName(),interlockTag.getTagName(),beginDate);
+//                            BigDecimal beginValue = BigDecimal.valueOf(Integer.parseInt(iotedgeData.getValue()));
+//                            BigDecimal num = BigDecimal.valueOf(Integer.parseInt(ybValueTag)).subtract(beginValue);
+//                            if ((num.divide(beginValue).compareTo(BigDecimal.valueOf(Integer.parseInt(yz)))) > 0){
+//                                ybStatus = "1";
+//                            }
+//                        }
+//                    }
+//
+//                    // todo 判断 仪表状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+//                    String ifYbStatus = "1";  // 判断仪表状态较之前是否发生了变化 0:未发生变化 1:变化
+//                    String currYbStatus = (String) array[8];
+//                    if (ObjectUtil.isNotNull(currYbStatus) && ybStatus.equals(currYbStatus)){
+//                        ifYbStatus = "0";
+//                    }
+//
+//
+//                    // 控制系统状态  (明细表)
+//                    String kzxtStatus = "0"; // 0:正常 1:非正常
+//                    if (Integer.parseInt(inputStatusTag) == 1 || Integer.parseInt(outputStatusTag) == 1 || Integer.parseInt(mpStatusTag) == 1){
+//                        kzxtStatus = "1";
+//                    }
+//
+//                    // todo 判断 控制系统状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+//                    String ifKzxtStatus = "1"; // 判断控制系统状态较之前是否发生了变化 0:未发生变化 1:变化
+//                    String currKzxtStatus = (String) array[23];
+//                    if (ObjectUtil.isNotNull(currKzxtStatus) && kzxtStatus.equals(currKzxtStatus)){
+//                        ifKzxtStatus = "0";
+//                    }
+//
+//
+//                    // 根据总表id 查询数所有的联锁条件
+//                    String zdybzt = "0";   // 该联锁下 所有联锁条件的 (总)仪表状态
+//                    String zdkzxtzt = "0"; // 该联锁下 所有联塑条件的 (总)控制系统状态
+//                    int ybcount = 0;    // 该联锁下 所有联锁条件 仪表状态为 异常(1) 的数量
+//                    int kzxtcount = 0;  // 该联锁下 所有联锁条件 控制系统状态为 异常(1) 的数量
+//                    int plcount = 0;    // 该联锁下 所有联锁条件 旁路为 是(1) 的数量
+//                    List<InterlockDetail> list = detailService.selectListBySummaryId(summaryId);
+//                    for (InterlockDetail detail:list) {
+//                        if ("1".equals(detail.getInstrumentStatus())){
+//                            ybcount++;   // 如果仪表状态有一个是异常 则+1;
+//                        }
+//                        if (Integer.parseInt(detail.getInputStatus()) == 1 || Integer.parseInt(detail.getOutputStatus()) == 1 || Integer.parseInt(detail.getMpStatus()) == 1) {
+//                            kzxtcount++; // 如果控制系统状态有一个是异常 则+1;
+//                        }
+//                        if ("1".equals(detail.getIfBypass()) && "1".equals(detail.getBypass())){
+//                            plcount++;  // 如果旁路有一个为 是 则+1;
+//                        }
+//                    }
+//
+//                    // 根据总表id 查询对应的联锁
+//                    QueryWrapper<InterlockSummary> summaryQuery = new QueryWrapper<>();
+//                    summaryQuery.eq("id",summaryId);
+//                    InterlockSummary interLock = summaryService.getOne(summaryQuery);
+//
+//                    // 联锁状态     (总表)
+//                    String lsStatus = "1";   // 0:未投用 1:投用
+//                    // 暂时不需要根据if_bypass进行判断,默认否的时候 手动输入值为“无旁路”  如果 if_bypass = 1 则根据点位进行判断
+//                    if (plcount > 0){
+//                        lsStatus = "0";   // 如果旁路有一个为 是 则联锁状态为 "未投用"
+//                    }
+//
+//                    // todo 判断 联锁状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+//                    String ifLsStatus = "1"; // 判断联锁状态较之前是否发生了变化 0:未发生变化 1:变化
+//                    if (ObjectUtil.isNotNull(interLock.getInterlockStatus()) && lsStatus.equals(interLock.getInterlockStatus())){
+//                        ifLsStatus = "0";
+//                    }
+//
+//                    // 回路健康级别  (总表)
+//                    String hljkjbStatus = "A"; // 四级:A、B、C、D
+//                    if (ybcount > 0){
+//                        zdybzt = "1";   // 如果 该联锁下所有联锁条件的 仪表状态为异常的数量大于0 则 总的仪表状态为 异常
+//                    }
+//                    if (kzxtcount > 0){
+//                        zdkzxtzt = "1"; // 如果 该联锁下所有联锁条件的 控制系统为异常的数量大于0 则 总的控制系统状态为 异常
+//                    }
+//                    ArrayList<String> sList = new ArrayList<>();
+//                    sList.add(zdybzt);     // 仪表状态
+//                    if("1".equals(lsStatus)){
+//                        sList.add("0");  // 暂时认为  联锁状态 1投用 为 正常
+//                    }else {
+//                        sList.add("1");
+//                    }
+//                    sList.add(zdkzxtzt);   // 控制系统状态
+//                    // 集合中 元素为 1 的数量
+//                    long count = sList.stream().filter(e -> e.equals("1")).count();
+//                    if (count == 0){
+//                        hljkjbStatus = "A";   // 三个状态都正常 则回路健康级别 为 A
+//                    }else if (count == 1){
+//                        hljkjbStatus = "B";   // 有任何一个状态为异常 则回路健康级别为 B
+//                    }else if (count == 2){
+//                        hljkjbStatus = "C";   // 有任何两个状态为异常 则回路健康级别为 C
+//                    }else {
+//                        hljkjbStatus = "D";   // 三个状态都是异常 则回路健康级别为 D
+//                    }
+//
+//                    // todo 判断 回路健康级别 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+//                    String ifHljkjbStatus = "1"; // 判断回路健康级别较之前是否发生了变化 0:未发生变化 1:变化
+//                    if (ObjectUtil.isNotNull(interLock.getLoopHealthLevel()) && hljkjbStatus.equals(interLock.getLoopHealthLevel())){
+//                        ifHljkjbStatus = "0";
+//                    }
+//
+//                    // 只要有任意一个状态发生了变化 就将当前联锁、联锁条件记录到历史数据中  (可此时记录的状态确实是未改变之前的,但是点位值却是已经改变之后的)
+//                    if ("1".equals(ifYbStatus) || "1".equals(ifLsStatus) || "1".equals(ifKzxtStatus) || "1".equals(ifHljkjbStatus)){
+//                        // 需要判断一下 该时间(点位时间)是否已经有了历史记录了 (有可能 多个联锁条件的相应的状态都发生了变化,只存一次即可)
+//                        QueryWrapper<InterlockSummaryTemp> summaryTempQuery = new QueryWrapper<>();
+//                        summaryTempQuery.eq("id",summaryId);
+//                        InterlockSummaryTemp interlockTemp = summaryTempService.getOne(summaryTempQuery);
+//                        QueryWrapper<InterlockSummaryHistory> summaryHistoryQuery = new QueryWrapper<>();
+//                        summaryHistoryQuery.eq("tag_time",interlockTemp.getTagTime());
+//                        long count1 = summaryHistoryService.count(summaryHistoryQuery);
+//                        if (count1 == 0){
+//                            List<InterlockDetailTemp> tempList = detailTempService.selectListBySummaryId(summaryId);
+//                            InterlockSummaryHistory interlockSummaryHistory = InterlockSummaryHistoryConvert.INSTANCE.toTempHistory(interlockTemp);
+//                            summaryHistoryService.save(interlockSummaryHistory);
+//                            ArrayList<InterlockDetailHistory> historyList = new ArrayList<>();
+//                            for (InterlockDetailTemp item:tempList) {
+//                                item.setSummaryid(interlockSummaryHistory.getId());
+//                                historyList.add(InterlockDetailHistoryConvert.INSTANCE.toTempHistory(item));
+//                            }
+//                            detailHistoryService.saveBatch(historyList);
+//                        }
+//                    }
+//
+//
+//                    // 修改明细表中 该条联锁条件的 仪表状态、控制系统状态
+//                    UpdateWrapper<InterlockDetail> upDetailQuery = new UpdateWrapper<>();
+//                    upDetailQuery.set("instrument_status",ybStatus).set("control_system_status",kzxtStatus);
+//                    upDetailQuery.eq("id",array[0]);
+//                    detailService.update(upDetailQuery);
+//
+//
+//                    // 修改联锁临时表中每个点位值+状态
+//                    UpdateWrapper<InterlockDetailTemp> upTempDetailQuery = new UpdateWrapper<>();
+//                    upTempDetailQuery.set("instrument_status",ybStatus)
+//                            .set("control_system_status",kzxtStatus)
+//                            .set("interlock_condition",interlockCondition)
+//                            .set("instrument_status_value",ybStatusTag)
+//                            .set("ysmnl_value",ybValueTag)
+//                            .set("current_value",curronValue)
+//                            .set("bypass",plTag)
+//                            .set("input_status",inputStatusTag)
+//                            .set("output_status",outputStatusTag)
+//                            .set("mp_status",mpStatusTag)
+//                            .set("tag_time",tagTime);
+//                    upTempDetailQuery.eq("id",array[0]);
+//                    detailTempService.update(upTempDetailQuery);
+//
+//                    // 修改总表中的联锁状态、回路健康级别
+//                    UpdateWrapper<InterlockSummary> upSummaryQuery = new UpdateWrapper<>();
+//                    upSummaryQuery.set("interlock_status",lsStatus).set("loop_health_level",hljkjbStatus);
+//                    upSummaryQuery.eq("id",summaryId);
+//                    summaryService.update(upSummaryQuery);
+//
+//                    // 修改联锁临时表中每个点位值+状态
+//                    UpdateWrapper<InterlockSummaryTemp> upTempSummaryQuery = new UpdateWrapper<>();
+//                    upTempSummaryQuery.set("interlock_status",lsStatus)
+//                            .set("loop_health_level",hljkjbStatus)
+//                            .set("interlock_out_value",interLock.getInterlockOutValue())
+//                            .set("tag_time",interLock.getTagTime());
+//                    upTempSummaryQuery.eq("id",summaryId);
+//                    summaryTempService.update(upTempSummaryQuery);
+//
+//                }
+//            }
+//        });
+//        try {
+//            client.connect();
+//        } catch (
+//                IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//}

+ 373 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/binlog/PostgreSQLClient.java

@@ -0,0 +1,373 @@
+package org.jeecg.modules.binlog;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.common.util.DateUtils;
+import org.jeecg.modules.detail.entity.InterlockDetail;
+import org.jeecg.modules.detail.service.IInterlockDetailService;
+import org.jeecg.modules.history.convert.InterlockDetailHistoryConvert;
+import org.jeecg.modules.history.convert.InterlockSummaryHistoryConvert;
+import org.jeecg.modules.history.entity.InterlockDetailHistory;
+import org.jeecg.modules.history.entity.InterlockSummaryHistory;
+import org.jeecg.modules.history.service.IInterlockDetailHistoryService;
+import org.jeecg.modules.history.service.IInterlockSummaryHistoryService;
+import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
+import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
+import org.jeecg.modules.summary.entity.InterlockSummary;
+import org.jeecg.modules.summary.service.IInterlockSummaryService;
+import org.jeecg.modules.tag.entity.InterlockTag;
+import org.jeecg.modules.tag.service.IInterlockTagService;
+import org.jeecg.modules.temp.entity.InterlockDetailTemp;
+import org.jeecg.modules.temp.entity.InterlockSummaryTemp;
+import org.jeecg.modules.temp.service.IInterlockDetailTempService;
+import org.jeecg.modules.temp.service.IInterlockSummaryTempService;
+import org.postgresql.PGConnection;
+import org.postgresql.PGNotification;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.sql.*;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+/**
+ * @author dzc
+ * @date 2024/6/7 14:21
+ * @package org.jeecg.modules.binlog
+ * @project interlock_server
+ * @des PostgreSQL 数据库 ,实时处理数据
+ */
+@Order
+@Component
+@Slf4j  //用于打印日志
+public class PostgreSQLClient implements ApplicationRunner {
+
+
+    @Value("${postgresql.url}")
+    private String url;
+    @Value("${postgresql.username}")
+    private String username;
+    @Value("${postgresql.password}")
+    private String password;
+
+
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockDetailService detailService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockSummaryService summaryService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IIotedgeCollectDataService iotedgeCollectDataService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockTagService tagService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockSummaryHistoryService summaryHistoryService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockDetailHistoryService detailHistoryService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockDetailTempService detailTempService;
+
+    @Autowired
+    @SuppressWarnings("all")
+    private IInterlockSummaryTempService summaryTempService;
+
+
+
+    @Async
+    @Override
+    @Order
+    public void run(ApplicationArguments args) throws Exception {
+        //项目启动完成连接
+        new Thread(this::connectPostgreSQL).start();
+    }
+
+    private void connectPostgreSQL() {
+        log.info("数据库已连接");
+        try {
+            Connection connection = DriverManager.getConnection(url, username, password);
+            PGConnection pgConnection = connection.unwrap(PGConnection.class);
+            Statement statement = connection.createStatement();
+
+            // 触发器发送通知 通过 LISTEN 来监听(订阅)通知
+            statement.execute("LISTEN notify_event");
+
+            // 采用轮询的方式查询是否有新的通知产生,
+            // 暂时没有想到更好的方式替换掉 while 死循环。(可以采用postgresql数据库集成KafKa的方式,但是需要对数据库进行改动可能)
+            while (true){
+                PGNotification[] notifications = pgConnection.getNotifications();
+                if (notifications != null){
+                    for (PGNotification notification : notifications){
+                        String name = notification.getName();
+                        String parameter = notification.getParameter();
+                        System.out.println("变化的数据:"+name+"---"+parameter);
+                        Map<String, Object> changeDataMap = JSONObject.parseObject(parameter, new TypeReference<Map<String, Object>>() {
+                        });
+                        String interlockCondition = (String) changeDataMap.get("interlock_condition");  // 联锁条件值
+                        String curronValue = (String) changeDataMap.get("current_value");     // 当前值
+                        String ybStatusTag = (String) changeDataMap.get("instrument_status_value");     // 仪表状态点位 值
+                        String ybValueTag = (String) changeDataMap.get("ysmnl_value");      // 仪表原始模拟量点位 值
+                        String plTag = (String) changeDataMap.get("bypass");           // 旁路点位 值
+                        String inputStatusTag = (String) changeDataMap.get("input_status");  // 输入卡件状态点位 值
+                        String outputStatusTag = (String) changeDataMap.get("output_status"); // 输出卡件状态点位 值
+                        String mpStatusTag = (String) changeDataMap.get("mp_status");     // MP状态点位 值
+                        String summaryId = (String) changeDataMap.get("summaryid");  // 联锁总表ID
+                        String id = (String) changeDataMap.get("id");  // 联锁条件ID
+                        String ybIfFs = (String) changeDataMap.get("instrument_status_juge");   // 仪表状态判断方式
+                        String upperLimit = ""; // 上限
+                        if (ObjectUtil.isNotNull(changeDataMap.get("upper_limit"))) {
+                            upperLimit = (String)changeDataMap.get("upper_limit");
+                        }
+                        String lowerLimit = ""; // 下限
+                        if (ObjectUtil.isNotNull(changeDataMap.get("lower_limit"))){
+                            lowerLimit = (String)changeDataMap.get("lower_limit");
+                        }
+                        String ifBypass = (String) changeDataMap.get("if_bypass");  // 是否旁路  (若 "是" 则 bypass 等于 点位值;若 "否" 则 bypass 等于 输入值)
+
+                        String tagTime = (String) changeDataMap.get("tag_time");   // 点位时间
+
+                        // 仪表状态     (明细表)
+                        String ybStatus = "0"; // 0:正常 1:故障
+
+                        // 直接读取位号
+                        if ("0".equals(ybIfFs) && ObjectUtil.isNotNull(ybStatusTag)){
+                            // 仪表状态 等于点位值
+                            ybStatus = ybStatusTag;
+                        }
+
+                        // 高低限判断
+                        if ("1".equals(ybIfFs)){
+                            // 如果 仪表值 大于上限 或者 小于下限 则状态为 异常;两种情况不会同时满足
+                            if (Integer.parseInt(ybValueTag) >  Integer.parseInt(upperLimit) || Integer.parseInt(ybValueTag) < Integer.parseInt(lowerLimit)){
+                                ybStatus = "1";
+                            }
+                        }
+
+                        // 突变超限判断  规定时间内 (结束值-初始值)/初始值 * 100 百分比 是否大于 规定的阈值
+                        if ("2".equals(ybIfFs)){
+                            // 如果 采集的频率 大于 规定的时间
+                            // 例如 采集是一分钟采集一次 规定的时间的2s之内
+                            // 如果 采集的频率 小于等于 规定的时间
+                            // 例如 2S之内  去采集数据表中根据 设备id+模块名称+点位名称 查询2s之前的数据,但是表中的数据量大 时间?
+                            String yz = (String) changeDataMap.get("threshold_value");  // 阈值
+                            String time = (String) changeDataMap.get("threshold_time");  // 规定的时间
+                            String dw = (String) changeDataMap.get("threshold_time_unit");   // 时间单位
+                            if ("s".equals(dw)){
+                                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+                                String endDate = DateUtils.getDate("yyyy-MM-dd HH:mm:ss");  // 当前时间 规定时间的结束时间
+                                LocalDateTime endDateTime = LocalDateTime.parse(endDate, formatter);
+                                LocalDateTime beginDateTime = endDateTime.minus(Duration.ofSeconds(2));
+                                String beginDate = beginDateTime.format(formatter);  // 开始时间 规定时间的开始时间
+                                QueryWrapper<InterlockTag> tagQuery = new QueryWrapper<>();
+                                tagQuery.eq("interlock_condition_id",id).eq("parameter_type","8");
+                                InterlockTag interlockTag = tagService.getOne(tagQuery);
+                                IotedgeCollectData iotedgeData = iotedgeCollectDataService.getOneInfo(interlockTag.getDeviceId(),interlockTag.getModuleName(),interlockTag.getTagName(),beginDate);
+                                BigDecimal beginValue = BigDecimal.valueOf(Integer.parseInt(iotedgeData.getValue()));
+                                BigDecimal num = BigDecimal.valueOf(Integer.parseInt(ybValueTag)).subtract(beginValue);
+                                if ((num.divide(beginValue).compareTo(BigDecimal.valueOf(Integer.parseInt(yz)))) > 0){
+                                    ybStatus = "1";
+                                }
+                            }
+                        }
+
+                        // todo 判断 仪表状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+                        String ifYbStatus = "1";  // 判断仪表状态较之前是否发生了变化 0:未发生变化 1:变化
+                        String currYbStatus = (String) changeDataMap.get("instrument_status");
+                        if (ObjectUtil.isNotNull(currYbStatus) && ybStatus.equals(currYbStatus)){
+                            ifYbStatus = "0";
+                        }
+
+                        // 控制系统状态  (明细表)
+                        String kzxtStatus = "0"; // 0:正常 1:非正常
+                        if (Integer.parseInt(inputStatusTag) == 1 || Integer.parseInt(outputStatusTag) == 1 || Integer.parseInt(mpStatusTag) == 1){
+                            kzxtStatus = "1";
+                        }
+
+                        // todo 判断 控制系统状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+                        String ifKzxtStatus = "1"; // 判断控制系统状态较之前是否发生了变化 0:未发生变化 1:变化
+                        String currKzxtStatus = (String) changeDataMap.get("control_system_status");
+                        if (ObjectUtil.isNotNull(currKzxtStatus) && kzxtStatus.equals(currKzxtStatus)){
+                            ifKzxtStatus = "0";
+                        }
+
+                        // 修改明细表中 该条联锁条件的 仪表状态、控制系统状态
+                        UpdateWrapper<InterlockDetail> upDetailQuery = new UpdateWrapper<>();
+                        upDetailQuery.set("instrument_status",ybStatus).set("control_system_status",kzxtStatus);
+                        upDetailQuery.eq("id",id);
+                        detailService.update(upDetailQuery);
+
+                        // 根据总表id 查询数所有的联锁条件
+                        String zdybzt = "0";   // 该联锁下 所有联锁条件的 (总)仪表状态
+                        String zdkzxtzt = "0"; // 该联锁下 所有联塑条件的 (总)控制系统状态
+                        int ybcount = 0;    // 该联锁下 所有联锁条件 仪表状态为 异常(1) 的数量
+                        int kzxtcount = 0;  // 该联锁下 所有联锁条件 控制系统状态为 异常(1) 的数量
+                        int plcount = 0;    // 该联锁下 所有联锁条件 旁路为 是(1) 的数量
+                        List<InterlockDetail> list = detailService.selectListBySummaryId(summaryId);
+                        for (InterlockDetail detail:list) {
+                            if ("1".equals(detail.getInstrumentStatus())){
+                                ybcount++;   // 如果仪表状态有一个是异常 则+1;
+                            }
+                            if (Integer.parseInt(detail.getInputStatus()) == 1 || Integer.parseInt(detail.getOutputStatus()) == 1 || Integer.parseInt(detail.getMpStatus()) == 1) {
+                                kzxtcount++; // 如果控制系统状态有一个是异常 则+1;
+                            }
+                            if ("1".equals(detail.getBypass())){
+                                plcount++;  // 如果旁路有一个为 是 则+1;
+                            }
+                        }
+
+                        // 根据总表id 查询对应的联锁
+                        QueryWrapper<InterlockSummary> summaryQuery = new QueryWrapper<>();
+                        summaryQuery.eq("id",summaryId);
+                        InterlockSummary interLock = summaryService.getOne(summaryQuery);
+
+                        // 联锁状态     (总表)
+                        String lsStatus = "1";   // 0:未投用 1:投用
+                        if (plcount > 0){
+                            lsStatus = "0";   // 如果旁路有一个为 是 则联锁状态为 "未投用"
+                        }
+
+                        // todo 判断 联锁状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+                        String ifLsStatus = "1"; // 判断联锁状态较之前是否发生了变化 0:未发生变化 1:变化
+                        if (ObjectUtil.isNotNull(interLock.getInterlockStatus()) && lsStatus.equals(interLock.getInterlockStatus())){
+                            ifLsStatus = "0";
+                        }
+
+                        // 回路健康级别  (总表)
+                        String hljkjbStatus = "A"; // 四级:A、B、C、D
+                        if (ybcount > 0){
+                            zdybzt = "1";   // 如果 该联锁下所有联锁条件的 仪表状态为异常的数量大于0 则 总的仪表状态为 异常
+                        }
+                        if (kzxtcount > 0){
+                            zdkzxtzt = "1"; // 如果 该联锁下所有联锁条件的 控制系统为异常的数量大于0 则 总的控制系统状态为 异常
+                        }
+                        ArrayList<String> sList = new ArrayList<>();
+                        sList.add(zdybzt);     // 仪表状态
+                        if("1".equals(lsStatus)){
+                            sList.add("0");  // 暂时认为  联锁状态 1投用 为 正常
+                        }else {
+                            sList.add("1");
+                        }
+                        sList.add(zdkzxtzt);   // 控制系统状态
+                        // 集合中 元素为 1 的数量
+                        long count = sList.stream().filter(e -> e.equals("1")).count();
+                        if (count == 0){
+                            hljkjbStatus = "A";   // 三个状态都正常 则回路健康级别 为 A
+                        }else if (count == 1){
+                            hljkjbStatus = "B";   // 有任何一个状态为异常 则回路健康级别为 B
+                        }else if (count == 2){
+                            hljkjbStatus = "C";   // 有任何两个状态为异常 则回路健康级别为 C
+                        }else {
+                            hljkjbStatus = "D";   // 三个状态都是异常 则回路健康级别为 D
+                        }
+
+                        // todo 判断 回路健康级别 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
+                        String ifHljkjbStatus = "1"; // 判断回路健康级别较之前是否发生了变化 0:未发生变化 1:变化
+                        if (ObjectUtil.isNotNull(interLock.getLoopHealthLevel()) && hljkjbStatus.equals(interLock.getLoopHealthLevel())){
+                            ifHljkjbStatus = "0";
+                        }
+
+                        // todo 判断 旁路状态 是否发生了改变,如果发生了变化就将变化之前的数据存入到历史数据表中
+                        String ifPlStatus = "1"; // 判断旁路状态是否发生了变化 0:未发生变化 1:变化
+                        QueryWrapper<InterlockDetailTemp> query = new QueryWrapper<>();
+                        query.eq("id",id);
+                        InterlockDetailTemp detailTemp = detailTempService.getOne(query);
+                        String bypass = detailTemp.getBypass(); // 上一次的旁路状态
+                        if (ObjectUtil.isNotNull(bypass) && plTag.equals(bypass)){
+                            ifPlStatus = "0";
+                        }
+
+                        // 修改总表中的联锁状态、回路健康级别
+                        UpdateWrapper<InterlockSummary> upSummaryQuery = new UpdateWrapper<>();
+                        upSummaryQuery.set("interlock_status",lsStatus).set("loop_health_level",hljkjbStatus);
+                        upSummaryQuery.eq("id",summaryId);
+                        summaryService.update(upSummaryQuery);
+
+                        // 只要有任意一个状态发生了变化 就将当前联锁、联锁条件记录到历史数据中  (可此时记录的状态确实是未改变之前的,但是点位值却是已经改变之后的)
+                        if ("1".equals(ifYbStatus) || "1".equals(ifLsStatus) || "1".equals(ifKzxtStatus) || "1".equals(ifHljkjbStatus) || "1".equals(ifPlStatus)){
+                            // 需要判断一下 该时间(点位时间)是否已经有了历史记录了 (有可能 多个联锁条件的相应的状态都发生了变化,只存一次即可)
+                            QueryWrapper<InterlockSummaryTemp> summaryTempQuery = new QueryWrapper<>();
+                            summaryTempQuery.eq("id",summaryId);
+                            InterlockSummaryTemp interlockTemp = summaryTempService.getOne(summaryTempQuery);
+                            QueryWrapper<InterlockSummaryHistory> summaryHistoryQuery = new QueryWrapper<>();
+                            summaryHistoryQuery.eq("tag_time",interlockTemp.getTagTime());
+                            long count1 = summaryHistoryService.count(summaryHistoryQuery);
+                            if (count1 == 0){
+                                List<InterlockDetailTemp> tempList = detailTempService.selectListBySummaryId(summaryId);
+                                InterlockSummaryHistory interlockSummaryHistory = InterlockSummaryHistoryConvert.INSTANCE.toTempHistory(interlockTemp);
+                                summaryHistoryService.save(interlockSummaryHistory);
+                                ArrayList<InterlockDetailHistory> historyList = new ArrayList<>();
+                                for (InterlockDetailTemp item:tempList) {
+                                    item.setSummaryid(interlockSummaryHistory.getId());
+                                    historyList.add(InterlockDetailHistoryConvert.INSTANCE.toTempHistory(item));
+                                }
+                                detailHistoryService.saveBatch(historyList);
+                            }
+                        }
+
+                        // 修改联锁临时表中每个点位值+状态
+                        UpdateWrapper<InterlockDetailTemp> upTempDetailQuery = new UpdateWrapper<>();
+                        upTempDetailQuery.set("instrument_status",ybStatus)
+                                .set("control_system_status",kzxtStatus)
+                                .set("interlock_condition",interlockCondition)
+                                .set("instrument_status_value",ybStatusTag)
+                                .set("ysmnl_value",ybValueTag)
+                                .set("current_value",curronValue)
+                                .set("bypass",plTag)
+                                .set("input_status",inputStatusTag)
+                                .set("output_status",outputStatusTag)
+                                .set("mp_status",mpStatusTag)
+                                .set("tag_time",tagTime);
+                        upTempDetailQuery.eq("id",id);
+                        detailTempService.update(upTempDetailQuery);
+
+                        // 修改联锁临时表中每个点位值+状态
+                        UpdateWrapper<InterlockSummaryTemp> upTempSummaryQuery = new UpdateWrapper<>();
+                        upTempSummaryQuery.set("interlock_status",lsStatus)
+                                .set("loop_health_level",hljkjbStatus)
+                                .set("interlock_out_value",interLock.getInterlockOutValue())
+                                .set("tag_time",interLock.getTagTime());
+                        upTempSummaryQuery.eq("id",summaryId);
+                        summaryTempService.update(upTempSummaryQuery);
+                    }
+                }
+
+                // TODO 不采用轮询的方式查询是否有新的通知产生
+                // 为了避免轮询的过于频繁 调用sleep方法 但是如果采集频率1s的话,调用sleep方法会对数据产生影响
+                Thread.sleep(1000);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+
+    }
+
+
+}

+ 4 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/history/convert/InterlockDetailHistoryConvert.java

@@ -3,6 +3,7 @@ package org.jeecg.modules.history.convert;
 import org.jeecg.modules.detail.convert.InterlockDetailConvert;
 import org.jeecg.modules.detail.entity.InterlockDetail;
 import org.jeecg.modules.history.entity.InterlockDetailHistory;
+import org.jeecg.modules.temp.entity.InterlockDetailTemp;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 import org.mapstruct.factory.Mappers;
@@ -22,4 +23,7 @@ public interface InterlockDetailHistoryConvert {
     @Mapping(target = "id", ignore = true)
     InterlockDetailHistory toHistory(InterlockDetail interlockDetail);
 
+    @Mapping(target = "id", ignore = true)
+    InterlockDetailHistory toTempHistory(InterlockDetailTemp interlockDetail);
+
 }

+ 4 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/history/convert/InterlockSummaryHistoryConvert.java

@@ -5,6 +5,7 @@ import org.jeecg.modules.history.dto.InterlockHistoryQueryDTO;
 import org.jeecg.modules.history.entity.InterlockDetailHistory;
 import org.jeecg.modules.history.entity.InterlockSummaryHistory;
 import org.jeecg.modules.summary.entity.InterlockSummary;
+import org.jeecg.modules.temp.entity.InterlockSummaryTemp;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 import org.mapstruct.factory.Mappers;
@@ -30,4 +31,7 @@ public interface InterlockSummaryHistoryConvert {
     @Mapping(target = "loopHealthLevel", source = "loopHealthLevel")
     InterlockHistoryQueryDTO toLoopHealthLevel(InterlockHistoryQueryDTO dto,String loopHealthLevel);
 
+    @Mapping(target = "id", ignore = true)
+    InterlockSummaryHistory toTempHistory(InterlockSummaryTemp interlockSummary);
+
 }

+ 1 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/temp/mapper/InterlockDetailTempMapper.java

@@ -14,4 +14,5 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  */
 public interface InterlockDetailTempMapper extends BaseMapper<InterlockDetailTemp> {
 
+    List<InterlockDetailTemp> selectListBySummaryId(@Param("summaryId") String summaryId);
 }

+ 3 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/temp/mapper/xml/InterlockDetailTempMapper.xml

@@ -2,4 +2,7 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.jeecg.modules.temp.mapper.InterlockDetailTempMapper">
 
+    <select id="selectListBySummaryId" resultType="org.jeecg.modules.temp.entity.InterlockDetailTemp">
+        select * from interlock_detail_temp where summaryid = #{summaryId}
+    </select>
 </mapper>

+ 4 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/temp/service/IInterlockDetailTempService.java

@@ -3,6 +3,8 @@ package org.jeecg.modules.temp.service;
 import org.jeecg.modules.temp.entity.InterlockDetailTemp;
 import com.baomidou.mybatisplus.extension.service.IService;
 
+import java.util.List;
+
 /**
  * @Description: interlock_detail_temp
  * @Author: jeecg-boot
@@ -11,4 +13,6 @@ import com.baomidou.mybatisplus.extension.service.IService;
  */
 public interface IInterlockDetailTempService extends IService<InterlockDetailTemp> {
 
+    List<InterlockDetailTemp> selectListBySummaryId(String summaryId);
+
 }

+ 11 - 0
jeecg-module-interlock/src/main/java/org/jeecg/modules/temp/service/impl/InterlockDetailTempServiceImpl.java

@@ -3,10 +3,13 @@ package org.jeecg.modules.temp.service.impl;
 import org.jeecg.modules.temp.entity.InterlockDetailTemp;
 import org.jeecg.modules.temp.mapper.InterlockDetailTempMapper;
 import org.jeecg.modules.temp.service.IInterlockDetailTempService;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 
+import java.util.List;
+
 /**
  * @Description: interlock_detail_temp
  * @Author: jeecg-boot
@@ -16,4 +19,12 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 @Service
 public class InterlockDetailTempServiceImpl extends ServiceImpl<InterlockDetailTempMapper, InterlockDetailTemp> implements IInterlockDetailTempService {
 
+    @Autowired
+    @SuppressWarnings("all")
+    private InterlockDetailTempMapper mapper;
+
+    @Override
+    public List<InterlockDetailTemp> selectListBySummaryId(String summaryId) {
+        return mapper.selectListBySummaryId(summaryId);
+    }
 }

+ 5 - 0
jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml

@@ -346,3 +346,8 @@ binarylog:
   schema: interlock
   username: root
   password: 302201
+
+postgresql:
+  url: jdbc:postgresql://119.3.168.55:5432/interlock
+  username: postgres
+  password: 302201