Przeglądaj źródła

物资导入(该单线程只能导单表,加了进度条)

sl 1 rok temu
rodzic
commit
8a6ad1261f

+ 3 - 3
jeecg-module-system/jeecg-system-start/src/main/resources/application-prod.yml

@@ -130,18 +130,18 @@ spring:
         connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
       datasource:
         master:  ## 声明第一个数据源所需的数据
-          url: jdbc:mysql://127.0.0.1:3306/kezhikeshi?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
+          url: jdbc:mysql://127.0.0.1:3306/kezhikeshi?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
           username: root
           password: Xinxi513513
           driver-class-name: com.mysql.cj.jdbc.Driver
 #          password: Xinxi513513
         slave:  # 第二个数据源
-          url: jdbc:mysql://127.0.0.1:3306/look?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
+          url: jdbc:mysql://127.0.0.1:3306/look?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
           username: root
           password: Xinxi513513
           driver-class-name: com.mysql.cj.jdbc.Driver
         datax: # 第三个数据源
-          url: jdbc:mysql://127.0.0.1:3306/datax_web?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
+          url: jdbc:mysql://127.0.0.1:3306/datax_web?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
           username: root
           password: Xinxi513513
           driver-class-name: com.mysql.cj.jdbc.Driver

+ 6 - 0
module_kzks/pom.xml

@@ -92,6 +92,12 @@
             <version>3.2.4</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
+
     </dependencies>
 
 </project>

+ 157 - 2
module_kzks/src/main/java/org/jeecg/modules/wzOutboundOrderBNew/monitor/WzOutboundOrderBNewListerner.java

@@ -2,15 +2,29 @@ package org.jeecg.modules.wzOutboundOrderBNew.monitor;
 
 import com.alibaba.excel.context.AnalysisContext;
 import com.alibaba.excel.event.AnalysisEventListener;
+import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.shiro.SecurityUtils;
 import org.checkerframework.checker.index.qual.SameLen;
+import org.jeecg.common.system.vo.LoginUser;
+import org.jeecg.modules.message.websocket.WebSocket;
+import org.jeecg.modules.system.service.ISysUserService;
 import org.jeecg.modules.wzOutboundOrderBNew.entity.WzOutboundOrderBNew;
 import org.jeecg.modules.wzOutboundOrderBNew.mapper.WzOutboundOrderBNewMapper;
 import org.jeecg.modules.wzOutboundOrderBNew.service.IWzOutboundOrderBNewService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
+import javax.annotation.Resource;
 import java.nio.file.LinkOption;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * ClassName: WzOutboundOrderBNewListerner
