多线程向设备发送数据

简介: 多线程向设备发送数据

需求:做一个部门授权,可以把所选择部门下面的所有人的人脸信息传到设备组里(多个设备),问题在于图片是通过Base64处理之后的,会导致文件名非常长,如果一次性传很多数据就会超过设备的最长请求长度,如果不用Base64处理的话让设备自己去minio下载就会导致特别慢,设备容易掉线,所以就用多线程发送。先看一下全部的代码,再看一下多线程的方法。

全部代码:

@Override
    public List<Long> createDeptAuthorize(DeptAuthorizeSaveReqVO createReqVO) {
        List<Long> ids = new ArrayList<>();
        List<Long> userIds = new ArrayList<>();
        //遍历部门表,查看该部门是否已有授权
        List<Long> deptAuthorizeExits = new ArrayList<>();
        List<Long> deptAuthorizeNotExits = new ArrayList<>();
        //设备数组
        List<BaseDeviceDo> baseDeviceDos = new ArrayList<>();
        //把待更新授权的人员id存在这个数组里
        List<Long> userUpdateIds = new ArrayList<>();
        //把待更新授权的人员存在这个数组里
        List<AdminUserRespDTO> userUpdateList = new ArrayList<>();
        // 设置用户信息
        List<User> userList = new ArrayList<>();
        for (Long l : createReqVO.getDeptId()) {
            QueryWrapper<DeptAuthorizeDO> queryWrapperDept = new QueryWrapper<>();
            queryWrapperDept.eq("dept_id", l);
            queryWrapperDept.eq("deleted", 0);
            DeptAuthorizeDO deptAuthorizeDO1 = deptAuthorizeMapper.selectOne(queryWrapperDept);
            if (deptAuthorizeDO1 != null) {
                //表里存在这条数据就代表已有部门授权,所以要修改部门授权
                deptAuthorizeExits.add(deptAuthorizeDO1.getDeptId());
            }else{
                deptAuthorizeNotExits.add(l);
            }
        }
        //先遍历不存在的部门授权,即新增授权并发到设备上
        for (Long l : deptAuthorizeNotExits) {
                // 插入
                DeptAuthorizeDO deptAuthorize = new DeptAuthorizeDO();
                deptAuthorize.setDeptId(l);
                deptAuthorize.setAuthorizeWay(createReqVO.getAuthorizeWay());
                deptAuthorize.setDoorId(createReqVO.getDoorId());
                deptAuthorize.setDoorGroupId(createReqVO.getDoorGroupId());
                deptAuthorize.setEffectMethod(createReqVO.getEffectMethod());
                deptAuthorize.setInPeriodId(createReqVO.getInPeriodId());
                deptAuthorize.setOutPeriodId(createReqVO.getOutPeriodId());
                deptAuthorizeMapper.insert(deptAuthorize);
                // 假设deptAuthorizeMapper.insert()返回插入记录的ID
                ids.add(deptAuthorize.getId());
                //查找这个部门下面有多少人
                List<AdminUserRespDTO> adminUserDOList = adminUserApi.getUserListByDeptId(l);
                for (AdminUserRespDTO adminUserDO : adminUserDOList) {
                    //遍历人员数组看看是否在授权生效表里
                    QueryWrapper queryWrapper = new QueryWrapper();
                    queryWrapper.eq("user_id", adminUserDO.getId());
                    queryWrapper.eq("deleted", 0);
                    AuthorizeEffectDO authorizeEffectDO = authorizeEffectMapper.selectOne(queryWrapper);
                    if (authorizeEffectDO == null) {
                        //如果不存在就插入这条数据
                        AuthorizeEffectDO authorizeEffectDO1 = new AuthorizeEffectDO();
                        authorizeEffectDO1.setUserId(adminUserDO.getId());
                        //0代表部门授权
                        authorizeEffectDO1.setAuthorizeEffectType(0);
                        //0代表非临时授权
                        authorizeEffectDO1.setIsTemporary(0);
                        authorizeEffectMapper.insert(authorizeEffectDO1);
                        userIds.add(adminUserDO.getId());
                        //插入授权日志(生效表里没有这条数据则代表这人没有授权所以肯定会授权成功)
                        AuthorizeLogDO authorizeLogDO = new AuthorizeLogDO();
                        //授权id
                        authorizeLogDO.setAuthorizeId(deptAuthorize.getId());
                        //授权类型(0:部门 1:特殊 2:个人 3:临时)
                        authorizeLogDO.setAuthorizeType(0);
                        //用户id
                        authorizeLogDO.setUserId(adminUserDO.getId());
                        //备注
                        authorizeLogDO.setRemark("创建部门授权成功");
                        //是否成功(0:成功 1:失败)
                        authorizeLogDO.setIsSuccess(0);
                        //插入
                        authorizeLogMapper.insert(authorizeLogDO);
                    } else {
                        //如果不存在这条数据则把生效设置为1
                        //临时>人>特殊>部门
                        if (authorizeEffectDO.getAuthorizeEffectType() != 1 && authorizeEffectDO.getAuthorizeEffectType() != 2) {
                            //0代表部门授权
                            authorizeEffectDO.setAuthorizeEffectType(0);
                            authorizeEffectMapper.updateById(authorizeEffectDO);
                            userIds.add(adminUserDO.getId());
                            //插入授权日志(生效表里没有这条数据则代表这人没有授权所以肯定会授权成功)
                            AuthorizeLogDO authorizeLogDO = new AuthorizeLogDO();
                            //授权id
                            authorizeLogDO.setAuthorizeId(deptAuthorize.getId());
                            //授权类型(0:部门 1:特殊 2:个人 3:临时)
                            authorizeLogDO.setAuthorizeType(0);
                            //用户id
                            authorizeLogDO.setUserId(adminUserDO.getId());
                            //备注
                            authorizeLogDO.setRemark("创建部门授权成功");
                            //是否成功(0:成功 1:失败)
                            authorizeLogDO.setIsSuccess(0);
                            //插入
                            authorizeLogMapper.insert(authorizeLogDO);
                        } else {
                            //记录在授权日志里
                            //插入授权日志(失败)
                            AuthorizeLogDO authorizeLogDO = new AuthorizeLogDO();
                            //授权id
                            authorizeLogDO.setAuthorizeId(deptAuthorize.getId());
                            //授权类型(0:部门 1:特殊 2:个人 3:临时)
                            authorizeLogDO.setAuthorizeType(0);
                            //用户id
                            authorizeLogDO.setUserId(adminUserDO.getId());
                            //备注
                            authorizeLogDO.setRemark("创建部门授权失败,已有更高授权");
                            //是否成功(0:成功 1:失败)
                            authorizeLogDO.setIsSuccess(1);
                            //插入
                            authorizeLogMapper.insert(authorizeLogDO);
                        }
                    }
                }
                List<AdminUserRespDTO> adminUserDOS =adminUserApi.getUserListByDeptId(l);
                for (AdminUserRespDTO adminUserDO : adminUserDOS) {
                    //更新这个部门下面人的权限
                    //用户列表
                    User user = new User();
                    AdminUserRespDTO user1 = adminUserApi.getUser(adminUserDO.getId());
                    user.setI(user1.getUserSn());
                    user.setN(user1.getNickname());
                    user.setU(user1.getUserSn());
                    user.setC("");
                    user.setB("");
                    user.setW(user1.getPassword());
                    user.setD(user1.getDeptName());
                    //出门规则
                    if (createReqVO.getAuthorizeWay()==0) {
                        //门授权,判断这个门下的设备是进门还是出门
                        List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(createReqVO.getDoorId());
                        for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                            if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                                //存在并且为进门(0进门 1出门)
//                                Long inPeriodId = accessPeriodMapper.selectById(createReqVO.getInPeriodId()).getParentRuleId();
                                user.setM(createReqVO.getInPeriodId().toString());
                            }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                                //存在并且为出门(0进门 1出门)
//                                AccessPeriodDO accessPeriodDO = accessPeriodMapper.selectById(createReqVO.getOutPeriodId());
////                                Long outPeriodId = accessPeriodMapper.selectById(createReqVO.getOutPeriodId()).getParentRuleId();
                                user.setM(createReqVO.getOutPeriodId().toString());
                            }
                        }
                    }else if (createReqVO.getAuthorizeWay() == 1){
                        //获取这个门组下面的门
                        QueryWrapper queryWrapper = new QueryWrapper();
                        queryWrapper.eq("group_id",createReqVO.getDoorGroupId());
                        queryWrapper.eq("deleted",0);
                        List<GroupDoorRelateDO> doorRelateDOS = doorRelateMapper.selectList(queryWrapper);
                        for (GroupDoorRelateDO doorRelateDO : doorRelateDOS) {
                            List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(doorRelateDO.getDoorId());
                            for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                                if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                                    //存在并且为进门(0进门 1出门)
                                    user.setM(createReqVO.getOutPeriodId().toString());
                                }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                                    //存在并且为出门(0进门 1出门)
                                    user.setM(createReqVO.getOutPeriodId().toString());
                                }
                            }
                        }
                    }
                    //照片
                    FaceImageRespDTO faceImageDO = faceImageApi.getFaceImage(user1.getId());
                    if (faceImageDO != null) {
                        if (faceImageDO.getImage() != null && !faceImageDO.getImage().isEmpty()) {
                            // minio路径转换为文件路径
                            String objectName = faceImageDO.getImage().split("/")[faceImageDO.getImage().split("/").length - 1];
                            // 从 MinIO 下载文件并转换为字节数组
                            byte[] fileBytes = minioToBase64.downloadFileFromMinio(objectName);
                            // 转换为 Base64 字符串
                            String base6String4 = minioToBase64.convertToBase64(fileBytes);
                            user.setF(base6String4);
                        }
                    }
                    //放到user列表里传给设备
                    userList.add(user);
                }
        }
        //遍历已有部门授权列表,即修改现有的部门授权
        for (Long deptAuthorizeExit : deptAuthorizeExits) {
            QueryWrapper queryWrapperDeptAuthorize = new QueryWrapper();
            queryWrapperDeptAuthorize.eq("dept_id",deptAuthorizeExit);
            queryWrapperDeptAuthorize.eq("deleted",0);
            //修改部门授权
            DeptAuthorizeDO deptAuthorizeDO = deptAuthorizeMapper.selectOne(queryWrapperDeptAuthorize);
            //更新部门授权列表的基础性徐
            deptAuthorizeDO.setAuthorizeWay(createReqVO.getAuthorizeWay());
            deptAuthorizeDO.setDoorId(createReqVO.getDoorId());
            deptAuthorizeDO.setDoorGroupId(createReqVO.getDoorGroupId());
            deptAuthorizeDO.setEffectMethod(createReqVO.getEffectMethod());
            deptAuthorizeDO.setInPeriodId(createReqVO.getInPeriodId());
            deptAuthorizeDO.setOutPeriodId(createReqVO.getOutPeriodId());
            deptAuthorizeDO.setUpdateTime(LocalDateTime.now());
            deptAuthorizeMapper.updateById(deptAuthorizeDO);
            //查找这个部门下面有多少人
            List<AdminUserRespDTO> adminUserDOList = adminUserApi.getUserListByDeptId(deptAuthorizeExit);
            for (AdminUserRespDTO adminUserDO : adminUserDOList) {
                //遍历人员数组看看是否在授权生效表里有高于部门授权的授权,如果有则不授权
                QueryWrapper queryWrapper = new QueryWrapper();
                queryWrapper.eq("user_id", adminUserDO.getId());
                //目前生效类型为部门授权
                queryWrapper.eq("authorize_effect_type",0);
                queryWrapper.eq("deleted",0);
                AuthorizeEffectDO authorizeEffectDO = authorizeEffectMapper.selectOne(queryWrapper);
                if (authorizeEffectDO != null) {
                    //把符合规定的人插入到待更新的userId数组中
                    userUpdateIds.add(authorizeEffectDO.getUserId());
                    userUpdateList.add(adminUserApi.getUser(authorizeEffectDO.getUserId()));
                }
            }
            for (AdminUserRespDTO adminUserRespDTO : userUpdateList) {
                //更新这个部门下面人的权限
                //用户列表
                User user = new User();
                AdminUserRespDTO user1 = adminUserApi.getUser(adminUserRespDTO.getId());
                user.setI(user1.getUserSn());
                user.setN(user1.getNickname());
                user.setU(user1.getUserSn());
                user.setC("");
                user.setB("");
                user.setW(user1.getPassword());
                user.setD(user1.getDeptName());
                //出门规则
                if (createReqVO.getAuthorizeWay()==0) {
                    //门授权,判断这个门下的设备是进门还是出门
                    List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(createReqVO.getDoorId());
                    for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                        if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                            //存在并且为进门(0进门 1出门)
//                                Long inPeriodId = accessPeriodMapper.selectById(createReqVO.getInPeriodId()).getParentRuleId();
                            user.setM(createReqVO.getInPeriodId().toString());
                        }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                            //存在并且为出门(0进门 1出门)
//                                AccessPeriodDO accessPeriodDO = accessPeriodMapper.selectById(createReqVO.getOutPeriodId());
////                                Long outPeriodId = accessPeriodMapper.selectById(createReqVO.getOutPeriodId()).getParentRuleId();
                            user.setM(createReqVO.getOutPeriodId().toString());
                        }
                    }
                }else if (createReqVO.getAuthorizeWay() == 1){
                    //获取这个门组下面的门
                    QueryWrapper queryWrapper = new QueryWrapper();
                    queryWrapper.eq("group_id",createReqVO.getDoorGroupId());
                    queryWrapper.eq("deleted",0);
                    List<GroupDoorRelateDO> doorRelateDOS = doorRelateMapper.selectList(queryWrapper);
                    for (GroupDoorRelateDO doorRelateDO : doorRelateDOS) {
                        List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(doorRelateDO.getDoorId());
                        for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                            if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                                //存在并且为进门(0进门 1出门)
                                user.setM(createReqVO.getOutPeriodId().toString());
                            }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                                //存在并且为出门(0进门 1出门)
                                user.setM(createReqVO.getOutPeriodId().toString());
                            }
                        }
                    }
                }
                //照片
                FaceImageRespDTO faceImageDO = faceImageApi.getFaceImage(user1.getId());
                if (faceImageDO != null) {
                    if (faceImageDO.getImage() != null && !faceImageDO.getImage().isEmpty()) {
                        // minio路径转换为文件路径
                        String objectName = faceImageDO.getImage().split("/")[faceImageDO.getImage().split("/").length - 1];
                        // 从 MinIO 下载文件并转换为字节数组
                        byte[] fileBytes = minioToBase64.downloadFileFromMinio(objectName);
                        // 转换为 Base64 字符串
                        String base6String4 = minioToBase64.convertToBase64(fileBytes);
                        user.setF(base6String4);
                    }
                }
                //放到user列表里传给设备
                userList.add(user);
            }
        }
        //判断相关联的门找到要传输的设备
        if (createReqVO.getAuthorizeWay() == 0){
            //门授权
            //获得这个门下面的所有设备
            List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(createReqVO.getDoorId());
            for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                OnsiteEquipDO onsiteEquipDO = onsiteEquipMapper.selectById(doorDeviceRelateRespDTO.getDeviceId());
                //插入设备列表
                BaseDeviceDo baseDeviceDo = new BaseDeviceDo();
                baseDeviceDo.setDeviceType(onsiteEquipDO.getDeviceModel());
                baseDeviceDo.setIpAddr(onsiteEquipDO.getDeviceIp());
                baseDeviceDo.setPassword(onsiteEquipDO.getDevicePassword());
                baseDeviceDos.add(baseDeviceDo);
            }
        }else if (createReqVO.getAuthorizeWay() == 1){
            //门组授权
            //获得门组下的所有设备
            List<OnsiteEquipDO> onsiteEquipDOS = groupDoorRelateService.listByAccessGroupTableId(createReqVO.getDoorGroupId());
            onsiteEquipDOS.stream().forEach(onsiteEquipDO -> {
                BaseDeviceDo baseDeviceDo = new BaseDeviceDo();
                baseDeviceDo.setDeviceType(onsiteEquipDO.getDeviceModel());
                baseDeviceDo.setIpAddr(onsiteEquipDO.getDeviceIp());
                baseDeviceDo.setPassword(onsiteEquipDO.getDevicePassword());
                baseDeviceDos.add(baseDeviceDo);
            });
        }
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 分批处理用户列表
        int batchSize = 5;
        int totalTasks = (userList.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < totalTasks; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, userList.size());
            List<User> usersBatch = userList.subList(start, end);
            executorService.submit(() -> {
                control.setDeviceUser(baseDeviceDos, usersBatch);
            });
        }
        // 关闭线程池
        executorService.shutdown();
        try {
            // 等待所有任务完成
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ids;
    }

