Przeglądaj źródła

科研任务解析表多线程导入

lw 1 rok temu
rodzic
commit
3e566175b8

+ 1 - 1
jeecg-module-system/jeecg-system-start/src/main/resources/application-dev.yml

@@ -139,7 +139,7 @@ spring:
 #          password: root
 #          driver-class-name: com.mysql.cj.jdbc.Driver
         slave:  # 第二个数据源
-          url: jdbc:mysql://119.3.168.55:3306/look?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
+          url: jdbc:mysql://119.3.168.55:3306/look?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
           username: root
           password: 302201
           driver-class-name: com.mysql.cj.jdbc.Driver

+ 19 - 15
module_kzks/src/main/java/org/jeecg/modules/kyTaskTemp/entity/KyTaskTemp.java

@@ -1,22 +1,18 @@
 package org.jeecg.modules.kyTaskTemp.entity;
 
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.Date;
-import java.math.BigDecimal;
+
+import com.alibaba.excel.annotation.ExcelIgnore;
+import com.alibaba.excel.annotation.ExcelProperty;
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
-import com.baomidou.mybatisplus.annotation.TableLogic;
 import lombok.Data;
-import com.fasterxml.jackson.annotation.JsonFormat;
-import org.springframework.format.annotation.DateTimeFormat;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
 import org.jeecgframework.poi.excel.annotation.Excel;
-import org.jeecg.common.aspect.annotation.Dict;
-import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.EqualsAndHashCode;
-import lombok.experimental.Accessors;
 
 /**
  * @Description: 任务关系映射表
@@ -25,31 +21,39 @@ import lombok.experimental.Accessors;
  * @Version: V1.0
  */
 @Data
+@Getter
+@Setter
+@NoArgsConstructor
 @TableName("ky_task_temp")
-@Accessors(chain = true)
-@EqualsAndHashCode(callSuper = false)
-@ApiModel(value="ky_task_temp对象", description="任务关系映射表")
+//@Accessors(chain = true)
+//@EqualsAndHashCode(callSuper = false)
+//@ApiModel(value="ky_task_temp对象", description="任务关系映射表")
 public class KyTaskTemp implements Serializable {
     private static final long serialVersionUID = 1L;
 
 	/**id*/
-	@TableId(type = IdType.ASSIGN_ID)
+    @TableId(type = IdType.AUTO)
     @ApiModelProperty(value = "id")
-    private java.lang.String id;
+    @ExcelProperty(value = "id")
+    private java.lang.Long id;
 	/**任务号*/
 	@Excel(name = "任务号", width = 15)
     @ApiModelProperty(value = "任务号")
+    @ExcelProperty(value = "任务号")
     private java.lang.String taskno;
 	/**父级任务号*/
 	@Excel(name = "父级任务号", width = 15)
     @ApiModelProperty(value = "父级任务号")
+    @ExcelProperty(value = "父级任务号")
     private java.lang.String reftaskno;
 	/**批产任务号*/
 	@Excel(name = "批产任务号", width = 15)
     @ApiModelProperty(value = "批产任务号")
+    @ExcelProperty(value = "批产任务号")
     private java.lang.String pccode;
 	/**备产任务号*/
 	@Excel(name = "备产任务号", width = 15)
     @ApiModelProperty(value = "备产任务号")
+    @ExcelProperty(value = "备产任务号")
     private java.lang.String bccode;
 }

+ 8 - 0
module_kzks/src/main/java/org/jeecg/modules/kyTaskTemp/mapper/KyTaskTempMapper.java

@@ -2,7 +2,10 @@ package org.jeecg.modules.kyTaskTemp.mapper;
 
 import java.util.List;
 
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
+import org.jeecg.modules.dataSourceSwitch.annotation.UseSlaveDataSource;
 import org.jeecg.modules.kyTaskTemp.entity.KyTaskTemp;
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
@@ -12,6 +15,11 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
  * @Date:   2023-09-16
  * @Version: V1.0
  */
+@Mapper
 public interface KyTaskTempMapper extends BaseMapper<KyTaskTemp> {
 
+    @Delete("truncate table ky_task_temp")
+    void truncateTable();
+
+    void batchSave(@Param("list") List<KyTaskTemp> list);
 }