@@ -24,15 +38,21 @@ import java.util.List;
 @Slf4j
 public class WzOutboundOrderBNewListerner extends AnalysisEventListener<WzOutboundOrderBNew> {
     private IWzOutboundOrderBNewService wzOutboundOrderBNewService;
+
     //每隔1000条存储数据库,实际使用中可以3000条,然后清理list,方便内存回收
     private static final int BATCH_COUNT = 3000;
     List<WzOutboundOrderBNew> list = new ArrayList<>();
+    private WebSocket webSocket;
+
+    Integer currentRow = 0;
+    Integer totalRows = 0;
 
     public WzOutboundOrderBNewListerner() {
     }
 
-    public WzOutboundOrderBNewListerner(IWzOutboundOrderBNewService wzOutboundOrderBNewService) {
+    public WzOutboundOrderBNewListerner(IWzOutboundOrderBNewService wzOutboundOrderBNewService, WebSocket webSocket) {
         this.wzOutboundOrderBNewService = wzOutboundOrderBNewService;
+        this.webSocket = webSocket;
     }
 
     @Override
@@ -52,7 +72,9 @@ public class WzOutboundOrderBNewListerner extends AnalysisEventListener<WzOutbou
     public void doAfterAllAnalysed(AnalysisContext analysisContext){
         //这里也要保存数据,确保最后遗留的数据也存储到数据库
         saveData();
-        log.info("所有数据解析完成!");
+        log.info("当前表所有数据解析完成!");
+
+
     }
 
     //存储数据库
@@ -60,6 +82,139 @@ public class WzOutboundOrderBNewListerner extends AnalysisEventListener<WzOutbou
         log.info("{}条数据,开始存储数据库!", list.size());
         wzOutboundOrderBNewService.saveBatch(list);
         log.info("数据存储数据库成功!");
+        if((currentRow+1)==totalRows){
+            sendProgressToFrontEnd(100);
+        }
+    }
+
+    @Override
+    public boolean hasNext(AnalysisContext analysisContext) {
+        currentRow = analysisContext.getCurrentRowNum();
+        totalRows = analysisContext.getTotalCount();
+
+        int progress = 0;
+        // 根据当前行数和总行数计算进度
+        if(currentRow>0){
+            progress = (currentRow * 100) / totalRows;
+        }
+
+        // 将进度信息发送到前端
+        sendProgressToFrontEnd(progress);
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        return true;
+    }
+
+    private void sendProgressToFrontEnd(int progress) {
+        // 这里实现将进度数据发送到前端的逻辑
+        // 例如,可以通过WebSocket、HTTP Long Polling或其他方式发送
+        System.out.println("进度:" + progress + "%");
+
+        LoginUser sysUser = (LoginUser) SecurityUtils.getSubject().getPrincipal();
+        String userId = sysUser.getId();
+        System.out.println(userId);
+
+        //创建业务消息信息
+        JSONObject obj = new JSONObject();
+        obj.put("cmd", "user");//业务类型
+        obj.put("msgId", userId);//消息id
+        obj.put("msgTxt", progress);//消息内容
+        System.out.println(obj.toJSONString());
+        //单个用户发送 (userId为用户id)
+        webSocket.sendMessage(userId, obj.toJSONString());
+
     }
 
 }
