【操作幂等和数据一致性】保障业务在MySQL和COS对象存储的一致

news/2025/2/23 23:03:32

业务场景

  1. 发布信息,更新到数据库MySQL
  2. COS操作,更新JSON文件

不过可能存在幂等性和数据一致性的问题。

java">// 批量存MySQL
entityPublishService.saveOrUpdateBatch(entityPublishList);
// 遍历批量存COS对象存储
   searchEntitys.forEach(req -> {
                    //删除旧的json文件
                    this.cosFileClient.deleteFile(Constant.ResourcePath.RESOURCE_PATH + req.getId());
                    ObjectMapper mapper = new ObjectMapper();
                    String jsonString = null;
                    try {
                        jsonString = mapper.writeValueAsString(req);
                        // 将 JSON 字符串转换为 InputStream 并上传
                        InputStream inputStream = new ByteArrayInputStream(jsonString.getBytes());
                        this.cosFileClient.uploadFile(Constant.ResourcePath.RESOURCE_PATH + req.getId(), inputStream);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }); 

数据一致性与幂等

  • 幂等性指的是无论操作执行多少次,结果都一致。
    比如,如果这段代码被重复执行,应该不会导致数据错误或重复上传。
  • 数据一致性则要确保数据库和COS中的JSON文件状态一致,
    比如更新数据库后,必须成功上传新的JSON,否则应该回滚或处理失败情况。

业务潜在问题

当前的代码在保存或更新数据库后,遍历entitys,删除旧的JSON,然后生成新的并上传。这里有几个潜在的问题:

  1. 数据库操作和文件操作之间没有事务管理,如果上传文件失败,数据库已经提交了事务,导致数据不一致。

  2. 删除旧文件后,如果上传新文件失败,会导致数据丢失。

  3. 如果方法被重复调用,可能导致多次上传或删除,不幂等。

寻找解决方案

  • 可能的解决方案是,将数据库的保存操作拆分为每个实体的单独操作,并在每个实体处理时,先上传文件,再更新数据库。这样,每个实体的处理是原子性的:上传文件成功,然后更新数据库。如果上传失败,数据库不更新。但这样会影响性能,因为批量操作变为单个操作。

  • 为了数据一致性,可能需要在上传失败时,提供某种回滚机制,比如删除已更新的数据库记录。但这在批量操作中较为困难。
    因此可能需要业务层面的处理,比如标记记录为待定状态,或者记录操作日志,用于后续的补偿

  • 正确的顺序应该是先保存数据库,再上传文件,这样即使上传失败,数据库已经更新,但文件未更新,其他系统可能无法获取最新数据,但至少数据库是正确的,可以触发后续的修复机制

后续修复或补偿机制,比如在RabbitMQ消费者中业务,配置重试机制。人工操作记录人工补偿机制

为了确保操作的幂等性和数据一致性,尤其是批量操作,代码改进:

  1. 移除不必要的旧文件删除操作:直接通过覆盖上传实现更新,避免删除后上传失败导致的数据丢失。
  2. 添加上传重试机制:提高上传成功率,减少因临时问题导致的失败。
  3. 异常处理和事务分离:确保数据库操作后尽可能完成文件上传,若失败则通过异常通知上层逻辑处理。
java">// 批量存储MySQL
entityPublishService.saveOrUpdateBatch(entityPublishList);

// 遍历批量对象存储COS
entitys.forEach(req -> {
    ObjectMapper mapper = new ObjectMapper();
    String jsonString;
    InputStream inputStream = null;
    final int maxRetries = 3;
    int retryCount = 0;
    boolean uploaded = false;
    
    while (retryCount < maxRetries && !uploaded) {
        try {
            // 序列化JSON数据
            jsonString = mapper.writeValueAsString(req);
            inputStream = new ByteArrayInputStream(jsonString.getBytes(StandardCharsets.UTF_8));
            
            // 幂等上传(覆盖模式)
            this.cosFileClient.uploadFile(
                Constant.ResourcePath.RESOURCE_PATH + req.getId(), 
                inputStream
            );
            uploaded = true; // 标记上传成功
            
        } catch (JsonProcessingException e) {
            // JSON序列化失败,无需重试
            throw new RuntimeException("JSON序列化失败,ID: " + req.getId(), e);
        } catch (Exception e) {
            retryCount++;
            if (retryCount >= maxRetries) {
                throw new RuntimeException("文件上传失败,超过最大重试次数,ID: " + req.getId(), e);
            }
            // 指数退避等待
            try {
                Thread.sleep(1000 * retryCount);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("上传操作被中断,ID: " + req.getId(), ie);
            }
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    // 日志记录关闭异常,但不影响主流程
                }
            }
        }
    }
    
    if (!uploaded) {
        // 此处可根据业务需要记录失败状态或触发补偿机制
        throw new RuntimeException("上传未完成,ID: " + req.getId());
    }
});

