|
@@ -124,7 +124,7 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
//只要连接的MySQL发生的增删改的操作,则都会进入这里,无论哪个数据库
|
|
|
TableMapEventData tableMapEventData = (TableMapEventData) data;
|
|
|
if (schema.equals(tableMapEventData.getDatabase())) {
|
|
|
- log.error(tableMapEventData.getTableId() + "", tableMapEventData.getTable());
|
|
|
+// log.error("不是错误,只是想查询binlog对应的表ID:" + tableMapEventData.getTableId() + ",表名:" + tableMapEventData.getTable());
|
|
|
if ("guan_ralarm".equals(tableMapEventData.getTable())) {
|
|
|
guanralarmid = tableMapEventData.getTableId();
|
|
|
} else if ("bwanalogtable".equals(tableMapEventData.getTable())) {
|
|
@@ -210,49 +210,7 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
|
|
|
// 如果设备状态的LogValue=1,代表设备开始运行,则更新曲线信息
|
|
|
if (anachglog.getLogvalue() == 1) {
|
|
|
- // 2.2 更新预制曲线
|
|
|
- List<String> duans = new ArrayList<>();
|
|
|
- List<CirculateDTO> yzqxList = webAccessService.getYzcx(duans);
|
|
|
-
|
|
|
- // 缓存中,存在则删除,并重新设置
|
|
|
- if (redisUtil.hasKey(GuanCommonConstant.GUAN_PREPARE_CURVE)) {
|
|
|
- redisUtil.del(GuanCommonConstant.GUAN_PREPARE_CURVE);
|
|
|
- }
|
|
|
- redisUtil.set(GuanCommonConstant.GUAN_PREPARE_CURVE, yzqxList);
|
|
|
-
|
|
|
- if (redisUtil.hasKey(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION)) {
|
|
|
- redisUtil.del(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION);
|
|
|
- }
|
|
|
- redisUtil.set(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION, duans);
|
|
|
-
|
|
|
- // 2.3 更新全程曲线
|
|
|
- List<CurveDTO> wholeCrveList = webAccessService.getWholeCurve(null);
|
|
|
-
|
|
|
- // 缓存中,存在则删除,并重新设置
|
|
|
- if (redisUtil.hasKey(GuanCommonConstant.GUAN_WHOLE_CURE)) {
|
|
|
- redisUtil.del(GuanCommonConstant.GUAN_WHOLE_CURE);
|
|
|
- }
|
|
|
- redisUtil.set(GuanCommonConstant.GUAN_WHOLE_CURE, wholeCrveList);
|
|
|
-
|
|
|
- // 2.4 更新实时曲线
|
|
|
- List<CurveDTO> realtimeCrveList = webAccessService.getRealtimeCurve(null);
|
|
|
-
|
|
|
- // 缓存中,存在则删除,并重新设置
|
|
|
- if (redisUtil.hasKey(GuanCommonConstant.GUAN_REALTIME_CURVE)) {
|
|
|
- redisUtil.del(GuanCommonConstant.GUAN_REALTIME_CURVE);
|
|
|
- }
|
|
|
- redisUtil.set(GuanCommonConstant.GUAN_REALTIME_CURVE, realtimeCrveList);
|
|
|
-
|
|
|
- // 2.5 websocket推送
|
|
|
- JSONObject jsonObject = new JSONObject();
|
|
|
-
|
|
|
- jsonObject.put("全程曲线", wholeCrveList);
|
|
|
- jsonObject.put("实时曲线", realtimeCrveList);
|
|
|
- // 预制曲线
|
|
|
- jsonObject.put("预制曲线", yzqxList);
|
|
|
- jsonObject.put("预制曲线线段", duans);
|
|
|
-
|
|
|
- webSocket.pushMessage(jsonObject.toString());
|
|
|
+ refreshCurve();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -263,7 +221,7 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
if (bwanalogtableid == currTableId) {
|
|
|
// 打包时修改该配置 屏蔽下面的语句
|
|
|
// log.info("bwanalogtable Insert");
|
|
|
- log.info("bwanalogtable Insert:" + data.toString());
|
|
|
+// log.info("bwanalogtable Insert:" + data.toString());
|
|
|
|
|
|
WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
|
|
|
for (Serializable[] row : writeRowsEventData.getRows()) {
|
|
@@ -274,12 +232,17 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
bwanalogtable.setTagname(obj[1].toString());
|
|
|
bwanalogtable.setLogdate(obj[2].toString());
|
|
|
bwanalogtable.setLogtime(obj[3].toString());
|
|
|
-// bwanalogtable.setMaxvalue(Double.parseDouble(obj[4].toString()));
|
|
|
+ bwanalogtable.setMaxvalue(Double.parseDouble(obj[4].toString()));
|
|
|
bwanalogtable.setAvgvalue(Double.parseDouble(obj[5].toString()));
|
|
|
-// bwanalogtable.setMinvalue(Double.parseDouble(obj[6].toString()));
|
|
|
-// bwanalogtable.setLastvalue(Double.parseDouble(obj[7].toString()));
|
|
|
+ bwanalogtable.setMinvalue(Double.parseDouble(obj[6].toString()));
|
|
|
+ bwanalogtable.setLastvalue(Double.parseDouble(obj[7].toString()));
|
|
|
// bwanalogtable.setAlarm(Integer.parseInt(obj[8].toString()));
|
|
|
|
|
|
+ // 检查设备状态
|
|
|
+ if ("设备状态".equals(bwanalogtable.getTagname())) {
|
|
|
+ checkEquipmentStatus(bwanalogtable);
|
|
|
+ }
|
|
|
+
|
|
|
// 实时数据存入redis中
|
|
|
redisUtil.set(GuanCommonConstant.GUAN_TAGVALUE_PREFIX + bwanalogtable.getTagname(), bwanalogtable);
|
|
|
}
|
|
@@ -338,5 +301,119 @@ public class MysqlBinLogClient implements ApplicationRunner {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 刷新所有曲线(设备启动)
|
|
|
+ */
|
|
|
+ private void refreshCurve() {
|
|
|
+ log.info(String.format("设备启动,推送新曲线 ↓↓↓↓↓↓↓↓↓↓:" + DateUtils.getTimestamp()));
|
|
|
+ // 1 更新预制曲线
|
|
|
+ List<String> duans = new ArrayList<>();
|
|
|
+ List<CirculateDTO> yzqxList = webAccessService.getYzqx(duans);
|
|
|
+
|
|
|
+ // 缓存中,存在则删除,并重新设置
|
|
|
+ if (redisUtil.hasKey(GuanCommonConstant.GUAN_PREPARE_CURVE)) {
|
|
|
+ redisUtil.del(GuanCommonConstant.GUAN_PREPARE_CURVE);
|
|
|
+ }
|
|
|
+ redisUtil.set(GuanCommonConstant.GUAN_PREPARE_CURVE, yzqxList);
|
|
|
+
|
|
|
+ if (redisUtil.hasKey(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION)) {
|
|
|
+ redisUtil.del(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION);
|
|
|
+ }
|
|
|
+ redisUtil.set(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION, duans);
|
|
|
+
|
|
|
+ // 2 更新全程曲线
|
|
|
+ List<CurveDTO> wholeCrveList = webAccessService.getWholeCurve(null);
|
|
|
+
|
|
|
+ // 缓存中,存在则删除,并重新设置
|
|
|
+ if (redisUtil.hasKey(GuanCommonConstant.GUAN_WHOLE_CURE)) {
|
|
|
+ redisUtil.del(GuanCommonConstant.GUAN_WHOLE_CURE);
|
|
|
+ }
|
|
|
+ redisUtil.set(GuanCommonConstant.GUAN_WHOLE_CURE, wholeCrveList);
|
|
|
+
|
|
|
+ // 3 更新实时曲线
|
|
|
+ List<CurveDTO> realtimeCrveList = webAccessService.getRealtimeCurve(null);
|
|
|
+
|
|
|
+ // 缓存中,存在则删除,并重新设置
|
|
|
+ if (redisUtil.hasKey(GuanCommonConstant.GUAN_REALTIME_CURVE)) {
|
|
|
+ redisUtil.del(GuanCommonConstant.GUAN_REALTIME_CURVE);
|
|
|
+ }
|
|
|
+ redisUtil.set(GuanCommonConstant.GUAN_REALTIME_CURVE, realtimeCrveList);
|
|
|
+
|
|
|
+ // 4 websocket推送
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+
|
|
|
+ jsonObject.put("全程曲线", wholeCrveList);
|
|
|
+ jsonObject.put("实时曲线", realtimeCrveList);
|
|
|
+ // 预制曲线
|
|
|
+ jsonObject.put("预制曲线", yzqxList);
|
|
|
+ jsonObject.put("预制曲线线段", duans);
|
|
|
+
|
|
|
+ webSocket.pushMessage(jsonObject.toString());
|
|
|
+
|
|
|
+ log.info(String.format("设备启动,推送新曲线 ↑↑↑↑↑↑↑↑↑↑:" + DateUtils.getTimestamp()));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查设备状态(如果存在异常,则刷新预制曲线)
|
|
|
+ * @param bwanalogtable 设备状态
|
|
|
+ */
|
|
|
+ private void checkEquipmentStatus(Bwanalogtable bwanalogtable) {
|
|
|
+ // 1 原来的设备状态在缓存中是空,则刷新预制曲线
|
|
|
+ if (!redisUtil.hasKey(GuanCommonConstant.GUAN_TAGVALUE_PREFIX + "设备状态")) {
|
|
|
+ refreshPrepareCurve();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2 获取之前的设备状态
|
|
|
+ Bwanalogtable oldbw = (Bwanalogtable) redisUtil.get(GuanCommonConstant.GUAN_TAGVALUE_PREFIX + bwanalogtable.getTagname());
|
|
|
+
|
|
|
+ // 3 原来的设备状态是-105(少于0),现在不是,则刷新预制曲线
|
|
|
+ if (oldbw.getAvgvalue() < 0 && bwanalogtable.getAvgvalue() >= 0) {
|
|
|
+ refreshPrepareCurve();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4 设备状态超过40秒未更新,则刷新预制曲线
|
|
|
+ Date oldlogtime = DateUtils.parseDatetime(("20" + oldbw.getLogdate() + " " + oldbw.getLogtime()).replace("/", "-"));
|
|
|
+ Date newlogtime = DateUtils.parseDatetime(("20" + bwanalogtable.getLogdate() + " " + bwanalogtable.getLogtime()).replace("/", "-"));
|
|
|
+ if ((newlogtime.getTime() - oldlogtime.getTime()) / 1000 > 40) {
|
|
|
+ refreshPrepareCurve();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 刷新预制曲线
|
|
|
+ */
|
|
|
+ private void refreshPrepareCurve() {
|
|
|
+ log.info(String.format("刷新预制曲线 ↓↓↓↓↓↓↓↓↓↓:" + DateUtils.getTimestamp()));
|
|
|
+ // 1 更新预制曲线
|
|
|
+ List<String> duans = new ArrayList<>();
|
|
|
+ List<CirculateDTO> yzqxList = webAccessService.getYzqx(duans);
|
|
|
+
|
|
|
+ // 缓存中,存在则删除,并重新设置
|
|
|
+ if (redisUtil.hasKey(GuanCommonConstant.GUAN_PREPARE_CURVE)) {
|
|
|
+ redisUtil.del(GuanCommonConstant.GUAN_PREPARE_CURVE);
|
|
|
+ }
|
|
|
+ redisUtil.set(GuanCommonConstant.GUAN_PREPARE_CURVE, yzqxList);
|
|
|
+
|
|
|
+ if (redisUtil.hasKey(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION)) {
|
|
|
+ redisUtil.del(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION);
|
|
|
+ }
|
|
|
+ redisUtil.set(GuanCommonConstant.GUAN_PREPARE_CURVE_SECTION, duans);
|
|
|
+
|
|
|
+ // 2 websocket推送
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+
|
|
|
+ // 预制曲线
|
|
|
+ jsonObject.put("预制曲线", yzqxList);
|
|
|
+ jsonObject.put("预制曲线线段", duans);
|
|
|
+
|
|
|
+ webSocket.pushMessage(jsonObject.toString());
|
|
|
+
|
|
|
+ log.info(String.format("刷新预制曲线 ↑↑↑↑↑↑↑↑↑↑:" + DateUtils.getTimestamp()));
|
|
|
+ }
|
|
|
}
|
|
|
|