+
+
+
+
+//public class WzOutboundOrderBNewListerner extends AnalysisEventListener<WzOutboundOrderBNew> {
+//    private IWzOutboundOrderBNewService wzOutboundOrderBNewService;
+//
+//    private ExecutorService executorService = Executors.newFixedThreadPool(5);
+//
+//    private ThreadLocal<ArrayList<WzOutboundOrderBNew>> wzOutboundOrderBNewList = ThreadLocal.withInitial(ArrayList::new);
+//
+//    //每隔1000条存储数据库,实际使用中可以3000条,然后清理list,方便内存回收
+//    private static final int batchSize = 1000;
+//
+//    private static AtomicInteger count = new AtomicInteger(1);
+////    List<WzOutboundOrderBNew> list = new ArrayList<>();
+//
+//    public WzOutboundOrderBNewListerner() {
+//    }
+//
+//    public WzOutboundOrderBNewListerner(IWzOutboundOrderBNewService wzOutboundOrderBNewService) {
+//        this.wzOutboundOrderBNewService = wzOutboundOrderBNewService;
+//    }
+//
+//    @Override
+//    public void invoke(WzOutboundOrderBNew wzOutboundOrderBNew, AnalysisContext analysisContext){
+////        System.out.println(wzOutboundOrderBNew);
+//        wzOutboundOrderBNewList.get().add(wzOutboundOrderBNew);
+////        System.out.println(list);
+//        //达到BATCH_COUNT了,需要去存储一次数据库,防止数据几万条数据在内存,容易OOM
+//        if(wzOutboundOrderBNewList.get().size() >= batchSize){
+////            saveData();
+//            asyncSaveData();
+//            //存储完成清理list
+////            list.clear();
+//        }
+//    }
+//
+//    @Override
+//    public void doAfterAllAnalysed(AnalysisContext analysisContext){
+//        log.info("一个Sheet全部处理完");
+//        if (wzOutboundOrderBNewList.get().size() >= batchSize) {
+//            saveData();
+//        }
+//    }
+//
+//    //存储数据库
+//    public void saveOne(WzOutboundOrderBNew data){
+//        wzOutboundOrderBNewService.save(data);
+//        log.info("第" + count.getAndAdd(1) + "次插入1条数据");
+//    }
+//    private void saveData(){
+//        if (!wzOutboundOrderBNewList.get().isEmpty()) {
+//            log.info("{}条数据,开始存储数据库!", wzOutboundOrderBNewList.get().size());
+//            wzOutboundOrderBNewService.saveBatch(wzOutboundOrderBNewList.get());
+//            log.info("第" + count.getAndAdd(1) + "次插入" + wzOutboundOrderBNewList.get().size() + "条数据");
+//            wzOutboundOrderBNewList.get().clear();
+//        }
+//    }
+//
+//    public void asyncSaveData() {
+//        if (!wzOutboundOrderBNewList.get().isEmpty()) {
+//            ArrayList<WzOutboundOrderBNew> wzOutboundOrderBNews = (ArrayList<WzOutboundOrderBNew>) wzOutboundOrderBNewList.get().clone();
+//            executorService.execute(new SaveTask(wzOutboundOrderBNews, wzOutboundOrderBNewService));
+//            wzOutboundOrderBNewList.get().clear();
+//        }
+//    }
+//
+//    static class SaveTask implements Runnable {
+//        private List<WzOutboundOrderBNew> wzOutboundOrderBNewList;
+//        private IWzOutboundOrderBNewService wzOutboundOrderBNewService;
+//
+//        public SaveTask(List<WzOutboundOrderBNew> wzOutboundOrderBNewList, IWzOutboundOrderBNewService wzOutboundOrderBNewService) {
+//            this.wzOutboundOrderBNewList = wzOutboundOrderBNewList;
+//            this.wzOutboundOrderBNewService = wzOutboundOrderBNewService;
+//        }
+//
+//        @Override
+//        public void run() {
+//            wzOutboundOrderBNewService.saveBatch(wzOutboundOrderBNewList);
+//            log.info("第" + count.getAndAdd(1) + "次插入" + wzOutboundOrderBNewList.size() + "条数据");
+//        }
+//    }
+//
+//}
+
+
+
+

+ 3 - 1
module_kzks/src/main/java/org/jeecg/modules/wzOutboundOrderBNew/service/IWzOutboundOrderBNewService.java

@@ -4,6 +4,8 @@ import org.jeecg.common.api.vo.Result;
 import org.jeecg.modules.wzOutboundOrderBNew.entity.WzOutboundOrderBNew;
 import com.baomidou.mybatisplus.extension.service.IService;
 
