Py学习  »  Elasticsearch

数据迁移方案 + Elasticsearch在综合搜索列表实现

王阿汪🐶 • 5 年前 • 254 次点击  
阅读 62

数据迁移方案 + Elasticsearch在综合搜索列表实现

1 UML

基本类图

2 迁移实现

2.1 从数据库读取进件id存到文件中

    /**
     * HTTP GET entry point that triggers writing piece ids to files.
     *
     * @param id optional request parameter; when absent the no-argument
     *           {@code writePieceFile()} is called, otherwise the value is
     *           forwarded to {@code writePieceFile(id)}
     */
    @RequestMapping(value = "***", method = {RequestMethod.GET})
    @ResponseBody
    public void writeFile(Integer id) {
        if (id != null) {
            pieceSearchService.writePieceFile(id);
            return;
        }
        pieceSearchService.writePieceFile();
    }
    
    // PieceSearchServiceImpl
    /** Starts the id-export step by delegating to {@code dataMoveManager.startWriteFile()}. */
    public void writePieceFile() {
        dataMoveManager.startWriteFile();
    }

复制代码
    // AbstractTemplate
    /** Public entry of the template: delegates to {@code readDataAndWriteFile()}. */
    public void startWriteFile() {
        readDataAndWriteFile();
    }
    // DataMoveManager
    /** Runs the write-file job by calling {@code pieceInfoWriteFileTask.start()}. */
    protected void readDataAndWriteFile() {
        pieceInfoWriteFileTask.start();
    }
    
    // AbstractTaskTemplate
    /**
     * Template method driving one batch job: builds the tasks, creates the
     * pool, runs every task and hands the collected result lines to
     * {@code writeLogFile}. Tasks are built before the pool is created.
     */
    public void start() {
        final List<? extends Callable<String>> pendingTasks = createTask();
        final ExecutorService pool = createExecutorService();
        writeLogFile(runTaskForResult(pool, pendingTasks));
    }
    
    // PieceInfoWriteFileTask
    /**
     * Builds the export tasks: loads every piece row through
     * {@code pieceSearchDao.queryAllPieceInfo()} and groups the rows into
     * tasks. The original author notes the query returns 5,000,000+ rows,
     * all of which are held in memory by this call.
     *
     * @return the tasks produced by {@code getTaskFromPieceId}
     */
    protected List<? extends Callable<String>> createTask() {
        final List<PieceIdAndIdDTO> allPieces = pieceSearchDao.queryAllPieceInfo();
        return this.getTaskFromPieceId(allPieces);
    }
    
    // PieceInfoWriteFileTask
    /**
     * Splits the given rows into consecutive batches of {@code FILE_MAX_NUM}
     * rows and wraps each batch in its own task. Batches are numbered from 1
     * in order and the number is used as the task's file name; a trailing
     * partial batch also gets a task.
     *
     * @param pieceIdAndIdDTOS rows to distribute over tasks
     * @return one task per batch, in input order
     */
    List<MyTask> getTaskFromPieceId(List<PieceIdAndIdDTO> pieceIdAndIdDTOS) {
        List<MyTask> tasks = Lists.newArrayList();
        List<PieceIdAndIdDTO> batch = Lists.newArrayList();
        int fileNo = 1;
        int seen = 0;
        for (PieceIdAndIdDTO row : pieceIdAndIdDTOS) {
            batch.add(row);
            seen++;
            if (seen % FILE_MAX_NUM != 0) {
                continue;
            }
            tasks.add(newBatchTask(batch, fileNo));
            fileNo++;
            // A fresh list is required: the task keeps a reference to the old one.
            batch = Lists.newArrayList();
        }
        if (!batch.isEmpty()) {
            tasks.add(newBatchTask(batch, fileNo));
        }
        return tasks;
    }

    /** Creates a task that owns the given batch and writes it to the file named after {@code fileNo}. */
    private MyTask newBatchTask(List<PieceIdAndIdDTO> batch, int fileNo) {
        MyTask task = new MyTask();
        task.setIds(batch);
        task.setFileName(Integer.toString(fileNo));
        return task;
    }
    // PieceInfoWriteFileTask
    /**
     * Creates the pool used to run the write-file tasks.
     *
     * @return a fixed-size pool with {@code MAX_THREAD_NUM} threads
     */
    protected ExecutorService createExecutorService() {
        return Executors.newFixedThreadPool(MAX_THREAD_NUM);
    }
    
    // AbstractTaskTemplate
    /**
     * Runs all tasks on the given pool, waits for them to finish and collects
     * their results in task order. The pool is shut down before the results
     * are read, whether or not the invocation succeeded.
     *
     * <p>A task that ended with an exception contributes no entry to the
     * result; its stack trace is printed. If the calling thread is interrupted
     * while waiting, the interrupt flag is restored so callers can react to it
     * and an empty result is returned.
     *
     * @param executorService pool to execute on; always shut down by this method
     * @param tasks tasks to execute
     * @return results of the tasks that completed normally
     */
    private List<String> runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) {
        List<Future<String>> futures = new ArrayList<>();
        try {
            futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
        }
        List<String> results = new ArrayList<>(futures.size());
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return results;
    }
    
    // PieceInfoWriteFileTask
    /**
     * Writes the task result lines to {@code writeFile.log} under
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH}, one entry per line, as UTF-8.
     * A write failure is logged (with its stack trace) and not rethrown.
     *
     * @param results lines to write
     */
    protected void writeLogFile(List results) {
        String fileName = "writeFile.log";
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;
        try {
            FileKit.writeLines(results, filePath, "UTF-8");
        } catch (IOException e) {
            // Pass the exception to the logger so the stack trace lands in the
            // application log instead of stderr.
            log.error("class:PieceInfoWriteFileTask  method: writeLogFile()" +
                " 日志信息写出失败, 失败原因:" + e.getMessage(), e);
        }
    }