改进说明

  1. 幂等性保障

    • 覆盖上传:直接调用uploadFile覆盖同名文件,多次执行结果一致。
    • 数据库操作saveOrUpdateBatch本身应设计为幂等(如使用唯一约束或主键更新)。
  2. 数据一致性措施

    • 先持久化数据库:确保数据库更新后再同步文件,符合业务逻辑依赖。
    • 重试机制:通过最多3次重试(含退避等待),减少网络抖动等临时问题的影响。
    • 异常处理:上传失败时抛出异常,通知调用方处理(如日志记录、告警、人工干预或事务补偿)。
  3. 异常情况处理

    • 序列化错误:直接终止,无需重试(数据问题需修复)。
    • 上传失败:重试后仍失败则向上抛出,由系统统一处理(如标记数据状态、异步任务修复)。

注意事项

  • 若业务允许最终一致性,可考虑将上传失败记录至持久化队列,由后台任务异步重试。
  • 数据库设计可增加lastUpdateTime字段,确保重复提交时数据版本一致。
  • 监控上传失败异常,及时处理以保证系统健康度。

http://www.niftyadmin.cn/n/5863848.html

相关文章

国产编辑器EverEdit - 在编辑器中对文本进行排序

1 排序 1.1 应用场景 某些场景下用户需要对文本进行排序&#xff0c;比如&#xff1a;用户正在编辑函数列表&#xff0c;对函数列表按名称按字母A-Z排序。 1.2 使用方法 1.2.1 对选中文本进行排序 在编辑器中选中要排序的文本。选择主菜单工具 -> 排序 -> 升序排序 如…

Java数据结构_一篇文章搞定java对象的比较_7

1. PriorityQueue中插入对象 上篇文章研究了优先级队列&#xff0c;优先级队列在插入元素中&#xff0c;要求插入的元素不能是null或者元素之间必须要能够进行比较&#xff0c;为了简单起见&#xff0c;上篇文章只是插入了Integer类型&#xff0c;那优先级队列中是否能插入自定…

机器学习面试八股文——决战金三银四

大家好&#xff0c;这里是好评笔记&#xff0c;公主 号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本笔记的任务是解读机器学习实践/面试过程中可能会用到的知识点&#xff0c;内容通俗易懂&#xff0c;入门、实习和校招轻松搞定。 公主号合集地址 点击进入优惠地…

2025tg最新免费社工库机器人

情报局社工库 https://t.me/QingBaoJuXuanwubot?startNzExOTA0NzA2NA 小孩哥社工库 http://t.me/xiaohaigeSGK1_bot?startWGGVVrMgQiBslNE 冰墩墩个户机器人 t.me/bingdundung… 维基百科社工库 https://t.me/WikiSGKBot?start0b9d27c2e91b AI社工库 t.me/AI_SGKBOT?…

rpc到自己java实现rpc调用再到rpc框架设计

目录 rpc(Remote Procedure Call)rpc一般架构为什么要引入rpc自己实现rpc调用1. 新建一个maven项目&#xff0c;加入hessian依赖2. 服务端3. Stub代理4. 客户端测试输出5. rpc程序分析附 请求参数和序列化程序 6. 总结 回顾RPCRPC 序列化协议RPC 网络协议注册中心的引入dubbo框…

网络运维学习笔记 017 HCIA-Datacom综合实验01

文章目录 综合实验1实验需求总部特性 分支8分支9 配置一、 基本配置&#xff08;IP二层VLAN链路聚合&#xff09;ACC_SWSW-S1SW-S2SW-Ser1SW-CoreSW8SW9DHCPISPGW 二、 单臂路由GW 三、 vlanifSW8SW9 四、 OSPFSW8SW9GW 五、 DHCPDHCPGW 六、 NAT缺省路由GW 七、 HTTPGW 综合实…

使用 Grafana 监控 Spring Boot 应用

随着软件开发领域的不断发展&#xff0c;监控和可观测性已成为确保系统可靠性和性能的关键实践。Grafana 是一个功能强大的开源工具&#xff0c;能够为来自各种来源的监控数据提供丰富的可视化功能。在本篇博客中&#xff0c;我们将探讨如何将 Grafana 与 Spring Boot 应用程序…

LeetCode 热题 100 73. 矩阵置零

LeetCode 热题 100 | 73. 矩阵置零 大家好&#xff0c;今天我们来解决一道经典的算法题——矩阵置零。这道题在LeetCode上被标记为中等难度&#xff0c;要求我们将矩阵中为0的元素所在的行和列全部置为0。下面我将分别给出非原地算法和原地算法的Python代码实现&#xff0c;并…