+import java.io.IOException;
+
 /**
  * @Description: wz_outbound_order_b_new
  * @Author: jeecg-boot
@@ -13,6 +15,6 @@ import com.baomidou.mybatisplus.extension.service.IService;
 public interface IWzOutboundOrderBNewService extends IService<WzOutboundOrderBNew> {
 
     Result<?> importExcel1(String strUrl, Class<WzOutboundOrderBNew> clazz);
-    Result<?> importExcelNew(String strUrl, Class<WzOutboundOrderBNew> clazz);
+    Result<?> importExcelNew(String strUrl, Class<WzOutboundOrderBNew> clazz) throws IOException;
 
 }

+ 57 - 2
module_kzks/src/main/java/org/jeecg/modules/wzOutboundOrderBNew/service/impl/WzOutboundOrderBNewServiceImpl.java

@@ -7,8 +7,11 @@ import com.alibaba.excel.read.metadata.ReadSheet;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.ss.usermodel.WorkbookFactory;
 import org.jeecg.common.api.vo.Result;
 import org.jeecg.common.util.DateUtils;
+import org.jeecg.modules.message.websocket.WebSocket;
 import org.jeecg.modules.projectKmbh.entity.KzksProjectKmbh;
 import org.jeecg.modules.wzOutboundOrderBNew.entity.WzOutboundOrderBNew;
 import org.jeecg.modules.wzOutboundOrderBNew.mapper.WzOutboundOrderBNewMapper;
@@ -21,9 +24,13 @@ import org.springframework.stereotype.Service;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 
+import javax.annotation.Resource;
 import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * @Description: wz_outbound_order_b_new
@@ -39,6 +46,11 @@ public class WzOutboundOrderBNewServiceImpl extends ServiceImpl<WzOutboundOrderB
     @SuppressWarnings("all")
     private WzOutboundOrderBNewMapper orderBNewMapper;
 
+    @Resource
+    private WebSocket webSocket;
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(5);
+
     @Override
     public Result<?> importExcel1(String strUrl, Class<WzOutboundOrderBNew> clazz) {
         InputStream inputStream = null;
@@ -83,8 +95,36 @@ public class WzOutboundOrderBNewServiceImpl extends ServiceImpl<WzOutboundOrderB
     }
 
 
+//    @Override
+//    public Result<?> importExcelNew(String strUrl, Class<WzOutboundOrderBNew> clazz) throws IOException {
+//        int year = DateUtils.getYear();
+//        orderBNewMapper.deleteByDate(year);
+//
+//        File file = new File(strUrl);
+//        Workbook workbook = WorkbookFactory.create(file);
+//        int numOfSheets = workbook.getNumberOfSheets();//获取有多少表
+//        System.out.println("numOfSheets:" + numOfSheets);
+//        //开表的个数 个线程分别处理这些sheet
+//        List<Callable<Object>> tasks = new ArrayList<>();
+//        for(int i=0;i<numOfSheets;i++){
+//            int num = i;
+//            tasks.add(()->{
+//                EasyExcel.read(new File(strUrl), clazz, new WzOutboundOrderBNewListerner(this)).sheet(num).doRead();
+//                return null;
+//            });
+//        }
+//
+//        try {
+//            executorService.invokeAll(tasks);
+//        } catch (InterruptedException e) {
+//            throw new RuntimeException(e);
+//        }
+//
+//        return Result.ok("文件导入成功!");
+//    }
+
     @Override
-    public Result<?> importExcelNew(String strUrl, Class<WzOutboundOrderBNew> clazz) {
+    public Result<?> importExcelNew(String strUrl, Class<WzOutboundOrderBNew> clazz) throws IOException {
 //        InputStream inputStream = null;
 //        try {
 //            inputStream = new FileInputStream(strUrl);
@@ -108,11 +148,26 @@ public class WzOutboundOrderBNewServiceImpl extends ServiceImpl<WzOutboundOrderB
         int year = DateUtils.getYear();
         orderBNewMapper.deleteByDate(year);
 
-        ExcelReader excelReader = EasyExcel.read(new File(strUrl), clazz, new WzOutboundOrderBNewListerner(this)).build();
+//        File file = new File(strUrl);
+//
+//        Workbook workbook = WorkbookFactory.create(file);
+//        int numberOfSheets = workbook.getNumberOfSheets();
+
+
+        ExcelReader excelReader = EasyExcel.read(new File(strUrl), clazz, new WzOutboundOrderBNewListerner(this, webSocket)).build();
         ReadSheet readSheet = EasyExcel.readSheet(0).build();
         excelReader.read(readSheet);
         //这里千万别忘记关闭,读的时候会创建临时文件,到时磁盘会崩的
         excelReader.finish();
+//
+//        for(int i=0;i<numberOfSheets;i++){
+//            ReadSheet readSheet = EasyExcel.readSheet(i).build();
+//            excelReader.read(readSheet);
+//        }
+//        //这里千万别忘记关闭,读的时候会创建临时文件,到时磁盘会崩的
+//        excelReader.finish();
+
         return Result.ok("文件导入成功!");
     }
+
 }