复制代码
    // PieceInfoWriteFileTask
    /**
     * Task that writes one batch of rows to a file by calling
     * {@code writeFile(ids, fileName)} and reports the outcome as a message.
     */
    class MyTask implements Callable<String> {

        /** Rows to be written by this task. */
        private List<PieceIdAndIdDTO> ids;

        /** Name passed to {@code writeFile} as the target file name. */
        private String fileName;

        /**
         * @return the message produced by {@code writeFile}, or a failure
         *         message carrying the exception text if it threw
         */
        @Override
        public String call() {
            final String failurePrefix = "文件生成失败";
            try {
                return writeFile(ids, fileName);
            } catch (Exception e) {
                return failurePrefix + "----失败原因:" + e.getMessage();
            }
        }
    }
    
    // PieceInfoWriteFileTask
    /**
     * Writes the given rows line by line to
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt"} as UTF-8.
     *
     * @param pieceIdAndId rows to write
     * @param fileName target file name without extension
     * @return a success message, or a failure message with the I/O error text
     */
    private String writeFile(List<PieceIdAndIdDTO> pieceIdAndId, String fileName) {
        final String target = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName + ".txt";
        String outcome;
        try {
            log.info("文件输出路径:" + target);
            FileKit.writeLines(pieceIdAndId, target, "UTF-8");
            outcome = "此文件生成成功:" + fileName;
        } catch (IOException e) {
            outcome = "此文件生成失败:" + fileName + "失败原因:" + e.getMessage();
        }
        return outcome;
    }
复制代码

2.2 数据迁移

2.2.1 从文件读取id存到队列中

    /** HTTP GET entry point that starts the migration via {@code pieceSearchService.pieceDataMove()}. */
    @RequestMapping(value = "***", method = {RequestMethod.GET})
    @ResponseBody
    public void dataMove() {
        pieceSearchService.pieceDataMove();
    }
    
    // PieceSearchServiceImpl
    /** Starts the file-reading step by delegating to {@code dataMoveManager.startReadForDb()}. */
    public void pieceDataMove() {
        dataMoveManager.startReadForDb();
    }
    
    
复制代码



    
    // AbstractTemplate
    /** Public entry of the template: delegates to {@code readFileForDb()}. */
    public void startReadForDb() {
        readFileForDb();
    }
    // DataMoveManager
    /** Runs the data-move job by calling {@code pieceInfoDataMoveTask.start()}. */
    protected void readFileForDb() {
        pieceInfoDataMoveTask.start();
    }
    
    // AbstractTaskTemplate
    /**
     * Template method driving one batch job: builds the tasks, creates the
     * pool, runs every task and hands the collected result lines to
     * {@code writeLogFile}. Tasks are built before the pool is created.
     */
    public void start() {
        final List<? extends Callable<String>> pendingTasks = createTask();
        final ExecutorService pool = createExecutorService();
        writeLogFile(runTaskForResult(pool, pendingTasks));
    }
    
    // PieceInfoDataMoveTask
    /**
     * Builds one task per {@code .txt} file found under
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH}.
     *
     * <p>If the directory listing fails, the error is logged and an empty
     * task list is returned. Previously {@code files} stayed {@code null} in
     * that case and the subsequent loop threw a NullPointerException.
     *
     * @return the tasks to execute; empty when no file could be listed
     */
    protected List<? extends Callable<String>> createTask() {
        List<File> files = null;
        try {
            files = FileKit.listFilesWithSuffix(new File(ABSOLUTE_PATH + FILE_ROOT_PATH), ".txt");
        } catch (Exception e) {
            log.error("class:PieceInfoDataMoveTask  method: createTask()" +
                " 创建任务组失败, 失败原因:" + e.getMessage(), e);
        }
        if (files == null) {
            // Listing failed (or returned nothing usable): fall back to "no files".
            files = Lists.newArrayList();
        }
        return getTaskWithFile(files);
    }
    
    // PieceInfoDataMoveTask
     /**
      * Wraps every file in its own task, preserving the input order.
      *
      * @param files files to process
      * @return one task per file
      */
     private List<MyTask> getTaskWithFile(List<File> files) {
        ArrayList<MyTask> perFileTasks = new ArrayList<>(files.size());
        for (File source : files) {
            MyTask reader = new MyTask();
            reader.setFile(source);
            perFileTasks.add(reader);
        }
        return perFileTasks;
    }
    
    // PieceInfoDataMoveTask
    /**
     * Task that reads one id file line by line and pushes the ids onto
     * {@code QueueUtil.QUEUE_IDS} in batches of {@code MAX_SAVE_NUM}.
     */
    private class MyTask implements Callable<String> {

        /** File to read. */
        private File file;

        /**
         * Reads the file and enqueues its ids. Ids collected but not yet
         * enqueued are pushed in the {@code finally} block, so a partial batch
         * is enqueued even when reading fails midway.
         *
         * @return a summary message on success, or a failure message with the
         *         exception text when reading or parsing failed
         * @throws Exception if enqueueing the trailing batch is interrupted
         */
        @Override
        public String call() throws Exception {
            // Ids of the batch currently being collected.
            ArrayList<String> ids = new ArrayList<>();
            long beginTime = System.currentTimeMillis();
            String fileName = file.getName();
            BufferedReader reader = null;
            String line;
            // 1-based number of the row being read; (rowNumber - 1) rows are complete.
            int rowNumber = 1;
            try {
                reader = FileKit.getReader(file, "UTF-8");
                while (true) {
                    line = reader.readLine();
                    // NOTE(review): this stops at the first empty line as well as at
                    // end of file, so a blank line in the middle truncates the read.
                    if (Func.isEmpty(line)) {
                        break;
                    }
                    // Each line looks like:  id:1121  pieceId:ZPTE1213
                    String id = line.split("\\s+")[0].split(":")[1];
                    ids.add(id);
                    if (rowNumber % MAX_SAVE_NUM == 0) {
                        QueueUtil.QUEUE_IDS.put(ids);
                        // New list: the queued one now belongs to the consumer.
                        ids = Lists.newArrayList();
                    }
                    rowNumber++;
                }
            } catch (Exception e) {
                return "读取" + fileName + "文件失败原因:" + e.getMessage();
            } finally {
                try {
                    if (!ids.isEmpty()) {
                        QueueUtil.QUEUE_IDS.put(ids);
                    }
                } finally {
                    // Close the reader even if the put above throws.
                    IoKit.close(reader);
                }
            }
            long useTime = (System.currentTimeMillis() - beginTime) / 1000;
            return "读取" + fileName + "文件完毕,耗时:" + useTime + "秒 , 文件共:" + (rowNumber - 1) + "条数据";
        }
    }
    
    // QueueUtil
    /**
     * Holder of the shared in-memory queues that connect the migration stages.
     * Every queue is created with the no-argument constructor, i.e. without an
     * explicit capacity bound.
     */
    public class QueueUtil {

    /**
     * Queue 1: batches of ids read from the id files.
     */
    public static final LinkedBlockingQueue<List<String>> QUEUE_IDS = new LinkedBlockingQueue<List<String>>();

    /**
     * Queue 2: lists of JSON objects assembled from the id batches.
     */
    public static final  LinkedBlockingQueue<List<JSONObject>> QUEUE_JSON_OBJECTS = new LinkedBlockingQueue<List<JSONObject>>();


    /**
     * Queue 3: lists of ids that failed in any stage.
     */
    public static final LinkedBlockingQueue<List<String>> QUEUE_FAIL_IDS = new LinkedBlockingQueue<List<String>>();

    /**
     * Queue 4: validated data waiting to be saved to MySQL.
     */
    public static final LinkedBlockingQueue<SaveToMysqlDTO> QUEUE_SAVETOMYSQL = new LinkedBlockingQueue<SaveToMysqlDTO>();

    /**
     * Queue 5: validated documents waiting to be saved to MongoDB.
     */
    public static final LinkedBlockingQueue<List<Document>> QUEUE_SAVETOMONGO = new LinkedBlockingQueue<List<Document>>();


    /**
     * Queue 6: validated data, keyed by id, waiting to be saved to Elasticsearch.
     */
    public static final LinkedBlockingQueue<Map<Long,PieceJsonDTO>> QUEUE_SAVETOES = new LinkedBlockingQueue< Map<Long,PieceJsonDTO>>();
}

