site logo

Marico's space

我在 Java 中重建了三层缓存系统 — Redis、L1 和 MongoDB 的正确实现

服务器技术 2026-04-26 11:50:45 11
我一直在做 **Nexus**,这是一个后端基础设施项目,最近数据同步层需要认真重新设计。原本看似正常工作的缓存系统,实际上存在层次结构混乱、静默数据丢失、竞态条件和潜在死锁等问题。 这篇文章记录了原始代码中的每个问题以及我具体的修复方法。全程包含代码对比。 --- ## 架构:我们正在构建什么 系统管理三层数据: ``` ┌─────────────────────────────────┐ │ Redis Cache (MASTER) │ ← single source of truth └────────────┬──────────┬─────────┘ │ │ pull 10s flush 15s │ │ ┌────────────▼──┐ ┌────▼──────────────┐ │ L1 Cache │ │ MongoDB │ │ (in-memory) │ │ (persistent DB) │ └───────────────┘ └────────────────────┘ ``` 规则很简单: - **Redis 永远是主库。** 其他层不能覆盖它。 - **L1** 是 Redis 的内存镜像。它从 Redis 读取,永远不会反向操作。 - **MongoDB** 是持久化备份。Redis 写入它——而不是反过来。 三个定时任务保持同步: | 任务 | 间隔 | 职责 | |------|------|------| | L1 同步 | 10s | 从 Redis 拉取 → 有变化则更新 L1 | | 自动刷新 | 15s | 将脏键从 Redis 推送到 MongoDB | | 数据对账 | 3 min | 比较 Redis ↔ MongoDB,Redis 获胜 | 现在让我们看看原始实现到底哪里出了问题。 --- ## 问题 1:层次结构是反的 这是最根本的问题。对账任务本应强制 Redis 为主——但实际上在做相反的事。 ### 原始代码 ```java private void startReconciliationTask() { redisManager.processTask(() -> { idToDataList.forEach((keyTag, model) -> { if (dirtyKeys.contains(keyTag)) return; NexusApplication.getApplication() .getMongoManager() .getValue(model.getAddon(), model.getSpecificDbKey()) .thenAccept(dbJson -> { if (dbJson == null) return; try { String cleanDbJson = model.getAddon().modelInitComp(dbJson); if (!cleanDbJson.equals(model.getValueJson())) { model.setValueJson(cleanDbJson); // ← 用 Mongo 覆盖 L1 redisManager.setData(keyTag, cleanDbJson); // ← 用 Mongo 覆盖 Redis! } } catch (Exception e) { e.printStackTrace(); } }); }); }); } ``` 当 MongoDB 的值与 Redis 不同时,它把 MongoDB 的值**写入 Redis**。MongoDB 实际上扮演了主库的角色。整个优先级链被颠倒了。 ### 修复方法 ```java private void startReconciliationTask() { RedisManager rm = NexusApplication.getApplication().getRedisManager(); rm.processTask(() -> { // ... // 首先从 Redis 获取主值 String redisJson = rm.getData(key).orElseGet(model::getValueJson); NexusApplication.getApplication() .getMongoManager() .getValue(model.getAddon(), model.getSpecificDbKey()) .thenAccept(dbJson -> { // ... String cleanDbJson = model.getAddon().modelInitComp(dbJson); // Redis 获胜。如果 Mongo 不同,则更新 Mongo——而不是 Redis。 if (!cleanDbJson.equals(redisJson)) { NexusApplication.getApplication().getMongoManager() .setValue(model.getAddon(), model.getSpecificDbKey(), redisJson); model.setValueJson(redisJson); // L1 跟随 Redis } }); }); } ``` Redis 被首先获取并作为事实标准。当存在不匹配时,MongoDB 被修正——而不是 Redis。 --- ## 问题 2:刷新失败时静默丢失数据 自动刷新任务在确认 MongoDB 写入成功**之前**就移除了脏标记。 ### 原始代码 ```java private void startAutoSyncTask() { List keysToSync = new ArrayList<>(dirtyKeys); dirtyKeys.removeAll(keysToSync); // ← 在写入 Mongo 之前就移除了 redisManager.processTask(() -> { for (String key : keysToSync) { DataModel model = idToDataList.get(key); if (model != null) { NexusApplication.getApplication().getMongoManager() .setValue(model.getAddon(), model.getSpecificDbKey(), model.getValueJson()); // 如果这里失败,键已经从 dirtyKeys 中移除 // 它永远不会被重试 } } }); } ``` 如果 MongoDB 写入抛出异常或 future 以异常完成,脏标记已经被移除。该条目永远不会被重试。数据静默丢失了。 ### 修复方法 ```java private void startAutoFlushTask() { List keysToFlush = new ArrayList<>(dirtyKeys); // snapshot — 不要立即 removeAll rm.processTask(() -> { for (String key : keysToFlush) { DataModel model = keyToModel.get(key); if (model == null) { dirtyKeys.remove(key); continue; } String jsonToWrite = rm.getData(key).orElseGet(model::getValueJson); try { NexusApplication.getApplication().getMongoManager() .setValue(model.getAddon(), model.getSpecificDbKey(), jsonToWrite) .get(); // 阻塞直到 Mongo 确认写入 dirtyKeys.remove(key); // 只在确认成功后才移除 } catch (Exception e) { // 保持键为脏——下一次刷新时会重试 LOGGER.log(Level.SEVERE, "[AutoFlush] Write failed, will retry: " + key, e); } } }); } ``` 这里有两个改变:脏标记快照被获取但**不会立即清除**,`dirtyKeys.remove(key)` 只在 `.get()` 确认写入成功后才执行。如果出问题,键保持脏状态,15 秒后重试。 --- ## 问题 3:`removeModel()` 中的 TOCTOU 竞态条件 TOCTOU 代表 **Time-of-Check to Time-of-Use**(检查时与使用时的时间差)。原始的 `removeModel()` 在移除前检查键是否存在——但另一个线程可能在这两个操作之间删除了它。 ### 原始代码 ```java public void removeModel(String key) { if (idToDataList.containsKey(key)) { // 线程 A 检查:键存在 // --- 线程 B 在这里删除了键 --- dirtyKeys.remove(key); idToDataList.remove(key); // 线程 A 移除:但键已经不存在了 NexusApplication.getApplication().getRedisManager().deleteData(key); } } ``` 在并发环境中,这是不安全的。`containsKey` 检查和 `remove` 调用是两个单独的操作,它们之间没有原子性保证。 ### 修复方法 ```java public void removeModel(String key) { DataModel removed = keyToModel.remove(key); // 原子操作:检查 + 移除一步完成 if (removed == null) return; // 不存在——无需操作 idToKey.remove(removed.getId()); dirtyKeys.remove(key); NexusApplication.getApplication().getRedisManager().deleteData(key); } ``` `ConcurrentHashMap.remove()` 是原子操作。它的返回值告诉你是否实际移除了任何东西。一次操作,无竞态。 --- ## 问题 4:对账中的死锁风险 原始的对账任务在已运行的 `processTask` **内部**分派了新的 `processTask`。 ### 原始代码 ```java redisManager.processTask(() -> { idToDataList.forEach((keyTag, model) -> { // ... NexusApplication.getApplication().getMongoManager().getValue(...) .thenAccept(dbJson -> { // ... redisManager.processTask(() -> // ← 从运行中的任务内部分派新任务 NexusApplication.getApplication().getMongoManager() .setValue(...)); }); }); }); ``` 如果 `processTask` 使用单线程执行器(这对于 Redis 客户端确保命令顺序很常见),从运行中的任务内部提交新任务意味着内部任务永远无法启动——外部任务阻塞了唯一可用的线程。这就是死锁。 ### 修复方法 所有操作都在一个 `processTask` 上下文中运行。`thenAccept` 内部的 MongoDB 写入是普通的异步调用——没有嵌套的 `processTask`。 ```java rm.processTask(() -> { for (String key : keys) { // ... CompletableFuture future = NexusApplication.getApplication() .getMongoManager() .getValue(...) .thenAccept(dbJson -> { // ... // 直接写入 Mongo——无嵌套 processTask NexusApplication.getApplication().getMongoManager() .setValue(...); }); batch.add(future); if (batch.size() >= RECONCILE_BATCH_SIZE) { waitForBatch(batch); batch.clear(); } } }); ``` 所有对账工作都在一个任务中完成。没有任务分派另一个任务。 --- ## 问题 5:大型数据集上的 MongoDB 请求风暴 原始的对账同时触发每个条目的 MongoDB 查询——没有节流,没有批处理。 在 1000+ 条目的情况下,这意味着 1000+ 个并发 MongoDB 读取,紧接着可能是 1000+ 个写入。这会耗尽连接池、导致延迟飙升,并在负载下引发级联故障。 ### 修复方法:批处理 ```java private static final int RECONCILE_BATCH_SIZE = 50; // 在 startReconciliationTask 内部: List> batch = new ArrayList<>(RECONCILE_BATCH_SIZE); for (String key : keys) { if (dirtyKeys.contains(key)) continue; // ... CompletableFuture future = mongoManager.getValue(...).thenAccept(...); batch.add(future); if (batch.size() >= RECONCILE_BATCH_SIZE) { waitForBatch(batch); // 等待所有 50 个完成 batch.clear(); // 然后开始下一个 50 } } if (!batch.isEmpty()) waitForBatch(batch); ``` ```java private void waitForBatch(List> batch) { try { CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).get(); } catch (Exception e) { LOGGER.log(Level.WARNING, "[Reconciliation] Batch wait error", e); } } ``` 任何时候最多有 50 个并发 MongoDB 请求。下一批只在前一批完成后才开始。这很容易调整——如果你的 MongoDB 能处理更多并发,可以增大 `RECONCILE_BATCH_SIZE`;在受限环境中可以降低。 --- ## 问题 6:Redis 淘汰/TTL 被静默忽略 Redis 可以在内存压力下或 TTL 到期时淘汰键。原始的 L1 同步在检测到键缺失时什么都不做。 ### 原始代码 ```java redisManager.getData(key).ifPresent(redisJson -> { if (!redisJson.equals(model.getValueJson())) { model.setValueJson(redisJson); } // 如果 getData() 返回空——我们静默跳过 // L1 现在与被淘汰的 Redis 键不同步 }); ``` 当键被从 Redis 淘汰时,L1 无限期地保留其过期值。下一次刷新会尝试从 Redis 读取,得到空值,然后回退到 L1 的过期数据并写入 MongoDB——可能丢失被淘汰前 Redis 中的更新值。 ### 修复方法 ```java Optional redisOpt = rm.getData(key); if (redisOpt.isPresent()) { String redisJson = redisOpt.get(); if (!redisJson.equals(model.getValueJson())) { model.setValueJson(redisJson); // L1 跟随 Redis } } else { // 键已从 Redis 淘汰——从 L1 恢复并重新标记为脏 LOGGER.warning("[L1Sync] Redis key missing, restoring: " + key); rm.setData(key, model.getValueJson()); // 从 L1 恢复 Redis dirtyKeys.add(key); // 触发 Mongo 重新写入 } ``` 当 Redis 没有键时,L1 的值被推回 Redis(恢复主库),键被标记为脏,以便刷新任务重新持久化到 MongoDB。 --- ## 问题 7:`addModelFix()` 导致 Redis 为空 这个方法本应处理外部提供的数据,但它只写入 L1——让 Redis 没有该键。 ### 原始代码 ```java public void addModelFix(String key, DataModel model) { idToDataList.put(key, model); // 只写入 L1 dirtyKeys.add(key); // Redis 没有这个键的条目 // 下一次 L1 同步会检测到 Redis 键缺失,从 L1 恢复它, // 并重新标记为脏——不必要的额外周期 } ``` ### 修复方法 ```java public void addModelFix(String key, DataModel model) { writeToL1AndRedis(key, model); // 同时写入 L1 和 Redis dirtyKeys.add(key); } ``` 所有写路径现在都通过相同的内部方法,确保两层始终一起更新。 --- ## 问题 8:O(n) 的 ID 查询 `getDataModelFromId()` 方法每次调用都遍历整个 map。 ### 原始代码 ```java public Optional getDataModelFromId(String id) { return idToDataList.values().stream() .filter(dm -> dm.getId().equals(id)) .findAny(); // O(n) — 扫描整个 map } ``` 在 1000 个条目时每次查询需要 1000 次比较。如果这个方法被频繁调用(比如游戏服务器中每个玩家操作都会调用),性能会迅速恶化。 ### 修复方法:反向索引 第二个 `ConcurrentHashMap` 维护 `id → key` 映射,与主 map 同步更新。 ```java // 新字段 private final ConcurrentHashMap idToKey; // 每次写入时更新 private void writeToL1AndRedis(String key, DataModel model) { keyToModel.put(key, model); idToKey.put(model.getId(), key); // 维护反向索引 NexusApplication.getApplication().getRedisManager().setData(key, model.getValueJson()); } // 移除时也要更新 public void removeModel(String key) { DataModel removed = keyToModel.remove(key); if (removed == null) return; idToKey.remove(removed.getId()); // 保持反向索引清洁 // ... } // 查询现在是 O(1) public Optional getDataModelFromId(String id) { String key = idToKey.get(id); if (key == null) return Optional.empty(); return Optional.ofNullable(keyToModel.get(key)); } ``` 两次 map 查询而不是全表扫描。内存成本很小——只是一个字符串的第二个 map。 --- ## 最终总结 以下是所做的每个更改的总结: | # | 问题 | 影响 | 修复 | |---|------|------|------| | 1 | 层次反转——Mongo 覆盖 Redis | 数据错误 | Redis 优先获取;Mongo 更新以匹配 Redis | | 2 | Mongo 写入确认前移除脏标记 | 静默数据丢失 | `dirtyKeys.remove()` 只在 `.get()` 成功后调用 | | 3 | `removeModel()` 中的 TOCTOU 竞态 | 潜在 NPE / 双重删除 | 使用返回值的单次原子 `remove()` | | 4 | 对账中嵌套 `processTask` | 单线程执行器死锁 | 所有工作在单一任务上下文中,无内部分派 | | 5 | 对账中所有条目同时查询 | MongoDB 连接风暴 | 批处理(每次 50 个),使用 `CompletableFuture.allOf()` | | 6 | L1 同步中静默忽略 Redis 淘汰/TTL | 过期的 L1 数据,错误的 Mongo 写入 | 从 L1 恢复 Redis,标记为脏 | | 7 | `addModelFix()` 跳过 Redis | Redis 缺少键,额外同步周期 | 统一 `writeToL1AndRedis()` 用于所有写路径 | | 8 | `getDataModelFromId()` 是 O(n) | 1000+ 条目时 CPU 压力 | `id → key` 反向索引实现 O(1) 查询 | --- ## 总结 原始代码没有明显损坏——它运行、同步、大部分情况下工作正常。问题出在边缘情况:当 Mongo 一轮不可用时,当 Redis 在压力下淘汰键时,当两个线程同时访问 `removeModel()` 时。 分布式缓存管理是细节真正重要的地方。清晰的层次结构和明确的失败保证不是可选的附加功能——它们是区分"能工作"和"几乎总是能工作"的系统的关键。 完整源代码可在 v1.1.0 发布版 中找到。 --- *如果你发现我遗漏了什么,或者对这些问题的处理方法有不同见解,我很乐意在评论中听到。*