+ 8 - 1
module_kzks/src/main/java/org/jeecg/modules/kyTaskTemp/mapper/xml/KyTaskTempMapper.xml

@@ -2,4 +2,11 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.jeecg.modules.kyTaskTemp.mapper.KyTaskTempMapper">
 
-</mapper>
+    <insert id="batchSave" parameterType="java.util.List">
+        insert into ky_task_temp(id, taskno, reftaskno, pcCode, bcCode)
+        VALUES
+        <foreach collection="list" item="KyTaskTemp" index="index"  separator="," >
+           ( #{KyTaskTemp.id}, #{KyTaskTemp.taskno}, #{KyTaskTemp.reftaskno}, #{KyTaskTemp.pccode}, #{KyTaskTemp.bccode})
+        </foreach>
+    </insert>
+</mapper>

+ 8 - 0
module_kzks/src/main/java/org/jeecg/modules/kyTaskTemp/service/IKyTaskTempService.java

@@ -2,7 +2,12 @@ package org.jeecg.modules.kyTaskTemp.service;
 
 import com.baomidou.mybatisplus.extension.service.IService;
 import org.jeecg.common.api.vo.Result;
+import org.jeecg.modules.dataSourceSwitch.annotation.UseSlaveDataSource;
 import org.jeecg.modules.kyTaskTemp.entity.KyTaskTemp;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.List;
 
 /**
  * @Description: 任务关系映射表
@@ -15,4 +20,7 @@ public interface IKyTaskTempService extends IService<KyTaskTemp> {
     /**用于文件管理——导入日志——导入外协价格库*/
     public Result<?> importExcel1(String strUrl, Class<KyTaskTemp> clazz);
 
+    public void truncateTable();
+
+    public void batchSave(List<KyTaskTemp> list);
 }

+ 61 - 44
module_kzks/src/main/java/org/jeecg/modules/kyTaskTemp/service/impl/KyTaskTempServiceImpl.java

@@ -1,74 +1,91 @@
 package org.jeecg.modules.kyTaskTemp.service.impl;
 
+import com.alibaba.excel.EasyExcel;
+import com.alibaba.excel.EasyExcelFactory;
+import com.alibaba.excel.ExcelReader;
+import com.alibaba.excel.cache.MapCache;
+import com.alibaba.excel.read.metadata.ReadSheet;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.jeecg.common.api.vo.Result;
+import org.jeecg.config.mybatis.ThreadLocalDataHelper;
+import org.jeecg.modules.dataSourceSwitch.annotation.UseMasterDataSource;
 import org.jeecg.modules.kyTaskTemp.entity.KyTaskTemp;
 import org.jeecg.modules.kyTaskTemp.mapper.KyTaskTempMapper;
 import org.jeecg.modules.kyTaskTemp.service.IKyTaskTempService;
-import org.jeecgframework.poi.excel.ExcelImportUtil;
-import org.jeecgframework.poi.excel.entity.ImportParams;
+import org.jeecg.modules.projectImportList.readListener.KyTaskTempImportReadListener;
+import org.springframework.aop.framework.AopContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.stereotype.Service;
 
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.*;
 
 /**
  * @Description: 任务关系映射表
  * @Author: jeecg-boot
- * @Date:   2023-09-16
+ * @Date: 2023-09-16
  * @Version: V1.0
  */
 @Service
 @Slf4j