复制代码
    // PieceInfoDataMoveTask
    /**
     * Creates the pool used to run the file-reading tasks.
     *
     * @return a fixed-size pool sized by {@code MAX_THREAD_NUM}
     */
    protected ExecutorService createExecutorService() {
        return Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM));
    }
    
    // AbstractTaskTemplate
    /**
     * Runs all tasks on the given pool, waits for them to finish and collects
     * their results in task order. The pool is shut down before the results
     * are read, whether or not the invocation succeeded.
     *
     * <p>A task that ended with an exception contributes no entry to the
     * result; its stack trace is printed. If the calling thread is interrupted
     * while waiting, the interrupt flag is restored so callers can react to it
     * and an empty result is returned.
     *
     * @param executorService pool to execute on; always shut down by this method
     * @param tasks tasks to execute
     * @return results of the tasks that completed normally
     */
    private List<String> runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) {
        List<Future<String>> futures = new ArrayList<>();
        try {
            futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
        }
        List<String> results = new ArrayList<>(futures.size());
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return results;
    }
    
    // PieceInfoDataMoveTask
    /**
     * No-op in this task: the collected result lines are discarded.
     *
     * @param results result lines returned by the tasks (unused)
     */
    protected void writeLogFile(List results) {
    }

复制代码

2.2.2 消费进件id队列(QUEUE_IDS)

    //spring 配置文件加载bean 调用init() 方法
    <bean  class="com.migration.eagle.manager.PieceSearchManager" init-method="init" />
    /**
     * 启动线程
     */
    // PieceSearchManager
    /**
     * Starts the six consumer threads in a fixed order and logs after each
     * start. Each thread consumes one of the {@code QueueUtil} queues named in
     * its log message.
     */
    private void init() {
        // Assembles JsonObjects from id batches
        readQueueForAssembleJsonThread.start();
        log.info("线程1启动完毕,正在读取ids队列!!!!");
        // Validates the assembled data
        readQueueForValidateThread.start();
        log.info("线程2启动完毕,正在读取List<JsonObject>队列!!!!");
        // Saves to MySQL
        readQueueForSaveMysqlThread.start();
        log.info("线程3启动完毕,正在读取saveToMysql队列!!!!");
        // Saves to MongoDB
        readQueueForSaveMongoThread.start();
        log.info("线程4启动完毕,正在读取saveToMongo队列!!!!");
        // Saves to Elasticsearch
        readQueueForSaveESThread.start();
        log.info("线程5启动完毕,正在读取saveToES队列!!!!");
        // Writes the failure log
        readQueueForWriteLogThread.start();
        log.info("线程6启动完毕,正在读取失败ids队列!!!!");
    }   
复制代码
public class ReadQueueForAssembleJsonThread extends Thread {

    @Autowired
    PieceAssembleManager pieceAssembleManager;

    @Override
    public void run() {
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxAssembleJsonThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    //读取队列中的ids
                    List<String> ids = QueueUtil.QUEUE_IDS.take();
                    if (null != ids) {
                        long l1 = System.currentTimeMillis();
                        //组装数据
                        Map<String, List> pieceJson_map = pieceAssembleManager.getPieceJson(ids);
                        //存入队列
                        QueueUtil.QUEUE_JSON_OBJECTS.put(pieceJson_map.get("success"));
                        //存入失败ids队列
                        QueueUtil.QUEUE_FAIL_IDS.put(pieceJson_map.get("fail"));
                        long l2 = System.currentTimeMillis();
                        log.info("组装JSON耗时:" + (l2 - l1)," ,队列大小: "+QueueUtil.QUEUE_IDS.size());
                        ids=null;
                        pieceJson_map=null;
                    }
                } catch (Exception e) {
                    log.error("组装数据失败,失败原因: "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }


}
复制代码

2.2.3 消费JsonObject队列(QUEUE_JSON_OBJECTS)

public class ReadQueueForValidateThread extends Thread {