image.gif

多线程方法:

// 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 分批处理用户列表
        int batchSize = 5;
        int totalTasks = (userList.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < totalTasks; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, userList.size());
            List<User> usersBatch = userList.subList(start, end);
            executorService.submit(() -> {
                control.setDeviceUser(baseDeviceDos, usersBatch);
            });
        }
        // 关闭线程池
        executorService.shutdown();
        try {
            // 等待所有任务完成
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

image.gif

1. 创建线程池

ExecutorService executorService = Executors.newFixedThreadPool(3);

  • 使用 Executors.newFixedThreadPool(3) 创建了一个固定大小为 3 的线程池。
  • 线程池的作用是管理线程的生命周期,避免频繁创建和销毁线程带来的性能开销。
  • 在这个线程池中,最多可以同时运行 3 个线程。

每次运行三个线程可以解决掉请求头过长的问题。

2. 计算分批处理的批次数量

int batchSize = 5;

int totalTasks = (userList.size() + batchSize - 1) / batchSize;

  • batchSize 定义了每一批次处理的用户数量,这里设置为 5。
  • totalTasks 计算总共需要处理的批次数量。通过公式 (userList.size() + batchSize - 1) / batchSize,确保即使用户数量不能被 batchSize 整除,也能正确计算出需要的批次数量。

3. 分批处理用户列表

for (int i = 0; i < totalTasks; i++) {

   int start = i * batchSize;

   int end = Math.min(start + batchSize, userList.size());

   List<User> usersBatch = userList.subList(start, end);

   executorService.submit(() -> {

       control.setDeviceUser(baseDeviceDos, usersBatch);

   });

}

  • 循环逻辑
  • 外层循环 for (int i = 0; i < totalTasks; i++) 遍历所有批次。
  • 计算每一批次的范围
  • int start = i * batchSize; 计算当前批次的起始索引。
  • int end = Math.min(start + batchSize, userList.size()); 计算当前批次的结束索引,确保不会超出 userList 的范围。
  • 提取当前批次的用户子列表
  • List<User> usersBatch = userList.subList(start, end); 使用 subList 方法从 userList 中提取当前批次的用户子列表。
  • 提交任务到线程池
  • executorService.submit(() -> { control.setDeviceUser(baseDeviceDos, usersBatch); }); 将任务提交到线程池中执行。
  • 每个任务调用 control.setDeviceUser(baseDeviceDos, usersBatch) 方法,处理当前批次的用户。

4. 关闭线程池并等待所有任务完成

executorService.shutdown();

try {

   executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

} catch (InterruptedException e) {

   e.printStackTrace();

}

  • 关闭线程池
  • executorService.shutdown(); 调用 shutdown() 方法,表示不再接受新的任务,但会等待已经提交的任务完成。
  • 等待所有任务完成
  • executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 调用 awaitTermination 方法,等待线程池中的所有任务完成。
  • 这里使用 Long.MAX_VALUETimeUnit.NANOSECONDS,表示等待时间非常长,几乎等同于无限等待。
  • 异常处理
  • 如果线程被中断,会抛出 InterruptedException,捕获并打印堆栈信息。

这样就可以实现多线程向设备发送数据了。

另:附上设备发送数据的方法,也是多线程,可以参考。

/**
     * 设置设备用户(正式员工、常驻)
     */
    public List<ReturnMessage> setDeviceUser(List<BaseDeviceDo> baseDeviceDos, List<User> list) {
        List<ReturnMessage> failureMessages = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(baseDeviceDos.size());
        List<Runnable> tasks = new ArrayList<>();
        for (BaseDeviceDo baseDeviceDo : baseDeviceDos) {
            tasks.add(() -> {
                ReturnMessage returnMessage = null;
                try {
                    switch (baseDeviceDo.getDeviceType()) {
                        case M7:
                            returnMessage = m7Control.setDeviceUser(baseDeviceDo, list);
                    }
                    if (returnMessage != null && !returnMessage.getCode().equals("0")) {
                        failureMessages.add(returnMessage);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // 将异常信息封装到 ReturnMessage 中,或者创建一个新的 ReturnMessage 来表示失败
                    returnMessage.setCode("50100");
                    returnMessage.setMsg("处理设备类型 " + baseDeviceDo.getDeviceType() + "设备IP为:" + baseDeviceDo.getIpAddr() + " 时发生异常: " + e.getMessage());
                    failureMessages.add(returnMessage);
                }
            });
        }
        tasks.forEach(executor::submit);
        executor.shutdown();
        while (!executor.isTerminated()) {
            // 等待所有线程完成
        }
        return failureMessages;
    }

image.gif


目录
相关文章
|
5月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
消息中间件 监控 安全
服务Down机了,线程池中的数据如何保证不丢失?
在分布式系统与高并发应用开发中,服务的稳定性和数据的持久性是两个至关重要的考量点。当服务遭遇Down机时,如何确保线程池中处理的数据不丢失,是每一位开发者都需要深入思考的问题。以下,我将从几个关键方面分享如何在这种情况下保障数据的安全与完整性。
303 2
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
324 62
|
10月前
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
9月前
|
数据采集 存储 安全
Python爬虫实战:利用短效代理IP爬取京东母婴纸尿裤数据,多线程池并行处理方案详解
本文分享了一套结合青果网络短效代理IP和多线程池技术的电商数据爬取方案,针对京东母婴纸尿裤类目商品信息进行高效采集。通过动态代理IP规避访问限制,利用多线程提升抓取效率,同时确保数据采集的安全性和合法性。方案详细介绍了爬虫开发步骤、网页结构分析及代码实现,适用于大规模电商数据采集场景。
|
10月前
|
缓存 安全 Java
面试中的难题:线程异步执行后如何共享数据?
本文通过一个面试故事,详细讲解了Java中线程内部开启异步操作后如何安全地共享数据。介绍了异步操作的基本概念及常见实现方式(如CompletableFuture、ExecutorService),并重点探讨了volatile关键字、CountDownLatch和CompletableFuture等工具在线程间数据共享中的应用,帮助读者理解线程安全和内存可见性问题。通过这些方法,可以有效解决多线程环境下的数据共享挑战,提升编程效率和代码健壮性。
360 6
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
354 57
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
1010 0
|
消息中间件 存储 Java
服务重启了,如何保证线程池中的数据不丢失?
【8月更文挑战第30天】为确保服务重启时线程池数据不丢失,可采用数据持久化(如数据库或文件存储)、使用可靠的任务队列(如消息队列或分布式任务队列系统)、状态监测与恢复机制,以及分布式锁等方式。这些方法能有效提高系统稳定性和可靠性,需根据具体需求选择合适方案并进行测试优化。
896 5

热门文章

最新文章