+@UseMasterDataSource
+@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
 public class KyTaskTempServiceImpl extends ServiceImpl<KyTaskTempMapper, KyTaskTemp> implements IKyTaskTempService {
 
+    @Autowired
+    private KyTaskTempMapper tempMapper;
 
-    /**用于文件管理——导入日志——导入外协价格库*/
-    public Result<?> importExcel1(String strUrl, Class<KyTaskTemp> clazz){
-        InputStream inputStream = null;
-        try {
-            inputStream = new FileInputStream(strUrl);
-        } catch (FileNotFoundException e) {
-            e.printStackTrace();
-        }
 
-        // 获取上传文件对象
-        ImportParams params = new ImportParams();
-        params.setTitleRows(0);/*表格标题所占行数(默认是0)*/
-        params.setHeadRows(1);/*表头所占据的行数行数,默认1,代表标题占据一行*/
-        params.setNeedSave(true);
+    /**
+     * 用于文件管理——导入日志——导入外协价格库
+     */
+    public Result<?> importExcel1(String strUrl, Class<KyTaskTemp> clazz) {
+        long startTime = System.currentTimeMillis();
+        File file = new File(strUrl);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
         try {
-            List<KyTaskTemp> list = ExcelImportUtil.importExcel(inputStream, clazz, params);
-
-            //update-begin-author:taoyan date:20190528 for:批量插入数据
-            long start = System.currentTimeMillis();
-            this.saveBatch(list);
-            //400条 saveBatch消耗时间1592毫秒  循环插入消耗时间1947毫秒
-            //1200条  saveBatch消耗时间3687毫秒 循环插入消耗时间5212毫秒
-            log.info("消耗时间" + (System.currentTimeMillis() - start) + "毫秒");
-            //update-end-author:taoyan date:20190528 for:批量插入数据
-            return Result.OK("文件导入成功!数据行数:" + list.size());
-        } catch (Exception e) {
-            //update-begin-author:taoyan date:20211124 for: 导入数据重复增加提示
-            String msg = e.getMessage();
-            log.error(msg, e);
-            if(msg!=null && msg.indexOf("Duplicate entry")>=0){
-                return Result.error("文件导入失败:有重复数据!");
-            }else{
-                return Result.error("文件导入失败:" + e.getMessage());
+            IKyTaskTempService iKyTaskTempService = (IKyTaskTempService) AopContext.currentProxy();
+            iKyTaskTempService.truncateTable();
+            KyTaskTempImportReadListener readListener = new KyTaskTempImportReadListener(iKyTaskTempService);
+            ExcelReader build = EasyExcel.read(file, clazz, readListener).build();
+            List<ReadSheet> readSheets = build.excelExecutor().sheetList();
+            KyTaskTempImportReadListener.setSheetSize(readSheets.size());
+            ArrayList<Callable<Object>> task = new ArrayList<>();
+            for (ReadSheet readSheet : readSheets) {
+                task.add(() -> {
+                    // 设置初始值
+                    ExcelReader buildTwo = EasyExcel.read(file, clazz, readListener).readCache(new MapCache()).build();
+                    buildTwo.read(EasyExcelFactory.readSheet(readSheet.getSheetNo()).build());
+                    return null;
+                });
             }
-            //update-end-author:taoyan date:20211124 for: 导入数据重复增加提示
-        } finally {
-            try {
-                inputStream.close();
-            } catch (IOException e) {
-                e.printStackTrace();
+            List<Future<Object>> futures = threadPoolExecutor.invokeAll(task);
+            for (Future<Object> future : futures) {
+                future.get();
             }
+            threadPoolExecutor.shutdown();
+            long endTime = System.currentTimeMillis();
+            log.info("导入项目费用花费时间:{}毫秒 | {}分钟", (endTime - startTime), ((endTime - startTime) / 1000 / 60));
+            return Result.OK();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return Result.error("导入失败");
+        } finally {
+            threadPoolExecutor.shutdown();
         }
     }
 
+    @Override
+    public void truncateTable() {
+        tempMapper.truncateTable();
+    }
+
+    @Override
+    public void batchSave(List<KyTaskTemp> list) {
+        tempMapper.batchSave(list);
+    }
 }

+ 101 - 0
module_kzks/src/main/java/org/jeecg/modules/projectImportList/readListener/KyTaskTempImportReadListener.java

@@ -0,0 +1,101 @@
+package org.jeecg.modules.projectImportList.readListener;
+
+import com.alibaba.excel.context.AnalysisContext;
+import com.alibaba.excel.event.AnalysisEventListener;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.modules.kyTaskTemp.entity.KyTaskTemp;
+import org.jeecg.modules.kyTaskTemp.service.IKyTaskTempService;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class KyTaskTempImportReadListener extends AnalysisEventListener<KyTaskTemp> {
+
+    private final IKyTaskTempService service;
+    private static int sheetSize;
+    private static final AtomicInteger sheetCount;
+    private static ThreadPoolExecutor pool;
+    public final ThreadLocal<List<KyTaskTemp>> threadLocal;
+
+    public static void setSheetSize(int sheetSize) {
+        KyTaskTempImportReadListener.sheetSize = sheetSize;
+    }
+
+    static {
+        sheetCount = new AtomicInteger();
+        pool = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
+    }
+
+    public KyTaskTempImportReadListener(IKyTaskTempService service) {
+        this.service = service;
+        threadLocal = new ThreadLocal<>();
+    }
+
+    @Override
+    public void invoke(KyTaskTemp kyTaskTemp, AnalysisContext analysisContext) {
+        try {
+            List<KyTaskTemp> temps = threadLocal.get();
+            if (temps == null ) temps = new ArrayList<>();
+            if (!temps.isEmpty() && temps.size() % 1000 == 0) {
+                ArrayList<KyTaskTemp> kyTaskTemps = new ArrayList<>(temps);
+                temps = new ArrayList<>(1000);
+                if (pool.isShutdown())
+                    pool = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
+                pool.submit(() -> {
+                    try {
+                        service.batchSave(kyTaskTemps);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+            }
+            temps.add(kyTaskTemp);
+            threadLocal.set(temps);
+        } catch (Exception e) {
+            clear();
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void doAfterAllAnalysed(AnalysisContext analysisContext) {
+        try {
+            List<KyTaskTemp> temps = threadLocal.get();
+            if (temps.isEmpty()) return;
+            pool.submit(() -> {
+                try {
+                    service.batchSave(temps);
+                    log.info("{}导入完毕", analysisContext.readSheetHolder().getSheetName());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            });
+            int count = sheetCount.incrementAndGet();
+            if (count == sheetSize) {
+                pool.shutdown();
+                boolean flag = pool.awaitTermination(3, TimeUnit.HOURS);
+                clear();
+                log.info("所有sheet导入完毕");
+            }
+        } catch (Exception e) {
+            clear();
+            e.printStackTrace();
+        }
+    }
+
+    public void clear() {
+        try {
+            pool.shutdown();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        threadLocal.remove();
+        sheetSize = 0;
+        sheetCount.set(0);
+    }
+}

+ 2 - 3
module_kzks/src/main/java/org/jeecg/modules/projectImportList/readListener/ProjectChbImportListener.java

@@ -28,7 +28,6 @@ import org.jeecg.modules.projectImportList.service.impl.ProjectImportListService
 import org.jeecg.modules.projectKmbh.entity.KzksProjectKmbh;
 import org.jeecg.modules.projectKmbh.service.IKzksProjectKmbhService;
 
-import javax.transaction.Transactional;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,7 +35,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @Slf4j
-public class ProjectChbImportListener extends AnalysisEventListener<ProjectChb> {
+public class ProjectChbImportReadListener extends AnalysisEventListener<ProjectChb> {
     private static final List<ProjectChbSwf> swfList = new CopyOnWriteArrayList<>();//事务费
     private static final List<ProjectChbZyf> zyfList = new CopyOnWriteArrayList<>();//专用费
     private static final List<ProjectChbWxf> wxfList = new CopyOnWriteArrayList<>();//外协费
@@ -70,7 +69,7 @@ public class ProjectChbImportListener extends AnalysisEventListener<ProjectChb>
 
     private final ThreadPoolExecutor threadPoolExecutor;
 
-    public ProjectChbImportListener(IKzksProjectKmbhService kmbhService, IProjectChbSwfService swfService, IProjectChbZyfService zyfService, IKzksProjectChbRdfService rdfService, IKzksProjectChbZjfService zjfService, IKzksProjectChbGlfService glfService, IKzksProjectChbRgfService rgfService, IProjectChbSxfService sxfService, IProjectChbWxfService wxfService, RedisUtil redisUtil, ThreadPoolExecutor threadPoolExecutor) {
+    public ProjectChbImportReadListener(IKzksProjectKmbhService kmbhService, IProjectChbSwfService swfService, IProjectChbZyfService zyfService, IKzksProjectChbRdfService rdfService, IKzksProjectChbZjfService zjfService, IKzksProjectChbGlfService glfService, IKzksProjectChbRgfService rgfService, IProjectChbSxfService sxfService, IProjectChbWxfService wxfService, RedisUtil redisUtil, ThreadPoolExecutor threadPoolExecutor) {
         this.kmbhService = kmbhService;
         this.swfService = swfService;
         this.zyfService = zyfService;

+ 18 - 16
module_kzks/src/main/java/org/jeecg/modules/projectImportList/service/impl/ProjectImportListServiceImpl.java

@@ -20,7 +20,7 @@ import org.jeecg.modules.projectChbZyf.service.IProjectChbZyfService;
 import org.jeecg.modules.projectImportList.dto.ProjectChb;
 import org.jeecg.modules.projectImportList.entity.ProjectImportList;
 import org.jeecg.modules.projectImportList.mapper.ProjectImportListMapper;
-import org.jeecg.modules.projectImportList.readListener.ProjectChbImportListener;
+import org.jeecg.modules.projectImportList.readListener.ProjectChbImportReadListener;
 import org.jeecg.modules.projectImportList.service.IProjectImportListService;
 import org.jeecg.modules.projectKmbh.service.IKzksProjectKmbhService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -79,22 +79,22 @@ public class ProjectImportListServiceImpl extends ServiceImpl<ProjectImportListM
         File file = new File(strUrl);
         ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
         ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
-        ProjectChbImportListener projectChbImportListener =
-                new ProjectChbImportListener(kmbhService, swfService, zyfService, rdfService, zjfService, glfService, rgfService, sxfService, wxfService, redisUtil, poolExecutor);
-        projectChbImportListener.initRedisCache();
-        ExcelReader build = EasyExcel.read(file, clazz, projectChbImportListener).build();
-        List<ReadSheet> readSheets = build.excelExecutor().sheetList();
-
-        ArrayList<Callable<Object>> task = new ArrayList<>();
-        for (ReadSheet readSheet : readSheets) {
-            task.add(() -> {
-                ExcelReader buildTwo = EasyExcel.read(file, clazz, projectChbImportListener).readCache(new MapCache()).build();
-                buildTwo.read(EasyExcelFactory.readSheet(readSheet.getSheetNo()).build());
-                return null;
-            });
-        }
-        taskCount = readSheets.size();
         try {
+            ProjectChbImportReadListener projectChbImportReadListener =
+                    new ProjectChbImportReadListener(kmbhService, swfService, zyfService, rdfService, zjfService, glfService, rgfService, sxfService, wxfService, redisUtil, poolExecutor);
+            projectChbImportReadListener.initRedisCache();
+            ExcelReader build = EasyExcel.read(file, clazz, projectChbImportReadListener).build();
+            List<ReadSheet> readSheets = build.excelExecutor().sheetList();
+
+            ArrayList<Callable<Object>> task = new ArrayList<>();
+            for (ReadSheet readSheet : readSheets) {
+                task.add(() -> {
+                    ExcelReader buildTwo = EasyExcel.read(file, clazz, projectChbImportReadListener).readCache(new MapCache()).build();
+                    buildTwo.read(EasyExcelFactory.readSheet(readSheet.getSheetNo()).build());
+                    return null;
+                });
+            }
+            taskCount = readSheets.size();
             List<Future<Object>> futures = threadPoolExecutor.invokeAll(task);
             for (Future<Object> future : futures) {
                 future.get();
@@ -102,6 +102,8 @@ public class ProjectImportListServiceImpl extends ServiceImpl<ProjectImportListM
             threadPoolExecutor.shutdown();
         } finally {
             taskCount = 0;
+            threadPoolExecutor.shutdown();
+            poolExecutor.shutdown();
         }
         long endTime = System.currentTimeMillis();
         log.info("导入项目费用花费时间:{}毫秒 | {}分钟", (endTime - startTime), ((endTime - startTime) / 1000 / 60));