    @Autowired
    PieceSearchManager pieceSearchManager;


    @Override
    public void run() {
        //创建线程池
        int countThread = Integer.valueOf(new Config().getMaxValidateThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        List<JSONObject> jsonObjects = Lists.newArrayList();
        while (true) {
            try {
                //从队列中读取List<JsonObject>
                List<JSONObject> poll = QueueUtil.QUEUE_JSON_OBJECTS.take();
                if (null != poll) {
                    //创建任务
                    for (int i = 1; i <= poll.size(); i++) {
                        jsonObjects.add(poll.get(i - 1));
                        if (i % 100 == 0) {
                            MyTask myTask = new


    
 MyTask();
                            myTask.setJsonObjects(jsonObjects);
                            executorService.execute(myTask);
                            jsonObjects = Lists.newArrayList();
                        }
                    }
                    if (!jsonObjects.isEmpty()) {
                        MyTask myTask = new MyTask();
                        myTask.setJsonObjects(jsonObjects);
                        executorService.execute(myTask);
                        jsonObjects = Lists.newArrayList();
                    }
                    log.info("校验队列大小: "+QueueUtil.QUEUE_JSON_OBJECTS.size());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }


    }


    @Data
    class MyTask implements Runnable {
        private List<JSONObject> jsonObjects;

        @Override
        public void run() {
            long l = System.currentTimeMillis();
            SaveToMysqlDTO dto = new SaveToMysqlDTO();
            //mongo队列所需数据
            List<Document> toMongos = Lists.newArrayList();
            //es队列所需数据
            Map<Long,PieceJsonDTO>  id_PieceJsonDTO =Maps.newHashMap();

            List<InvestigatePieceEntity> investigatePieceDTOS = Lists.newArrayList();
            List<PieceDetailEntity> pieceDetailDTOS = Lists.newArrayList();
            List<PieceContactEntity> pieceContactDTOS = Lists.newArrayList();
            //失败结果
            List<String> failedJson = Lists.newArrayList();
            try {
                for (JSONObject finalObj : jsonObjects) {
                    Map<String, JSONObject> map = pieceSearchManager.validatePiece(finalObj);
                    if (map.containsKey("successData")) {
                        pieceSearchManager.savePiecesLast(map.get("successData"), investigatePieceDTOS, pieceContactDTOS, pieceDetailDTOS,id_PieceJsonDTO);
                        //增加id(存入mongo使用)
                        JsonObject jsonObject = JsonKit.parseJsonObject(finalObj.toString());
                        JsonElement appkey = JsonKit.getValueByPath(jsonObject, MongoPieceManagerImpl.PIECE_CODE_JSON_PATH);
                        jsonObject.add(MongoPieceManagerImpl.MONGO_ID, appkey);
                        toMongos.add(Document.parse(jsonObject.toString()));
                    } else {
                        failedJson.add(((Map) ((Map) map.get("failed").get("data")).get("applicant")).get("intpc_id").toString());
                    }
                    map=null;
                }
                dto.setInvestigatePieceDTOS(investigatePieceDTOS);
                dto.setPieceContactDTOS(pieceContactDTOS);
                dto.setPieceDetailDTOS(pieceDetailDTOS);

                //存入失败队列
                QueueUtil.QUEUE_FAIL_IDS.put(failedJson);
                //存入入库队列
                QueueUtil.QUEUE_SAVETOMYSQL.put(dto);
                QueueUtil.QUEUE_SAVETOMONGO.put(toMongos);
                QueueUtil.QUEUE_SAVETOES.put(id_PieceJsonDTO);

                long l1 = System.currentTimeMillis();
                log.info("校验耗时:" + (l1 - l));
                dto=null;
                toMongos=null;
                id_PieceJsonDTO=null;
                failedJson=null;
                jsonObjects=null;
            } catch (Exception e) {
                log.error("校验失败,失败原因:  "+e.getMessage());
                e.printStackTrace();
            }
        }
    }

}
复制代码

2.2.4 消费保存到数据库数据队列(QUEUE_SAVETOMYSQL)

public class ReadQueueForSaveMysqlThread extends Thread {

    @Autowired
    PieceSearchManager pieceSearchManager;

    @Override
    public void run() {
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxSaveMysqlThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    //从队列中读取校验过后的数据
                    SaveToMysqlDTO dto = QueueUtil.QUEUE_SAVETOMYSQL.take();
                    if (null != dto) {
                        //数据入库
                        long l = System.currentTimeMillis();
                        pieceSearchManager.savePieceToSql(dto);
                        long l1 = System.currentTimeMillis();
                        log.info("入库Mysql耗时:" + (l1 - l)+" , 队列大小: "+QueueUtil.QUEUE_SAVETOMYSQL.size());
                    }
                } catch (InterruptedException e) {
                    log.error("插入Mysql失败,失败原因:  "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }

}
复制代码

2.2.5 消费保存到mongoDB数据队列(QUEUE_SAVETOMONGO)

public class ReadQueueForSaveMongoThread extends Thread {

    @Autowired
    MongoUtil mongoUtil;

    private MongoCollection mongoCollection;

    @Override
    public void run() {
        //获取mongo 链接
        mongoCollection = mongoUtil.mongoClient();
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxSaveMongoThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        


    
public void run() {
            while (true) {
                try {
                    //从队列中读取校验过后的数据
                    List<Document> documents = QueueUtil.QUEUE_SAVETOMONGO.take();
                    if (null != documents) {
                        //数据入库
                        long l = System.currentTimeMillis();
                        mongoCollection.insertMany(documents);
                        long l1 = System.currentTimeMillis();
                        log.info("Mongo耗时:" + (l1 - l)+", 队列大小: "+ QueueUtil.QUEUE_SAVETOMONGO.size());
                    }
                } catch (Exception e) {
                    log.error("插入mongo失败,失败原因:  "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }

}

复制代码

2.2.6 消费保存到Elasticsearch数据队列(QUEUE_SAVETOES)

public class ReadQueueForSaveESThread extends Thread {

    @Autowired
    ElasticsearchManager elasticsearchManager;

    @Override
    public void run() {
        //创建线程池
        //任务数
        int countThread = Integer.valueOf(new Config().getMaxSaveESThreadNum());
        ExecutorService executorService = Executors.newFixedThreadPool(countThread);
        for (int i = 0; i < countThread; i++) {
            MyTask task = new MyTask();
            executorService.execute(task);
        }
        executorService.shutdown();
    }


    @Data
    class MyTask implements Runnable {

        @Override
        public void run() {
            BulkProcessor bulkProcessor = elasticsearchManager.createBulkProcessor();
            while (true) {
                try {
                    //从队列中读取校验过后的数据
                    Map<Long, PieceJsonDTO> map = QueueUtil.QUEUE_SAVETOES.take();
                    if (null != map) {
                        for (Long id : map.keySet()) {
                            //创建request  blukprocessor
                            PieceJsonDTO pieceJsonDTO = map.get(id);
                            String s = JsonKit.toJsonSerializeNulls(pieceJsonDTO);
                            bulkProcessor.add(elasticsearchManager.createPieceIndexRequest(Long.toString(id),s));
                            pieceJsonDTO=null;
                        }
                        map = null;
                        log.info("es队列大小:"+QueueUtil.QUEUE_SAVETOES.size());
                    }
                } catch (Exception e) {
                    log.error("插入es失败,失败原因:  "+e.getMessage());
                    e.printStackTrace();
                }
            }
        }
    }

}
复制代码

2.2.7 消费失败队列保存到文件中(QUEUE_FAIL_IDS)

public class ReadQueueForWriteLogThread extends Thread {

    /**
     * 文件根路径
     */
    private final static String FILE_ROOT_PATH = new Config().getFileRootPath();
    /**
     * 项目路径
     */
    private final static String ABSOLUTE_PATH = FileKit.getAbsolutePath("");


    @Override
    public void run() {
        //日志文件名称
        String fileName = "datamove.log";
        //文件路径
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;
        while (true) {
            try {
                List<String> fail_ids = QueueUtil.QUEUE_FAIL_IDS.take();
                if (null != fail_ids) {
                    ArrayList<String> result = Lists.newArrayList();
                    String s = "失败个数:" + fail_ids.size() + " , 失败id: " + fail_ids;
                    result.add(s);
                    //按行写出
                    FileKit.appendLines(result, filePath, "UTF-8");
                }
            } catch (Exception e) {
                log.error("写出失败ids失败,失败原因:  "+e.getMessage());
                e.printStackTrace();
            }
        }
    }
}
复制代码

2.3 失败数据处理

    /** HTTP GET entry point that starts failure reprocessing via {@code pieceSearchService.pieceLogFile()}. */
    @RequestMapping(value = "***", method = {RequestMethod.GET})
    @ResponseBody
    public void readLog() {
        pieceSearchService.pieceLogFile();
    }
    
    // PieceSearchServiceImpl
    /** Starts the failure-log step by delegating to {@code dataMoveManager.startReadLogForDb()}. */
    public void pieceLogFile() {
        dataMoveManager.startReadLogForDb();
    }
    
    // AbstractTemplate
    /** Public entry of the template: delegates to {@code readLogFileForDb()}. */
    public void startReadLogForDb() {
        readLogFileForDb();
    }
    
    // DataMoveManager
    /** Runs the failure-log job by calling {@code pieceInfoLogFileTask.start()}. */
    protected void readLogFileForDb() {
        pieceInfoLogFileTask.start();
    }
    
    // AbstractTaskTemplate
    /**
     * Template method driving one batch job: builds the tasks, creates the
     * pool, runs every task and hands the collected result lines to
     * {@code writeLogFile}. Tasks are built before the pool is created.
     */
    public void start() {
        final List<? extends Callable<String>> pendingTasks = createTask();
        final ExecutorService pool = createExecutorService();
        writeLogFile(runTaskForResult(pool, pendingTasks));
    }
    
    // PieceInfoLogFileTask
    /**
     * Reads the failed ids from {@code datamove.log} and groups them into
     * tasks.
     *
     * <p>When the log holds no ids an empty task list is returned. Previously
     * {@code null} was returned, which made the subsequent
     * {@code invokeAll} call fail with a NullPointerException. Note that the
     * job then continues into {@code writeLogFile} with no result lines.
     *
     * @return the tasks to execute; never {@code null}
     * @throws IllegalStateException if the log file cannot be read
     */
    protected List<? extends Callable<String>> createTask() {
        String fileName = "datamove.log";
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;

        // Collect the ids found after the "失败id:" marker of every line.
        List<String> ids = Lists.newArrayList();
        try {
            ids = readLines(new File(filePath), "UTF-8", ids);
        } catch (IOException e) {
            // Fail loudly with the cause instead of continuing without data.
            throw new IllegalStateException("Cannot read failure log " + filePath, e);
        }

        if (Func.isEmpty(ids)) {
            List<MyTask> noTasks = Lists.newArrayList();
            return noTasks;
        }
        return getTaskWithIds(ids);
    }
    
    // PieceInfoLogFileTask
    /**
     * Splits the ids into consecutive chunks of 100 and creates one task per
     * chunk. The last task receives whatever remains (between 0 and 100 ids),
     * so even an empty input yields exactly one task. Chunks are
     * {@code subList} views of the given list.
     *
     * @param ids failed ids to distribute
     * @return the tasks, in id order
     */
    private List<MyTask> getTaskWithIds(List<String> ids) {
        final int total = ids.size();
        ArrayList<MyTask> chunks = Lists.newArrayList();
        int from = 0;
        // Full chunks of exactly 100 ids; subList's upper bound is exclusive.
        for (; from + 100 < total; from += 100) {
            MyTask full = new MyTask();
            full.setIds(ids.subList(from, from + 100));
            chunks.add(full);
        }
        // Remaining ids go into the final task.
        MyTask last = new MyTask();
        last.setIds(ids.subList(from, total));
        chunks.add(last);
        return chunks;
    }
    
    /**
     * Task that walks over a chunk of failed ids in batches of
     * {@code MAX_SAVE_NUM}. The actual query-and-save step is still a
     * placeholder in this code.
     */
    private class MyTask implements Callable<String> {

        /** Failed ids assigned to this task. */
        private List<String> ids;

        /**
         * Iterates over the ids and flushes a batch every {@code MAX_SAVE_NUM}
         * ids. The trigger counts ids from 1; the previous zero-based check
         * fired right after the first id.
         *
         * @return a summary message with the elapsed seconds and the id count
         */
        @Override
        public String call() throws Exception {
            // Ids of the batch currently being collected.
            ArrayList<String> pieceIds = new ArrayList<>();
            long beginTime = System.currentTimeMillis();
            for (int i = 0; i < ids.size(); i++) {
                pieceIds.add(ids.get(i));
                if ((i + 1) % MAX_SAVE_NUM == 0) {
                    // TODO: query-and-save for this batch is not implemented;
                    // the batch is only printed.
                    System.out.println(pieceIds);
                    pieceIds.clear();
                }
            }
            if (!pieceIds.isEmpty()) {
                // TODO: query-and-save for the trailing partial batch is not implemented.

            }
            long useTime = (System.currentTimeMillis() - beginTime) / 1000;
            return "日志文件失败进件重新入库完毕,耗时:" + useTime + "秒 , 文件共:" + ids.size() + "条数据, 失败个数: , 失败id:";
        }
    }
    
    // PieceInfoLogFileTask
    /**
     * Creates the pool used to run the failure-log tasks.
     *
     * @return a fixed-size pool sized by {@code MAX_THREAD_NUM}
     */
    protected ExecutorService createExecutorService() {
        return Executors.newFixedThreadPool(Integer.valueOf(MAX_THREAD_NUM));
    }
    
    // AbstractTaskTemplate
    /**
     * Runs all tasks on the given pool, waits for them to finish and collects
     * their results in task order. The pool is shut down before the results
     * are read, whether or not the invocation succeeded.
     *
     * <p>A task that ended with an exception contributes no entry to the
     * result; its stack trace is printed. If the calling thread is interrupted
     * while waiting, the interrupt flag is restored so callers can react to it
     * and an empty result is returned.
     *
     * @param executorService pool to execute on; always shut down by this method
     * @param tasks tasks to execute
     * @return results of the tasks that completed normally
     */
    private List<String> runTaskForResult(ExecutorService executorService, List<? extends Callable<String>> tasks) {
        List<Future<String>> futures = new ArrayList<>();
        try {
            futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
        }
        List<String> results = new ArrayList<>(futures.size());
        for (Future<String> future : futures) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return results;
    }
    
    // PieceInfoLogFileTask
    /**
     * Writes the task result lines to {@code datamove.log} under
     * {@code ABSOLUTE_PATH + FILE_ROOT_PATH}, one entry per line, as UTF-8.
     * A write failure is logged (with its stack trace) and not rethrown.
     *
     * <p>NOTE(review): this targets the same file that the failed ids are read
     * from, and {@code writeLines} presumably replaces its content - confirm
     * that overwriting the failure log is intended.
     *
     * @param results lines to write
     */
    protected void writeLogFile(List results) {
        String fileName = "datamove.log";
        String filePath = ABSOLUTE_PATH + FILE_ROOT_PATH + fileName;
        try {
            FileKit.writeLines(results, filePath, "UTF-8");
        } catch (IOException e) {
            // The message used to name PieceInfoDataMoveTask (copy-paste); this
            // method belongs to PieceInfoLogFileTask.
            log.error("class:PieceInfoLogFileTask  method: writeLogFile()" +
                " 日志信息写出失败, 失败原因:" + e.getMessage(), e);
        }
    }

复制代码

2.4 数据迁移流程图

数据迁移流程图

3 Elasticsearch在综合搜索列表实现

/**
 * Translates the search form into an Elasticsearch bool query and returns one page
 * of matching pieces.
 *
 * <p>All clauses are combined with {@code must}. Non-empty free-text fields become
 * "contains" wildcard clauses on the {@code .keyword} sub-field, status-like fields
 * become exact term clauses, and the two time ranges become range clauses on epoch
 * milliseconds. When no product code is supplied, the query is restricted to the
 * product codes returned by {@code getProductCodeByCurrentUser()}. When
 * {@code ifInit} is true, an additional clause limits {@code enterCreditTime} to today.
 *
 * @param investigatepiecePageReqDTO search criteria and sort settings
 * @param pageNo                     1-based page number; values below 1 are treated as page 1
 * @param pageSize                   number of records per page
 * @return the requested page of results, as produced by {@code getResponseFromEs}
 */
public Page<InvestigatePiecePageResDTO> pagination(final InvestigatePiecePageReqDTO investigatepiecePageReqDTO, final Integer pageNo, final Integer pageSize) {
        final InvestigatePiecePageReqDTO req = investigatepiecePageReqDTO;
        // Compound query that collects every filter
        BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
        // Piece code
        if (Func.isNotEmpty(req.getPieceCode())) {
            mustContain(mustQuery, "pieceCode.keyword", req.getPieceCode());
        }
        // Mobile number
        if (Func.isNotEmpty(req.getMobile())) {
            mustContain(mustQuery, "pieceContacts.mobile.keyword", req.getMobile());
        }
        // ID card number
        if (Func.isNotEmpty(req.getIdCard())) {
            mustContain(mustQuery, "idCard.keyword", req.getIdCard());
        }
        // Service point
        if (Func.isNotEmpty(req.getServicePoint())) {
            mustContain(mustQuery, "servicePoint.keyword", req.getServicePoint());
        }
        // Landline number
        if (Func.isNotEmpty(req.getPhone())) {
            mustContain(mustQuery, "pieceContacts.phone.keyword", req.getPhone());
        }
        // Company name
        if (Func.isNotEmpty(req.getCompanyName())) {
            mustContain(mustQuery, "companyName.keyword", req.getCompanyName());
        }
        // Work order code
        if (Func.isNotEmpty(req.getOrderCode())) {
            mustContain(mustQuery, "orderCode.keyword", req.getOrderCode());
        }
        // Case code
        if (Func.isNotEmpty(req.getCaseCode())) {
            mustContain(mustQuery, "caseCode.keyword", req.getCaseCode());
        }
        // Rule search (matched against the rule name field)
        if (Func.isNotEmpty(req.getRuleCode())) {
            mustContain(mustQuery, "ruleName.keyword", req.getRuleCode());
        }
        // Customer manager
        if (Func.isNotEmpty(req.getCustomerManager())) {
            mustContain(mustQuery, "customerManager.keyword", req.getCustomerManager());
        }
        // Region the piece came from
        if (Func.isNotEmpty(req.getPieceFromArea())) {
            mustContain(mustQuery, "pieceFromArea.keyword", req.getPieceFromArea());
        }
        // Piece business status (exact match)
        if (Func.isNotEmpty(req.getBusinessStatus())) {
            mustQuery.must(QueryBuilders.termQuery("businessStatus.keyword", req.getBusinessStatus()));
        }
        // Product type (exact match)
        if (Func.isNotEmpty(req.getProductCode())) {
            mustQuery.must(QueryBuilders.termQuery("productName.keyword", req.getProductCode()));
        } else {
            // No explicit product: restrict to the product codes of the current user
            List<String> productCodes = getProductCodeByCurrentUser();
            mustQuery.must(QueryBuilders.termsQuery("productName.keyword", productCodes));
        }
        // Anti-fraud handler
        if (Func.isNotEmpty(req.getDecisionMaker())) {
            mustContain(mustQuery, "decisionMaker.keyword", req.getDecisionMaker());
        }
        // Company address
        if (Func.isNotEmpty(req.getCompanyAddress())) {
            mustContain(mustQuery, "companyAddress.keyword", req.getCompanyAddress());
        }
        // Fraud alarm level: the index stores the dictionary name, so translate the code first.
        // NOTE(review): a code missing from the dictionary yields null here and fails on getName();
        // same for the two dictionary lookups below - confirm codes are validated upstream.
        if (Func.isNotEmpty(req.getFraudAlarmLevelCode())) {
            DictionaryDTO dictionaryDTO = DictionaryKit.QZJGLB_MAP().get(req.getFraudAlarmLevelCode());
            mustQuery.must(QueryBuilders.termQuery("fraudAlarmLevelName.keyword", dictionaryDTO.getName()));
        }
        // Decision result
        if (Func.isNotEmpty(req.getResultCode())) {
            DictionaryDTO dictionaryDTO = DictionaryKit.JCJG_MAP().get(req.getResultCode());
            mustQuery.must(QueryBuilders.termQuery("decisionResult.keyword", dictionaryDTO.getName()));
        }
        // Anti-fraud handling status
        if (Func.isNotEmpty(req.getDecisionStatus())) {
            DictionaryDTO dictionaryDTO = DictionaryKit.JCZT_MAP().get(req.getDecisionStatus());
            mustQuery.must(QueryBuilders.termQuery("decisionStatus.keyword", dictionaryDTO.getName()));
        }
        // Customer service staff
        if (Func.isNotEmpty(req.getCustomerService())) {
            mustContain(mustQuery, "serviceUserName.keyword", req.getCustomerService());
        }
        // Customer name
        if (Func.isNotEmpty(req.getCustomerName())) {
            mustContain(mustQuery, "customerName.keyword", req.getCustomerName());
        }
        // Team manager
        if (Func.isNotEmpty(req.getTeamManager())) {
            mustContain(mustQuery, "teamManager.keyword", req.getTeamManager());
        }
        // Piece entry time
        mustBeInRange(mustQuery, "enterCreditTime", req.getEnterCreditTimeStart(), req.getEnterCreditTimeEnd());
        // Decision time
        mustBeInRange(mustQuery, "decisionTime", req.getDecisionTimeStart(), req.getDecisionTimeEnd());
        // First visit of the list: limit the entry time to today
        if (Boolean.valueOf(req.getIfInit())) {
            mustQuery.must(QueryBuilders.rangeQuery("enterCreditTime").from(getTodayStartTime()).to(getTodayEndTime()));
        }

        // Assemble query, sorting and paging
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(mustQuery);
        // Sort field: default is entry time, newest first; unknown keys fall back to entry time
        String sortField = "enterCreditTime";
        SortOrder sortOrder = SortOrder.DESC;
        if (Func.isNotEmpty(req.getSortChangeKey())) {
            sortOrder = "asc".equals(req.getSortChangeWay()) ? SortOrder.ASC : SortOrder.DESC;
            if (Func.equals(req.getSortChangeKey(), InvestigatePieceEntity.DB_COL_ENTER_CREDIT_TIME)) {
                sortField = "enterCreditTime";
            } else if (Func.equals(req.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_TIME)) {
                sortField = "decisionTime";
            } else if (Func.equals(req.getSortChangeKey(), InvestigatePieceEntity.DB_COL_DECISION_MAKER)) {
                sortField = "decisionMaker.keyword";
            }
        }
        source.sort(sortField, sortOrder);
        // Paging: Elasticsearch's "from" is the offset of the first document, not a page
        // index, so the 1-based page number has to be multiplied by the page size.
        source.from(Math.max(pageNo - 1, 0) * pageSize);
        source.size(pageSize);

        // Execute and convert the hits
        return getResponseFromEs(source);
    }

    /** Adds a must-clause matching documents whose {@code field} contains {@code value} (wildcard on both sides). */
    private void mustContain(BoolQueryBuilder query, String field, Object value) {
        query.must(QueryBuilders.wildcardQuery(field, "*" + value + "*"));
    }

    /**
     * Adds a must-clause restricting {@code field} (epoch milliseconds) to the given bounds;
     * either bound may be empty, and nothing is added when both are empty.
     * A bound that cannot be parsed leaves the filter out (best effort) and prints the stack trace.
     */
    private void mustBeInRange(BoolQueryBuilder query, String field, String start, String end) {
        // NOTE(review): SIMPLE_DATE_FORMAT looks like a shared formatter; if it is a
        // java.text.SimpleDateFormat it is not thread-safe - confirm.
        try {
            if (Func.isNotEmpty(start) && Func.isNotEmpty(end)) {
                query.must(QueryBuilders.rangeQuery(field).from(SIMPLE_DATE_FORMAT.parse(start).getTime()).to(SIMPLE_DATE_FORMAT.parse(end).getTime()));
            } else if (Func.isNotEmpty(start)) {
                query.must(QueryBuilders.rangeQuery(field).from(SIMPLE_DATE_FORMAT.parse(start).getTime()));
            } else if (Func.isNotEmpty(end)) {
                query.must(QueryBuilders.rangeQuery(field).to(SIMPLE_DATE_FORMAT.parse(end).getTime()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
复制代码
/**
 * Runs the prepared search against Elasticsearch and converts the hits into the
 * page object returned to the front end.
 *
 * <p>Only {@code records} and {@code total} of the page are populated here.
 *
 * @param source fully built search request (query, sort, from/size)
 * @return page holding one row per hit plus the total hit count
 * @throws ServiceException (code 500) when the search call fails; the original
 *         failure is attached as a suppressed exception
 */
private Page<InvestigatePiecePageResDTO> getResponseFromEs(SearchSourceBuilder source) {
        // Execute the search
        SearchHits hits;
        try {
            SearchResponse response = elasticsearchManager.getPieceResponseWithBuilder(source);
            hits = response.getHits();
        } catch (Exception e) {
            // No cause-accepting constructor is visible from here, so keep the original
            // failure as a suppressed exception; it still shows up in stack traces.
            ServiceException failure = new ServiceException(500, "es搜索引擎连接失败");
            failure.addSuppressed(e);
            throw failure;
        }
        // Total number of matching documents (not just this page)
        long totalHits = hits.getTotalHits();
        // Build one front-end row per hit
        List<InvestigatePiecePageResDTO> result = Lists.newArrayList();
        for (SearchHit hit : hits.getHits()) {
            Map<String, Object> doc = hit.getSourceAsMap();
            InvestigatePiecePageResDTO row = new InvestigatePiecePageResDTO();
            row.setId(Long.valueOf(hit.getId()));
            row.setPieceCode((String) doc.get("pieceCode"));
            row.setAppKey((String) doc.get("appKey"));
            // JSON numbers are deserialized as Integer or Long depending on magnitude,
            // so read them as Number instead of casting to one concrete boxed type.
            Number decisionId = (Number) doc.get("decisionId");
            if (null != decisionId) {
                row.setDecisionId(decisionId.longValue());
            }
            // orderCode is stored as a list; the row shows the first entry only
            @SuppressWarnings("unchecked")
            List<String> orderCodes = (List<String>) doc.get("orderCode");
            if (Func.isNotEmpty(orderCodes)) {
                row.setOrderCode(orderCodes.get(0));
            }
            row.setCustomerName((String) doc.get("customerName"));
            row.setIdCard((String) doc.get("idCard"));
            // Translate the stored product code into its display name; fall back to the
            // raw value when the dictionary has no entry instead of failing the whole page.
            Object productCode = doc.get("productName");
            DictionaryDTO product = DictionaryKit.CPLX_MAP().get(productCode);
            if (null != product) {
                row.setProductName(product.getName());
            } else if (null != productCode) {
                row.setProductName(productCode.toString());
            }
            Number enterCreditTime = (Number) doc.get("enterCreditTime");
            if (null != enterCreditTime) {
                row.setEnterCreditTime(SIMPLE_DATE_FORMAT.format(new Date(enterCreditTime.longValue())));
            }
            row.setBusinessStatus((String) doc.get("businessStatus"));
            Number decisionTime = (Number) doc.get("decisionTime");
            if (null != decisionTime) {
                row.setDecisionTime(SIMPLE_DATE_FORMAT.format(new Date(decisionTime.longValue())));
            }
            String decisionStatus = (String) doc.get("decisionStatus");
            // NOTE(review): the decision-result column is filled from "decisionStatus",
            // not from a "decisionResult" field - looks like a copy/paste slip; confirm.
            row.setDecisionResult(decisionStatus);
            row.setFraudAlarmLevelName((String) doc.get("fraudAlarmLevelName"));
            row.setDecisionMaker((String) doc.get("decisionMaker"));
            row.setServicePoint((String) doc.get("servicePoint"));
            // Rows are disabled unless the piece is already decided
            row.set_disabled(!"已决策".equals(decisionStatus));
            result.add(row);
        }

        // Assemble the page object for the front end
        Page<InvestigatePiecePageResDTO> page = new Page<>();
        page.setRecords(result);
        page.setTotal((int) totalHits);
        return page;
    }
复制代码

今天看啥 - 高品质阅读平台
本文地址:http://www.jintiankansha.me/t/XPe6I8tw2S
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/23302
 
254 次点击