|
@@ -0,0 +1,363 @@
|
|
|
+package org.jeecg.modules.binlog;
|
|
|
+
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
|
|
|
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
|
|
+import com.github.shyiko.mysql.binlog.event.EventData;
|
|
|
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
|
|
|
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
|
|
|
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.ObjectUtils;
|
|
|
+import org.jeecg.common.util.DateUtils;
|
|
|
+import org.jeecg.modules.detail.entity.InterlockDetail;
|
|
|
+import org.jeecg.modules.detail.service.IInterlockDetailService;
|
|
|
+import org.jeecg.modules.history.convert.InterlockDetailHistoryConvert;
|
|
|
+import org.jeecg.modules.history.convert.InterlockSummaryHistoryConvert;
|
|
|
+import org.jeecg.modules.history.entity.InterlockDetailHistory;
|
|
|
+import org.jeecg.modules.history.entity.InterlockSummaryHistory;
|
|
|
+import org.jeecg.modules.history.service.IInterlockDetailHistoryService;
|
|
|
+import org.jeecg.modules.history.service.IInterlockSummaryHistoryService;
|
|
|
+import org.jeecg.modules.iotedgeCollectData.entity.IotedgeCollectData;
|
|
|
+import org.jeecg.modules.iotedgeCollectData.service.IIotedgeCollectDataService;
|
|
|
+import org.jeecg.modules.summary.entity.InterlockSummary;
|
|
|
+import org.jeecg.modules.summary.service.IInterlockSummaryService;
|
|
|
+import org.jeecg.modules.tag.entity.InterlockTag;
|
|
|
+import org.jeecg.modules.tag.service.IInterlockTagService;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.boot.ApplicationArguments;
|
|
|
+import org.springframework.boot.ApplicationRunner;
|
|
|
+import org.springframework.core.annotation.Order;
|
|
|
+import org.springframework.data.redis.core.BoundHashOperations;
|
|
|
+import org.springframework.data.redis.core.HashOperations;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.io.IOException;
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.text.DecimalFormat;
|
|
|
+import java.time.Duration;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author dzc
|
|
|
+ * @date 2024/5/30 9:57
|
|
|
+ * @package org.jeecg.modules.binlog
|
|
|
+ * @project interlock_server
|
|
|
+ * @des binlog监控 需要在 my.ini文件中进行配置
|
|
|
+ */
|
|
|
+
|
|
|
+//此类可以监控MySQL库数据的增删改
|
|
|
+@Order
|
|
|
+@Component
|
|
|
+@Slf4j //用于打印日志
|
|
|
+//在SpringBoot中,提供了一个接口:ApplicationRunner。
|
|
|
+//该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
|
|
|
+public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ @Value("${binarylog.host}")
|
|
|
+ private String host;
|
|
|
+ @Value("${binarylog.port}")
|
|
|
+ private Integer port;
|
|
|
+ @Value("${binarylog.schema}")
|
|
|
+ private String schema;
|
|
|
+ @Value("${binarylog.username}")
|
|
|
+ private String username;
|
|
|
+ @Value("${binarylog.password}")
|
|
|
+ private String password;
|
|
|
+
|
|
|
+ private Long interlockdetailid = 1L; // 联锁详细信息
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ private IInterlockDetailService detailService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ private IInterlockSummaryService summaryService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ private IIotedgeCollectDataService iotedgeCollectDataService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ private IInterlockTagService tagService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ private IInterlockSummaryHistoryService summaryHistoryService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @SuppressWarnings("all")
|
|
|
+ private IInterlockDetailHistoryService detailHistoryService;
|
|
|
+
|
|
|
+ @Async
|
|
|
+ @Override
|
|
|
+ @Order
|
|
|
+ public void run(ApplicationArguments args) throws Exception {
|
|
|
+ //项目启动完成连接bin-log
|
|
|
+ new Thread(() -> {
|
|
|
+ connectMysqlBinLog();
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接mysqlBinLog
|
|
|
+ */
|
|
|
+ public void connectMysqlBinLog() {
|
|
|
+ log.info("监控BinLog服务已启动");
|
|
|
+
|
|
|
+ //自己MySQL的信息。host,port,username,password
|
|
|
+ BinaryLogClient client = new BinaryLogClient(host, port, schema, username, password);
|
|
|
+ // 设置连接时间为20秒
|
|
|
+ client.setConnectTimeout(20 * 1000);
|
|
|
+
|
|
|
+ /**因为binlog不是以数据库为单位划分的,所以监控binglog不是监控的单个的数据库,而是整个 当前所设置连接的MySQL,
|
|
|
+ *其中任何一个库发生数据增删改,这里都能检测到,
|
|
|
+ *所以不用设置所监控的数据库的名字(我也不知道怎么设置,没发现有包含这个形参的构造函数)
|
|
|
+ *如果需要只监控指定的数据库,可以看后面代码,可以获取到当前发生变更的数据库名称。可以根据名称来决定是否监控
|
|
|
+ **/
|
|
|
+
|
|
|
+ client.setServerId(10); //和自己之前设置的server-id保持一致,但是我不知道为什么不一致也能成功
|
|
|
+ //下面直接照抄就行
|
|
|
+ client.registerEventListener(event -> {
|
|
|
+ EventData data = event.getData();
|
|
|
+ if (data instanceof TableMapEventData) {
|
|
|
+ //只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
|
|
|
+ TableMapEventData tableMapEventData = (TableMapEventData) data;
|
|
|
+ if (schema.equals(tableMapEventData.getDatabase())) {
|
|
|
+ if ("interlock_detail".equals(tableMapEventData.getTable())) {
|
|
|
+ interlockdetailid = tableMapEventData.getTableId();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (data instanceof UpdateRowsEventData) {
|
|
|
+ long tableId = ((UpdateRowsEventData) data).getTableId();
|
|
|
+ if (interlockdetailid == tableId){
|
|
|
+ System.out.println("表中数据发生了更改"+data.toString());
|
|
|
+ Object[] array = Arrays.stream(((UpdateRowsEventData) data).getRows().get(0).getValue()).toArray();
|
|
|
+
|
|
|
+ String ybStatusTag = (String) array[13]; // 仪表状态点位 值
|
|
|
+ String ybValueTag = (String) array[17]; // 仪表原始模拟量点位 值
|
|
|
+ String plTag = (String) array[30]; // 旁路点位 值
|
|
|
+ String inputStatusTag = (String) array[34]; // 输入卡件状态点位 值
|
|
|
+ String outputStatusTag = (String) array[38]; // 输出卡件状态点位 值
|
|
|
+ String mpStatusTag = (String) array[42]; // MP状态点位 值
|
|
|
+ String summaryId = (String) array[1]; // 联锁总表ID
|
|
|
+ String id = (String) array[0]; // 联锁条件ID
|
|
|
+ String ybIfFs = (String) array[9]; // 仪表状态判断方式
|
|
|
+ String upperLimit = ""; // 上限
|
|
|
+ if (ObjectUtil.isNotNull(array[18])) {
|
|
|
+ upperLimit = (String)array[18];
|
|
|
+ }
|
|
|
+ String lowerLimit = ""; // 下限
|
|
|
+ if (ObjectUtil.isNotNull(array[19])){
|
|
|
+ lowerLimit = (String)array[19];
|
|
|
+ }
|
|
|
+ String ifBypass = (String) array[29]; // 是否旁路 (若 "是" 则 bypass 等于 点位值;若 "否" 则 bypass 等于 输入值)
|
|
|
+
|
|
|
+ // 仪表状态 (明细表)
|
|
|
+ String ybStatus = "0"; // 0:正常 1:故障
|
|
|
+
|
|
|
+ // 直接读取位号
|
|
|
+ if ("0".equals(ybIfFs) && ObjectUtil.isNotNull(ybStatusTag)){
|
|
|
+ // 仪表状态 等于点位值
|
|
|
+ ybStatus = ybStatusTag;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 高低限判断
|
|
|
+ if ("1".equals(ybIfFs)){
|
|
|
+ // 如果 仪表值 大于上限 或者 小于下限 则状态为 异常;两种情况不会同时满足
|
|
|
+ if (Integer.parseInt(ybValueTag) > Integer.parseInt(upperLimit) || Integer.parseInt(ybValueTag) < Integer.parseInt(lowerLimit)){
|
|
|
+ ybStatus = "1";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 突变超限判断 规定时间内 (结束值-初始值)/初始值 * 100 百分比 是否大于 规定的阈值
|
|
|
+ if ("2".equals(ybIfFs)){
|
|
|
+ // 如果 采集的频率 大于 规定的时间
|
|
|
+ // 例如 采集是一分钟采集一次 规定的时间的2s之内
|
|
|
+ // 如果 采集的频率 小于等于 规定的时间
|
|
|
+ // 例如 2S之内 去采集数据表中根据 设备id+模块名称+点位名称 查询2s之前的数据,但是表中的数据量大 时间?
|
|
|
+ String yz = (String) array[20]; // 阈值
|
|
|
+ String time = (String) array[21]; // 规定的时间
|
|
|
+ String dw = (String) array[22]; // 时间单位
|
|
|
+ if ("s".equals(dw)){
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+ String endDate = DateUtils.getDate("yyyy-MM-dd HH:mm:ss"); // 当前时间 规定时间的结束时间
|
|
|
+ LocalDateTime endDateTime = LocalDateTime.parse(endDate, formatter);
|
|
|
+ LocalDateTime beginDateTime = endDateTime.minus(Duration.ofSeconds(2));
|
|
|
+ String beginDate = beginDateTime.format(formatter); // 开始时间 规定时间的开始时间
|
|
|
+ QueryWrapper<InterlockTag> tagQuery = new QueryWrapper<>();
|
|
|
+ tagQuery.eq("interlock_condition_id",id).eq("parameter_type","8");
|
|
|
+ InterlockTag interlockTag = tagService.getOne(tagQuery);
|
|
|
+ IotedgeCollectData iotedgeData = iotedgeCollectDataService.getOneInfo(interlockTag.getDeviceId(),interlockTag.getModuleName(),interlockTag.getTagName(),beginDate);
|
|
|
+ BigDecimal beginValue = BigDecimal.valueOf(Integer.parseInt(iotedgeData.getValue()));
|
|
|
+ BigDecimal num = BigDecimal.valueOf(Integer.parseInt(ybValueTag)).subtract(beginValue);
|
|
|
+ if ((num.divide(beginValue).compareTo(BigDecimal.valueOf(Integer.parseInt(yz)))) > 0){
|
|
|
+ ybStatus = "1";
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // todo 判断 仪表状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
|
|
|
+ String ifYbStatus = "1"; // 判断仪表状态较之前是否发生了变化 0:未发生变化 1:变化
|
|
|
+ String currYbStatus = (String) array[8];
|
|
|
+ if (ObjectUtil.isNotNull(currYbStatus) && ybStatus.equals(currYbStatus)){
|
|
|
+ ifYbStatus = "0";
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 控制系统状态 (明细表)
|
|
|
+ String kzxtStatus = "0"; // 0:正常 1:非正常
|
|
|
+ if (Integer.parseInt(inputStatusTag) == 1 || Integer.parseInt(outputStatusTag) == 1 || Integer.parseInt(mpStatusTag) == 1){
|
|
|
+ kzxtStatus = "1";
|
|
|
+ }
|
|
|
+
|
|
|
+ // todo 判断 控制系统状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
|
|
|
+ String ifKzxtStatus = "1"; // 判断控制系统状态较之前是否发生了变化 0:未发生变化 1:变化
|
|
|
+ String currKzxtStatus = (String) array[23];
|
|
|
+ if (ObjectUtil.isNotNull(currKzxtStatus) && kzxtStatus.equals(currKzxtStatus)){
|
|
|
+ ifKzxtStatus = "0";
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 根据总表id 查询数所有的联锁条件
|
|
|
+ String zdybzt = "0"; // 该联锁下 所有联锁条件的 (总)仪表状态
|
|
|
+ String zdkzxtzt = "0"; // 该联锁下 所有联塑条件的 (总)控制系统状态
|
|
|
+ int ybcount = 0; // 该联锁下 所有联锁条件 仪表状态为 异常(1) 的数量
|
|
|
+ int kzxtcount = 0; // 该联锁下 所有联锁条件 控制系统状态为 异常(1) 的数量
|
|
|
+ int plcount = 0; // 该联锁下 所有联锁条件 旁路为 是(1) 的数量
|
|
|
+ List<InterlockDetail> list = detailService.selectListBySummaryId(summaryId);
|
|
|
+ for (InterlockDetail detail:list) {
|
|
|
+ if ("1".equals(detail.getInstrumentStatus())){
|
|
|
+ ybcount++; // 如果仪表状态有一个是异常 则+1;
|
|
|
+ }
|
|
|
+ if (Integer.parseInt(detail.getInputStatus()) == 1 || Integer.parseInt(detail.getOutputStatus()) == 1 || Integer.parseInt(detail.getMpStatus()) == 1) {
|
|
|
+ kzxtcount++; // 如果控制系统状态有一个是异常 则+1;
|
|
|
+ }
|
|
|
+ if ("1".equals(detail.getIfBypass()) && "1".equals(detail.getBypass())){
|
|
|
+ plcount++; // 如果旁路有一个为 是 则+1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 根据总表id 查询对应的联锁
|
|
|
+ QueryWrapper<InterlockSummary> summaryQuery = new QueryWrapper<>();
|
|
|
+ summaryQuery.eq("id",summaryId);
|
|
|
+ InterlockSummary interLock = summaryService.getOne(summaryQuery);
|
|
|
+
|
|
|
+ // 联锁状态 (总表)
|
|
|
+ String lsStatus = "1"; // 0:未投用 1:投用
|
|
|
+ // 暂时不需要根据if_bypass进行判断,默认否的时候 手动输入值为“无旁路” 如果 if_bypass = 1 则根据点位进行判断
|
|
|
+ if (plcount > 0){
|
|
|
+ lsStatus = "0"; // 如果旁路有一个为 是 则联锁状态为 "未投用"
|
|
|
+ }
|
|
|
+
|
|
|
+ // todo 判断 联锁状态 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
|
|
|
+ String ifLsStatus = "1"; // 判断联锁状态较之前是否发生了变化 0:未发生变化 1:变化
|
|
|
+ if (ObjectUtil.isNotNull(interLock.getInterlockStatus()) && lsStatus.equals(interLock.getInterlockStatus())){
|
|
|
+ ifLsStatus = "0";
|
|
|
+ }
|
|
|
+
|
|
|
+ // 回路健康级别 (总表)
|
|
|
+ String hljkjbStatus = "A"; // 四级:A、B、C、D
|
|
|
+ if (ybcount > 0){
|
|
|
+ zdybzt = "1"; // 如果 该联锁下所有联锁条件的 仪表状态为异常的数量大于0 则 总的仪表状态为 异常
|
|
|
+ }
|
|
|
+ if (kzxtcount > 0){
|
|
|
+ zdkzxtzt = "1"; // 如果 该联锁下所有联锁条件的 控制系统为异常的数量大于0 则 总的控制系统状态为 异常
|
|
|
+ }
|
|
|
+ ArrayList<String> sList = new ArrayList<>();
|
|
|
+ sList.add(zdybzt); // 仪表状态
|
|
|
+ if("1".equals(lsStatus)){
|
|
|
+ sList.add("0"); // 暂时认为 联锁状态 1投用 为 正常
|
|
|
+ }else {
|
|
|
+ sList.add("1");
|
|
|
+ }
|
|
|
+ sList.add(zdkzxtzt); // 控制系统状态
|
|
|
+ // 集合中 元素为 1 的数量
|
|
|
+ long count = sList.stream().filter(e -> e.equals("1")).count();
|
|
|
+ if (count == 0){
|
|
|
+ hljkjbStatus = "A"; // 三个状态都正常 则回路健康级别 为 A
|
|
|
+ }else if (count == 1){
|
|
|
+ hljkjbStatus = "B"; // 有任何一个状态为异常 则回路健康级别为 B
|
|
|
+ }else if (count == 2){
|
|
|
+ hljkjbStatus = "C"; // 有任何两个状态为异常 则回路健康级别为 C
|
|
|
+ }else {
|
|
|
+ hljkjbStatus = "D"; // 三个状态都是异常 则回路健康级别为 D
|
|
|
+ }
|
|
|
+
|
|
|
+ // todo 判断 回路健康级别 是否发生了改变;如果发生了变化就将变化之前的数据存入到历史数据表中
|
|
|
+ String ifHljkjbStatus = "1"; // 判断回路健康级别较之前是否发生了变化 0:未发生变化 1:变化
|
|
|
+ if (ObjectUtil.isNotNull(interLock.getLoopHealthLevel()) && hljkjbStatus.equals(interLock.getLoopHealthLevel())){
|
|
|
+ ifHljkjbStatus = "0";
|
|
|
+ }
|
|
|
+
|
|
|
+ // 只要有任意一个状态发生了变化 就将当前联锁、联锁条件记录到历史数据中 (可此时记录的状态确实是未改变之前的,但是点位值却是已经改变之后的)
|
|
|
+ //if ("1".equals(ifYbStatus) || "1".equals(ifLsStatus) || "1".equals(ifKzxtStatus) || "1".equals(ifHljkjbStatus)){
|
|
|
+ // // 需要判断一下 该时间(点位时间)是否已经有了历史记录了 (有可能 多个联锁条件的相应的状态都发生了变化,只存一次即可)
|
|
|
+ // QueryWrapper<InterlockSummaryHistory> summaryHistoryQuery = new QueryWrapper<>();
|
|
|
+ // summaryHistoryQuery.eq("tag_time",array[52]);
|
|
|
+ // long count1 = summaryHistoryService.count(summaryHistoryQuery);
|
|
|
+ // if (count1 == 0){
|
|
|
+ // InterlockSummaryHistory interlockSummaryHistory = InterlockSummaryHistoryConvert.INSTANCE.toHistory(interLock);
|
|
|
+ // summaryHistoryService.save(interlockSummaryHistory);
|
|
|
+ // ArrayList<InterlockDetailHistory> historyList = new ArrayList<>();
|
|
|
+ // for (InterlockDetail item:list) {
|
|
|
+ // item.setSummaryid(interlockSummaryHistory.getId());
|
|
|
+ // historyList.add(InterlockDetailHistoryConvert.INSTANCE.toHistory(item));
|
|
|
+ // }
|
|
|
+ // detailHistoryService.saveBatch(historyList);
|
|
|
+ // }
|
|
|
+ //}
|
|
|
+
|
|
|
+
|
|
|
+ // 修改明细表中 该条联锁条件的 仪表状态、控制系统状态
|
|
|
+ UpdateWrapper<InterlockDetail> upDetailQuery1 = new UpdateWrapper<>();
|
|
|
+ upDetailQuery1.set("instrument_status",ybStatus).set("control_system_status",kzxtStatus);
|
|
|
+ upDetailQuery1.eq("id",array[0]);
|
|
|
+ detailService.update(upDetailQuery1);
|
|
|
+
|
|
|
+
|
|
|
+ // 修改记录上一次值的表中每个点位值+状态
|
|
|
+ //UpdateWrapper<InterlockDetail> upDetailQuery2 = new UpdateWrapper<>();
|
|
|
+ //upDetailQuery2.set("control_system_status",kzxtStatus);
|
|
|
+ //upDetailQuery2.eq("id",array[0]);
|
|
|
+ //detailService.update(upDetailQuery2);
|
|
|
+
|
|
|
+ // 修改总表中的联锁状态、回路健康级别
|
|
|
+ UpdateWrapper<InterlockSummary> upSummaryQuery1 = new UpdateWrapper<>();
|
|
|
+ upSummaryQuery1.set("interlock_status",lsStatus).set("loop_health_level",hljkjbStatus);
|
|
|
+ upSummaryQuery1.eq("id",summaryId);
|
|
|
+ summaryService.update(upSummaryQuery1);
|
|
|
+
|
|
|
+ // 修改记录上一次值的表中每个点位值+状态
|
|
|
+ //UpdateWrapper<InterlockSummary> upSummaryQuery2 = new UpdateWrapper<>();
|
|
|
+ //upSummaryQuery2.set("loop_health_level",hljkjbStatus);
|
|
|
+ //upSummaryQuery2.eq("id",summaryId);
|
|
|
+ //summaryService.update(upSummaryQuery2);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ try {
|
|
|
+ client.connect();
|
|
|
+ } catch (
|
|
|
+ IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|