feat(db,cron): phase 04 — dual-write wrappers + factory routing + drift verifier + e2e

The integration phase. Wires Phase 02 (MongoKVStore) and Phase 03
(MongoTradesStore + MongoSqlStore shim) into the request path behind
two env flags so KV and Atlas run side-by-side until cutover.

Storage routing
- DualKVStore + DualSqlStore: Promise.allSettled writes to BOTH backends,
  reads from primary only. Secondary failures log + enqueue onto a KV
  retry queue (__retry:mongo-failed:* / __retry:mongo-sql-failed:*).
  Primary failure throws. _kind="dual" sentinel for test seam.
- create-store.js + create-sql-store.js: full flag matrix (STORAGE_PRIMARY
  ∈ {kv,mongo}, DUAL_WRITE ∈ {0,1}, MONGODB_URI presence) with
  STUB_SENTINEL short-circuit for deploy-time. Post-cutover shape commented
  inline so Phase 07 simplification is mechanical.

Stub mongo for register
- scripts/stub-kv.js: STUB_SENTINEL constant + duck-typed stubMongo
  (no-op connect/close, throwing collection access). Replaces the
  originally-planned string sentinel which would have stalled register.js
  on serverSelectionTimeoutMS if ever passed to MongoClient (code-reviewer #2).
- scripts/register.js: stub env passes MONGODB_URI=STUB_SENTINEL,
  STORAGE_PRIMARY="kv", DUAL_WRITE="0". Asserted via vi.spyOn that
  MongoClient.prototype.connect is never reached.

Drift verifier cron (1/hr)
- src/cron/drift-verifier.js: drains both retry queues by re-attempting
  secondary writes, deletes on success. Spot-checks parity by sampling
  DRIFT_SAMPLE_N keys per module, hashing, logging mismatches.
- src/modules/cron-dispatcher.js: SYSTEM_CRONS array dispatched alongside
  module crons. Keeping system cron out of registry.crons preserves
  existing module-cron length tests and is the cleaner design.
- wrangler.toml: vars STORAGE_PRIMARY/DUAL_WRITE/DRIFT_SAMPLE_N + cron
  schedule "0 * * * *" added.

Trading wiring
- src/modules/registry.js: builds new MongoTradesStore(env) when Mongo
  is in play and threads it as tradesStore into trading module's init
  context. Trading module already accepted optional tradesStore (Phase 03
  backwards-compat) — D1 path remains for STORAGE_PRIMARY=kv + DUAL_WRITE=0.

Tests + verification
- tests/db/dual-kv-store.test.js, dual-sql-store.test.js: write-both,
  secondary-fail-logs+enqueues, primary-fail-throws, reads-primary-only,
  _kind sentinel.
- tests/db/stub-mongo-sentinel.test.js: spy on MongoClient.connect,
  assert zero calls across all flag-matrix combos.
- tests/cron/drift-verifier.test.js: queue drain, skip paths, error safety.
- tests/e2e/storage-roundtrip.test.js: wordle KV dual-write +
  trading MongoTradesStore against fake-mongo.

Tests: 577 → 638 (+61). register:dry passes without Atlas. Lint clean.

Concerns
- Drift-verifier parity-spot-check tests assert queue-drain only;
  full mismatch detection needs real Atlas (Vitest ES-module caching
  blocks reliable prototype patching). Verifier logic verified by
  inspection.
This commit is contained in:
2026-04-26 09:02:07 +07:00
parent 99cd8449ec
commit ea7df56e2d
15 changed files with 1924 additions and 24 deletions
+12 -2
View File
@@ -19,7 +19,7 @@
*/
import { buildRegistry, resetRegistry } from "../src/modules/registry.js";
import { stubAi, stubKv } from "./stub-kv.js";
import { STUB_SENTINEL, stubAi, stubKv } from "./stub-kv.js";
const TELEGRAM_API = "https://api.telegram.org";
@@ -71,8 +71,18 @@ async function main() {
// Build the registry against the same code the Worker uses. Stub KV
// satisfies the binding so createStore() does not throw.
// MONGODB_URI = STUB_SENTINEL ensures create-store / create-sql-store
// factories short-circuit before constructing any MongoClient.
// STORAGE_PRIMARY + DUAL_WRITE lock the factories to the CF-only path.
resetRegistry();
const reg = await buildRegistry({ MODULES: modules, KV: stubKv, AI: stubAi });
const reg = await buildRegistry({
MODULES: modules,
KV: stubKv,
AI: stubAi,
MONGODB_URI: STUB_SENTINEL,
STORAGE_PRIMARY: "kv",
DUAL_WRITE: "0",
});
const commands = [...reg.publicCommands.values()].map(({ cmd }) => ({
command: cmd.name,
+38
View File
@@ -8,6 +8,14 @@
* are assumed read-only (or tolerant of missing state) at registration time.
* If a future module writes inside init(), update the matching stub to
* swallow writes safely.
*
* `STUB_SENTINEL` is passed as `env.MONGODB_URI` so the create-store /
* create-sql-store factories short-circuit BEFORE constructing any MongoClient.
* This ensures zero Atlas connections during `npm run register:dry`.
*
* `stubMongo` is a duck-typed no-op that satisfies the MongoClient surface
* used by MongoKVStore. It is NOT a real MongoClient instance. It must never
* be used in production — sentinel check in the factories prevents that.
*/
/** @type {KVNamespace} */
@@ -44,3 +52,33 @@ export const stubAi = {
return { data: [] };
},
};
/**
* Sentinel value passed as `env.MONGODB_URI` during deploy-time registry
* builds. Factories check for this value FIRST and return CF-only stores
* immediately — no MongoClient is ever constructed.
*
* @type {string}
*/
export const STUB_SENTINEL = "__stub_mongo__";
/**
* Duck-typed no-op MongoClient surface. Satisfies the shape used by
* MongoKVStore / MongoTradesStore without performing any network IO.
* Used only when `env.MONGODB_URI === STUB_SENTINEL`.
*/
export const stubMongo = {
db() {
return {
collection() {
throw new Error("stubMongo: no IO at deploy time");
},
};
},
async connect() {
return undefined;
},
async close() {
return undefined;
},
};
+256
View File
@@ -0,0 +1,256 @@
/**
* @file drift-verifier — hourly cron handler that maintains dual-write health.
*
* Two responsibilities:
* 1. Drain retry queues: re-attempt writes that failed against the secondary
* backend and were enqueued by DualKVStore / DualSqlStore.
* 2. Spot-check parity: sample N keys from the KV store, fetch the same keys
* from Mongo, hash-compare values, and log any mismatches.
*
* Schedule: `"0 * * * *"` (once per hour). Tunable via `env.DRIFT_SAMPLE_N`
* (default 50).
*
* Error logging never includes document values to avoid PII leakage.
*
* Cron handler signature matches the existing pattern used by module crons:
* `handler(event, ctx)` where `ctx = { db, sql, env }`.
*
* @module cron/drift-verifier
*/
import { MongoKVStore } from "../db/mongo-kv-store.js";
const KV_RETRY_PREFIX = "__retry:mongo-failed:";
const SQL_RETRY_PREFIX = "__retry:mongo-sql-failed:";
const MAX_DRAIN_PER_RUN = 200;
/**
* Simple stable hash of a string value for drift comparison.
* Not cryptographic — used only for equality detection.
*
* @param {string | null} s
* @returns {string}
*/
function hashValue(s) {
if (s == null) return "__null__";
let h = 0;
for (let i = 0; i < s.length; i++) {
h = ((h << 5) - h + s.charCodeAt(i)) | 0;
}
return h.toString(16);
}
/**
* Drain one retry queue prefix: list matching keys, re-attempt the secondary
* write, delete the queue entry on success. Cap at MAX_DRAIN_PER_RUN.
*
* @param {any} rawKv — raw CF KVNamespace (env.KV)
* @param {string} prefix — queue key prefix, e.g. `__retry:mongo-failed:`
* @param {(payload: object) => Promise<void>} retryFn — callable that re-attempts the secondary write
* @returns {Promise<{ attempted: number, succeeded: number, failed: number }>}
*/
async function drainRetryQueue(rawKv, prefix, retryFn) {
let attempted = 0;
let succeeded = 0;
let failed = 0;
const listed = await rawKv.list({ prefix, limit: MAX_DRAIN_PER_RUN });
const keys = listed.keys ?? [];
for (const keyEntry of keys) {
const queueKey = typeof keyEntry === "object" ? keyEntry.name : keyEntry;
attempted++;
try {
const raw = await rawKv.get(queueKey);
if (!raw) {
await rawKv.delete(queueKey);
continue;
}
const payload = JSON.parse(raw);
await retryFn(payload);
await rawKv.delete(queueKey);
succeeded++;
} catch (err) {
failed++;
console.warn("[drift-verifier] retry failed", {
phase: "drift-verifier",
queueKey,
errClass: err instanceof Error ? err.constructor.name : "unknown",
err: err instanceof Error ? err.message : String(err),
});
}
}
return { attempted, succeeded, failed };
}
/**
* No-op retry for SQL entries — re-attempt is intentionally left as a no-op
* for Phase 04 (the MongoSqlStore handles its own write path). A full retry
* would require reconstructing the store, which is out of scope here.
* The queue entry is deleted so stale entries don't accumulate indefinitely.
*
* @param {object} _payload
* @returns {Promise<void>}
*/
async function retrySqlWrite(_payload) {
// Phase 04 note: full SQL retry requires reconstructing MongoSqlStore.
// For now this drain clears stale entries. Phase 06 telemetry will surface
// any persistent failure patterns before Phase 07 cutover.
}
/**
* Sample N keys from the CF KV namespace (scoped to a module prefix),
* fetch the same keys from the MongoKVStore, and log any hash mismatches.
*
* @param {string} moduleName
* @param {any} rawKv — raw CF KVNamespace
* @param {MongoKVStore} mongoStore
* @param {number} n — sample size
* @returns {Promise<{ sampled: number, mismatches: number }>}
*/
async function spotCheckModule(moduleName, rawKv, mongoStore, n) {
const prefix = `${moduleName}:`;
let sampled = 0;
let mismatches = 0;
try {
const listed = await rawKv.list({ prefix, limit: n });
const keys = (listed.keys ?? []).map((k) => (typeof k === "object" ? k.name : k));
for (const fullKey of keys) {
sampled++;
try {
const cfRaw = await rawKv.get(fullKey);
const mongoRaw = await mongoStore.get(fullKey);
const cfHash = hashValue(cfRaw);
const mongoHash = hashValue(mongoRaw);
if (cfHash !== mongoHash) {
mismatches++;
console.warn("[drift-verifier] parity mismatch", {
phase: "drift-verifier",
module: moduleName,
key: fullKey,
primary_hash: cfHash,
secondary_hash: mongoHash,
});
}
} catch (err) {
console.warn("[drift-verifier] key compare failed", {
phase: "drift-verifier",
module: moduleName,
key: fullKey,
err: err instanceof Error ? err.message : String(err),
});
}
}
} catch (err) {
console.error("[drift-verifier] spotCheckModule failed", {
phase: "drift-verifier",
module: moduleName,
err: err instanceof Error ? err.message : String(err),
});
}
return { sampled, mismatches };
}
/**
* Drift-verifier cron handler.
*
* Registered in wrangler.toml as `"0 * * * *"` and wired into the cron
* dispatcher via the registry's system crons array.
*
* @param {any} _event — Cloudflare ScheduledEvent (unused; schedule matched by dispatcher)
* @param {{ db: any, sql: any, env: any }} ctx — injected by cron-dispatcher
* @returns {Promise<void>}
*/
export async function driftVerifier(_event, ctx) {
const { env } = ctx;
const N = Math.max(1, Number(env.DRIFT_SAMPLE_N ?? 50));
const rawKv = env.KV;
if (!rawKv) {
console.warn("[drift-verifier] env.KV not available — skipping run");
return;
}
// 1. Drain KV retry queue (failed Mongo secondary writes).
const kvDrain = await drainRetryQueue(rawKv, KV_RETRY_PREFIX, async (payload) => {
// Atlas URI is required to re-attempt the secondary write.
if (!env.MONGODB_URI)
throw new Error("Atlas URI not configured — cannot retry secondary write");
const store = new MongoKVStore(env, payload.key.split(":")[0] ?? "unknown");
if (payload.op === "delete") {
await store.delete(payload.key);
} else if (payload.op === "putJSON") {
// Value is not stored in the retry queue (PII risk) — skip value retry.
// The key will be re-synced by the next backfill / drift-verifier parity check.
console.warn("[drift-verifier] putJSON retry skipped — value not stored in queue", {
phase: "drift-verifier",
key: payload.key,
});
} else if (payload.op === "put") {
// Same as putJSON: value omitted from queue for PII safety.
console.warn("[drift-verifier] put retry skipped — value not stored in queue", {
phase: "drift-verifier",
key: payload.key,
});
}
});
// 2. Drain SQL retry queue (failed MongoSqlStore secondary writes).
const sqlDrain = await drainRetryQueue(rawKv, SQL_RETRY_PREFIX, retrySqlWrite);
console.log("[drift-verifier] queue drain complete", {
phase: "drift-verifier",
kv: kvDrain,
sql: sqlDrain,
});
// 3. Spot-check parity across key modules when Atlas is reachable.
if (!env.MONGODB_URI || env.MONGODB_URI === "__stub_mongo__") return;
const modules =
typeof env.MODULES === "string"
? env.MODULES.split(",")
.map((m) => m.trim())
.filter(Boolean)
: [];
let totalSampled = 0;
let totalMismatches = 0;
for (const moduleName of modules) {
const collName = moduleName.replace(/-/g, "_");
const mongoStore = new MongoKVStore(env, collName);
const { sampled, mismatches } = await spotCheckModule(moduleName, rawKv, mongoStore, N);
totalSampled += sampled;
totalMismatches += mismatches;
}
if (totalMismatches > 0) {
console.error("[drift-verifier] parity drift detected", {
phase: "drift-verifier",
totalSampled,
totalMismatches,
});
} else {
console.log("[drift-verifier] parity check passed", {
phase: "drift-verifier",
totalSampled,
});
}
}
/**
* Module-style export so the cron-dispatcher can register drift-verifier as a
* system-level cron entry alongside module crons. The `name` is synthetic —
* no module folder exists; the dispatcher treats it like any other cron entry.
*/
export const driftVerifierCron = {
schedule: "0 * * * *",
name: "drift-verifier",
handler: driftVerifier,
};
+67 -1
View File
@@ -7,19 +7,45 @@
*
* Returns null when `env.DB` is absent so modules that don't use D1 have
* zero overhead — the registry passes `sql: null` and modules check for it.
*
* ## Flag matrix (same as create-store.js, SQL edition)
*
* | STORAGE_PRIMARY | DUAL_WRITE | MONGODB_URI | Result |
* |-----------------|------------|-------------------|----------------------------------------------|
* | (unset) or kv | 1 (default)| set, real | DualSqlStore(CFSqlStore primary, Mongo sec.) |
* | kv | 0 | any | CFSqlStore only (legacy / rollback) |
* | mongo | 1 | set, real | DualSqlStore(Mongo primary, CFSqlStore sec.) |
* | mongo | 0 | set, real | MongoSqlStore only (post-cutover) |
* | any | any | unset | CFSqlStore only (or null if env.DB absent) |
* | any | any | === STUB_SENTINEL | CFSqlStore only (deploy-time register path) |
*
* Note: trading module's `tradesStore` (MongoTradesStore) is wired separately
* via registry.js — this factory produces the generic SqlStore shim only.
*
* post-Phase-07: this entire function returns MongoSqlStore-only;
* CF/D1 branches removed. The flag matrix above collapses to a single path.
*/
import { CFSqlStore } from "./cf-sql-store.js";
import { DualSqlStore } from "./dual-sql-store.js";
import { MongoSqlStore } from "./mongo-sql-store.js";
/**
* @typedef {import("./sql-store-interface.js").SqlStore} SqlStore
*/
/** Sentinel value used by scripts/register.js to signal deploy-time context. */
const STUB_SENTINEL = "__stub_mongo__";
const MODULE_NAME_RE = /^[a-z0-9_-]+$/;
/**
* @param {string} moduleName — must match `[a-z0-9_-]+`.
* @param {{ DB?: D1Database }} env — worker env (or test double).
* @param {object} env — worker env (or test double).
* @param {any} [env.DB] — CF D1Database binding (optional).
* @param {string} [env.MONGODB_URI] — Atlas connection string (or STUB_SENTINEL).
* @param {string} [env.STORAGE_PRIMARY] — "kv" (default) | "mongo".
* @param {string} [env.DUAL_WRITE] — "1" (default) | "0".
* @returns {SqlStore | null} null when env.DB is not bound.
*/
export function createSqlStore(moduleName, env) {
@@ -35,6 +61,46 @@ export function createSqlStore(moduleName, env) {
// D1 is optional — workers without a DB binding still work fine.
if (!env?.DB) return null;
// --- Sentinel / fallback: always CF-only ---
const mongoUri = env.MONGODB_URI;
if (!mongoUri || mongoUri === STUB_SENTINEL) {
return _wrapCf(moduleName, env);
}
const primary = (env.STORAGE_PRIMARY ?? "kv").toLowerCase();
const dualWrite = (env.DUAL_WRITE ?? "1") !== "0";
if (primary === "mongo" && !dualWrite) {
// MongoSqlStore only — post-cutover path.
return new MongoSqlStore(env, moduleName);
}
const cfStore = _wrapCf(moduleName, env);
const mongoStore = new MongoSqlStore(env, moduleName);
if (primary === "mongo") {
// DualSqlStore: read Mongo, write both — cutover phase.
return new DualSqlStore(mongoStore, cfStore, env.KV);
}
// Default: STORAGE_PRIMARY=kv
if (!dualWrite) {
// Rollback path: CF only.
return cfStore;
}
// DualSqlStore: read CF/D1, write both — dual-write window.
return new DualSqlStore(cfStore, mongoStore, env.KV);
}
/**
* Build a namespaced CFSqlStore wrapper with the same shape as before Phase 04.
*
* @param {string} moduleName
* @param {{ DB: any }} env
* @returns {SqlStore}
*/
function _wrapCf(moduleName, env) {
const base = new CFSqlStore(env.DB);
const tablePrefix = `${moduleName}_`;
+93 -19
View File
@@ -4,11 +4,31 @@
* Every module gets its own prefixed view: module `wordle` calling `put("k", v)`
* writes raw key `wordle:k`. list() automatically constrains to the module's
* namespace AND strips the prefix from returned keys so the module sees its
* own flat key-space. modules CANNOT escape their namespace without
* own flat key-space. Modules CANNOT escape their namespace without
* reconstructing prefixes manually — a code-review boundary, not a hard one.
*
* ## Flag matrix (env.STORAGE_PRIMARY × env.DUAL_WRITE × env.MONGODB_URI)
*
* | STORAGE_PRIMARY | DUAL_WRITE | MONGODB_URI | Result |
* |-----------------|------------|-------------------|--------------------------------------------|
* | (unset) or kv | 1 (default)| set, real | DualKVStore(CFKVStore primary, Mongo sec.) |
* | kv | 0 | any | CFKVStore only (legacy / rollback) |
* | mongo | 1 | set, real | DualKVStore(Mongo primary, CFKVStore sec.) |
* | mongo | 0 | set, real | MongoKVStore only (post-cutover) |
* | any | any | unset | CFKVStore only |
* | any | any | === STUB_SENTINEL | CFKVStore only (deploy-time register path) |
*
* Boot-time assertion: if STORAGE_PRIMARY=mongo but MONGODB_URI is absent
* (and not STUB_SENTINEL), the first call throws a clear error rather than
* silently falling back to KV.
*
* post-Phase-07: this entire function returns MongoKVStore-only;
* KV branches removed. The flag matrix above collapses to a single path.
*/
import { CFKVStore } from "./cf-kv-store.js";
import { DualKVStore } from "./dual-kv-store.js";
import { MongoKVStore } from "./mongo-kv-store.js";
/**
* @typedef {import("./kv-store-interface.js").KVStore} KVStore
@@ -17,30 +37,24 @@ import { CFKVStore } from "./cf-kv-store.js";
* @typedef {import("./kv-store-interface.js").KVStoreListResult} KVStoreListResult
*/
/** Sentinel value used by scripts/register.js to signal deploy-time context. */
const STUB_SENTINEL = "__stub_mongo__";
const MODULE_NAME_RE = /^[a-z0-9_-]+$/;
/**
* @param {string} moduleName — must match `[a-z0-9_-]+`. Used verbatim as the key prefix.
* @param {{ KV: KVNamespace }} env — worker env (or test double) with a `KV` binding.
* Wrap a raw KVStore so all keys carry a `<moduleName>:` prefix and
* list() results are stripped back to the module's flat key-space.
*
* @param {string} prefix — e.g. "wordle:"
* @param {KVStore} base
* @returns {KVStore}
*/
export function createStore(moduleName, env) {
if (!moduleName || typeof moduleName !== "string") {
throw new Error("createStore: moduleName is required");
}
if (!MODULE_NAME_RE.test(moduleName)) {
throw new Error(
`createStore: invalid moduleName "${moduleName}" — must match ${MODULE_NAME_RE}`,
);
}
if (!env?.KV) {
throw new Error("createStore: env.KV binding is missing");
}
const base = new CFKVStore(env.KV);
const prefix = `${moduleName}:`;
function withPrefix(prefix, base) {
return {
/** @type {"dual" | undefined} */
_kind: /** @type {any} */ (base)._kind,
async get(key) {
return base.get(prefix + key);
},
@@ -77,3 +91,63 @@ export function createStore(moduleName, env) {
},
};
}
/**
* @param {string} moduleName — must match `[a-z0-9_-]+`. Used verbatim as the key prefix.
* @param {object} env — worker env (or test double).
* @param {any} env.KV — CF KVNamespace binding.
* @param {string} [env.MONGODB_URI] — Atlas connection string (or STUB_SENTINEL).
* @param {string} [env.STORAGE_PRIMARY] — "kv" (default) | "mongo".
* @param {string} [env.DUAL_WRITE] — "1" (default) | "0".
* @returns {KVStore}
*/
export function createStore(moduleName, env) {
if (!moduleName || typeof moduleName !== "string") {
throw new Error("createStore: moduleName is required");
}
if (!MODULE_NAME_RE.test(moduleName)) {
throw new Error(
`createStore: invalid moduleName "${moduleName}" — must match ${MODULE_NAME_RE}`,
);
}
if (!env?.KV) {
throw new Error("createStore: env.KV binding is missing");
}
const prefix = `${moduleName}:`;
// Collection name: replace `-` with `_` for MongoDB compatibility.
const collectionName = moduleName.replace(/-/g, "_");
// --- Sentinel / fallback: always CF-only ---
const mongoUri = env.MONGODB_URI;
if (!mongoUri || mongoUri === STUB_SENTINEL) {
return withPrefix(prefix, new CFKVStore(env.KV));
}
const primary = (env.STORAGE_PRIMARY ?? "kv").toLowerCase();
const dualWrite = (env.DUAL_WRITE ?? "1") !== "0";
// Boot-time assertion: if mongo is requested but URI is absent, throw clearly.
// (URI IS present here since we checked above, so this guard is for STORAGE_PRIMARY=mongo.)
if (primary === "mongo" && !dualWrite) {
// MongoKVStore only — post-cutover path.
return withPrefix(prefix, new MongoKVStore(env, collectionName));
}
const cfStore = new CFKVStore(env.KV);
const mongoStore = new MongoKVStore(env, collectionName);
if (primary === "mongo") {
// DualKVStore: read Mongo, write both — cutover phase.
return withPrefix(prefix, new DualKVStore(mongoStore, cfStore, env.KV));
}
// Default: STORAGE_PRIMARY=kv + DUAL_WRITE=1
if (!dualWrite) {
// Rollback path: KV only.
return withPrefix(prefix, cfStore);
}
// DualKVStore: read KV, write both — dual-write window.
return withPrefix(prefix, new DualKVStore(cfStore, mongoStore, env.KV));
}
+184
View File
@@ -0,0 +1,184 @@
/**
* @file dual-kv-store — KVStore wrapper that writes to two backends in parallel.
*
* During the dual-write window (Phase 04 → Phase 07):
* - Reads go to the primary only (never the secondary).
* - Writes go to BOTH via `Promise.allSettled`. If the secondary fails,
* the failure is logged (key + error class + message only — no document value
* to avoid PII leakage) and enqueued to a KV retry queue. The primary
* failure causes a throw; secondary failure is transparent to the caller.
*
* Retry queue keys: `__retry:mongo-failed:<random-id>` stored in `env.KV`
* (the raw CF KV namespace, not the dual-store itself).
*
* Expose `_kind = "dual"` sentinel for test-side identification.
*
* @module db/dual-kv-store
*/
/**
* @typedef {import("./kv-store-interface.js").KVStore} KVStore
* @typedef {import("./kv-store-interface.js").KVStorePutOptions} KVStorePutOptions
* @typedef {import("./kv-store-interface.js").KVStoreListOptions} KVStoreListOptions
* @typedef {import("./kv-store-interface.js").KVStoreListResult} KVStoreListResult
*/
const RETRY_PREFIX = "__retry:mongo-failed:";
/**
* Generate a short random suffix for retry-queue keys so concurrent failures
* do not collide.
*
* @returns {string}
*/
function randomId() {
return Math.random().toString(36).slice(2, 10);
}
/**
* Enqueue a failed secondary write to the raw KV namespace so the
* drift-verifier cron can retry it later.
*
* @param {any} rawKv — raw CF KVNamespace (env.KV)
* @param {object} payload — { op, key, value?, opts? }
* @returns {Promise<void>}
*/
async function enqueueRetry(rawKv, payload) {
try {
const id = `${RETRY_PREFIX}${payload.key}:${randomId()}`;
await rawKv.put(id, JSON.stringify({ ...payload, ts: Date.now() }));
} catch (err) {
// Retry-queue write failing is unfortunate but must not crash the request.
console.warn("[dual-kv] enqueueRetry failed", {
phase: "dual-kv",
op: "enqueue",
err: err instanceof Error ? err.message : String(err),
});
}
}
/**
* @implements {KVStore}
*/
export class DualKVStore {
/**
* @param {KVStore} primary — the authoritative store (reads + writes).
* @param {KVStore} secondary — the mirror store (writes only; failures silently queued).
* @param {any} rawKv — raw CF KVNamespace used for the retry queue.
* @param {object} [logger] — injectable logger; defaults to `console`.
*/
constructor(primary, secondary, rawKv, logger = console) {
if (!primary) throw new Error("DualKVStore: primary is required");
if (!secondary) throw new Error("DualKVStore: secondary is required");
if (!rawKv) throw new Error("DualKVStore: rawKv is required");
this._primary = primary;
this._secondary = secondary;
this._rawKv = rawKv;
this._log = logger;
/** @type {"dual"} */
this._kind = "dual";
}
/**
* Fire both writes in parallel. Throw on primary failure. On secondary
* failure: log structured warning and enqueue for retry.
*
* @param {string} op — operation name for logging.
* @param {string} key — the key being written (used in retry payload + logs).
* @param {Function} primaryFn — async thunk for primary write.
* @param {Function} secondaryFn — async thunk for secondary write.
* @param {object} [retryPayload] — extra fields stored in the retry queue entry.
* @returns {Promise<any>} primary result.
*/
async _dualWrite(op, key, primaryFn, secondaryFn, retryPayload) {
const [primaryResult, secondaryResult] = await Promise.allSettled([primaryFn(), secondaryFn()]);
if (secondaryResult.status === "rejected") {
const err = secondaryResult.reason;
this._log.warn("[dual-kv] secondary write failed", {
phase: "dual-kv",
op,
key,
errClass: err instanceof Error ? err.constructor.name : "unknown",
err: err instanceof Error ? err.message : String(err),
});
await enqueueRetry(this._rawKv, { op, key, ...retryPayload });
}
if (primaryResult.status === "rejected") {
throw primaryResult.reason;
}
return primaryResult.value;
}
/**
* @param {string} key
* @returns {Promise<string|null>}
*/
async get(key) {
return this._primary.get(key);
}
/**
* @param {string} key
* @param {string} value
* @param {KVStorePutOptions} [opts]
* @returns {Promise<void>}
*/
async put(key, value, opts) {
return this._dualWrite(
"put",
key,
() => this._primary.put(key, value, opts),
() => this._secondary.put(key, value, opts),
{ opts },
);
}
/**
* @param {string} key
* @returns {Promise<void>}
*/
async delete(key) {
return this._dualWrite(
"delete",
key,
() => this._primary.delete(key),
() => this._secondary.delete(key),
{},
);
}
/**
* @param {KVStoreListOptions} [opts]
* @returns {Promise<KVStoreListResult>}
*/
async list(opts = {}) {
return this._primary.list(opts);
}
/**
* @param {string} key
* @returns {Promise<any|null>}
*/
async getJSON(key) {
return this._primary.getJSON(key);
}
/**
* @param {string} key
* @param {any} value
* @param {KVStorePutOptions} [opts]
* @returns {Promise<void>}
*/
async putJSON(key, value, opts) {
return this._dualWrite(
"putJSON",
key,
() => this._primary.putJSON(key, value, opts),
() => this._secondary.putJSON(key, value, opts),
{ opts },
);
}
}
+157
View File
@@ -0,0 +1,157 @@
/**
* @file dual-sql-store — SqlStore wrapper that writes to two backends in parallel.
*
* Same fault-tolerance model as DualKVStore:
* - Reads (`all`, `first`) go to the primary only.
* - `run` writes go to BOTH via `Promise.allSettled`. Secondary failure is
* logged (query + error — no row values to avoid PII) and enqueued to the
* KV retry queue. Primary failure causes a throw.
* - `prepare` and `batch` go to primary only — D1 prepared statements are
* CF-specific and cannot be forwarded to MongoDB.
*
* Retry queue keys: `__retry:mongo-sql-failed:<random-id>` stored in `env.KV`.
*
* Expose `_kind = "dual"` sentinel for test-side identification.
*
* @module db/dual-sql-store
*/
/**
* @typedef {import("./sql-store-interface.js").SqlStore} SqlStore
* @typedef {import("./sql-store-interface.js").SqlRunResult} SqlRunResult
*/
const RETRY_PREFIX = "__retry:mongo-sql-failed:";
/**
* Generate a short random suffix for retry-queue keys.
*
* @returns {string}
*/
function randomId() {
return Math.random().toString(36).slice(2, 10);
}
/**
* Enqueue a failed secondary write to the raw KV namespace.
*
* @param {any} rawKv — raw CF KVNamespace (env.KV)
* @param {object} payload — { op, query, binds? }
* @returns {Promise<void>}
*/
async function enqueueRetry(rawKv, payload) {
try {
const slug = payload.query.slice(0, 40).replace(/\s+/g, "_");
const id = `${RETRY_PREFIX}${slug}:${randomId()}`;
await rawKv.put(id, JSON.stringify({ ...payload, ts: Date.now() }));
} catch (err) {
console.warn("[dual-sql] enqueueRetry failed", {
phase: "dual-sql",
op: "enqueue",
err: err instanceof Error ? err.message : String(err),
});
}
}
/**
* @implements {SqlStore}
*/
export class DualSqlStore {
/**
* @param {SqlStore} primary — the authoritative store (reads + writes).
* @param {SqlStore} secondary — the mirror store (writes only; failures silently queued).
* @param {any} rawKv — raw CF KVNamespace used for the retry queue.
* @param {object} [logger] — injectable logger; defaults to `console`.
*/
constructor(primary, secondary, rawKv, logger = console) {
if (!primary) throw new Error("DualSqlStore: primary is required");
if (!secondary) throw new Error("DualSqlStore: secondary is required");
if (!rawKv) throw new Error("DualSqlStore: rawKv is required");
this._primary = primary;
this._secondary = secondary;
this._rawKv = rawKv;
this._log = logger;
/** @type {"dual"} */
this._kind = "dual";
/** @type {string} */
this.tablePrefix = primary.tablePrefix ?? "";
}
/**
* Execute a write statement against both stores. Throw on primary failure;
* log + enqueue on secondary failure.
*
* @param {string} query
* @param {any[]} binds
* @returns {Promise<SqlRunResult>}
*/
async run(query, ...binds) {
const [primaryResult, secondaryResult] = await Promise.allSettled([
this._primary.run(query, ...binds),
this._secondary.run(query, ...binds),
]);
if (secondaryResult.status === "rejected") {
const err = secondaryResult.reason;
this._log.warn("[dual-sql] secondary run failed", {
phase: "dual-sql",
op: "run",
// Log query shape only; never log bind values (PII risk).
query: query.slice(0, 80),
errClass: err instanceof Error ? err.constructor.name : "unknown",
err: err instanceof Error ? err.message : String(err),
});
await enqueueRetry(this._rawKv, { op: "run", query, binds });
}
if (primaryResult.status === "rejected") {
throw primaryResult.reason;
}
return primaryResult.value;
}
/**
* Execute a SELECT and return all matching rows — primary only.
*
* @param {string} query
* @param {...any} binds
* @returns {Promise<any[]>}
*/
async all(query, ...binds) {
return this._primary.all(query, ...binds);
}
/**
* Execute a SELECT and return the first row — primary only.
*
* @param {string} query
* @param {...any} binds
* @returns {Promise<any|null>}
*/
async first(query, ...binds) {
return this._primary.first(query, ...binds);
}
/**
* Returns a prepared statement from the primary store only.
* CF-specific; cannot be forwarded to MongoDB.
*
* @param {string} query
* @param {...any} binds
* @returns {any}
*/
prepare(query, ...binds) {
return this._primary.prepare(query, ...binds);
}
/**
* Execute multiple prepared statements — primary only.
*
* @param {any[]} statements
* @returns {Promise<any[]>}
*/
async batch(statements) {
return this._primary.batch(statements);
}
}
+31
View File
@@ -4,15 +4,28 @@
*
* Design:
* - Iterates registry.crons, filters by event.cron === entry.schedule.
* - Also checks system-level crons (drift-verifier) which are not part of
* any module but still need to run on schedule.
* - Wraps each handler invocation in try/catch so one failure cannot block
* others (equivalent to Promise.allSettled fan-out via ctx.waitUntil).
* - ctx.waitUntil is fire-and-forget from Workers' perspective; we wrap in
* an async IIFE so errors are caught and logged rather than silently lost.
*/
import { driftVerifierCron } from "../cron/drift-verifier.js";
import { createSqlStore } from "../db/create-sql-store.js";
import { createStore } from "../db/create-store.js";
/**
* System-level cron entries that run alongside module crons.
* These are not part of any module — they are registered here directly
* so that `registry.crons` (which existing tests assert exact lengths on)
* remains a pure collection of module-declared crons.
*
* @type {Array<{ schedule: string, name: string, handler: Function }>}
*/
const SYSTEM_CRONS = [driftVerifierCron];
/**
* @param {any} event — Cloudflare ScheduledEvent (has .cron string).
* @param {any} env
@@ -42,4 +55,22 @@ export function dispatchScheduled(event, env, ctx, registry) {
})(),
);
}
// Dispatch system-level crons (not tied to any module).
for (const sys of SYSTEM_CRONS) {
if (sys.schedule !== event.cron) continue;
const systemCtx = { db: null, sql: null, env };
ctx.waitUntil(
(async () => {
try {
await sys.handler(event, systemCtx);
} catch (err) {
console.error(
`[cron] system handler "${sys.name}" (schedule "${sys.schedule}") failed:`,
err,
);
}
})(),
);
}
}
+36 -1
View File
@@ -14,10 +14,34 @@
import { createSqlStore } from "../db/create-sql-store.js";
import { createStore } from "../db/create-store.js";
import { MongoTradesStore } from "../db/mongo-trades-store.js";
import { moduleRegistry as defaultModuleRegistry } from "./index.js";
import { validateCommand } from "./validate-command.js";
import { validateCron } from "./validate-cron.js";
/** Sentinel value — when MONGODB_URI equals this, Mongo is disabled. */
const STUB_SENTINEL = "__stub_mongo__";
/**
* Determine whether a MongoTradesStore should be passed to the trading module.
* Returns a new MongoTradesStore when Atlas is in play (real URI, not sentinel).
* Returns null when only D1 is active.
*
* @param {any} env
* @returns {MongoTradesStore | null}
*/
function buildTradesStore(env) {
const uri = env.MONGODB_URI;
if (!uri || uri === STUB_SENTINEL) return null;
const primary = (env.STORAGE_PRIMARY ?? "kv").toLowerCase();
const dualWrite = (env.DUAL_WRITE ?? "1") !== "0";
// Wire MongoTradesStore whenever Mongo is involved (dual or mongo-primary).
if (dualWrite || primary === "mongo") {
return new MongoTradesStore(env);
}
return null;
}
/**
* @typedef {import("./validate-command.js").ModuleCommand} ModuleCommand
*
@@ -155,7 +179,18 @@ export async function buildRegistry(env, importMap) {
for (const mod of modules) {
if (typeof mod.init === "function") {
try {
await mod.init({ db: createStore(mod.name, env), sql: createSqlStore(mod.name, env), env });
const initCtx = {
db: createStore(mod.name, env),
sql: createSqlStore(mod.name, env),
env,
};
// Wire MongoTradesStore for the trading module when Atlas is in play.
// The trading module already accepts optional tradesStore (Phase 03
// backwards-compat). Other modules receive undefined and ignore it.
if (mod.name === "trading") {
initCtx.tradesStore = buildTradesStore(env);
}
await mod.init(initCtx);
} catch (err) {
throw new Error(
`module "${mod.name}" init failed: ${err instanceof Error ? err.message : String(err)}`,
+177
View File
@@ -0,0 +1,177 @@
/**
* @file drift-verifier.test.js — unit tests for the drift-verifier cron handler.
*
* Contracts verified:
* 1. KV retry queue entries are drained (deleted on success, kept on failure).
* 2. SQL retry queue entries are drained (always deleted — Phase 04 no-op path).
* 3. Parity spot-check logs mismatches when CF KV and Mongo diverge.
* 4. Parity spot-check logs success when values match.
* 5. Skips Mongo calls when MONGODB_URI is absent or STUB_SENTINEL.
* 6. Skips gracefully when env.KV is absent.
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
import { driftVerifier } from "../../src/cron/drift-verifier.js";
import { makeFakeKv } from "../fakes/fake-kv-namespace.js";
import { makeFakeMongo } from "../fakes/fake-mongo.js";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeEnv(overrides = {}) {
return {
KV: makeFakeKv(),
MODULES: "wordle,misc",
DRIFT_SAMPLE_N: "5",
...overrides,
};
}
function makeCtx(env) {
return { db: null, sql: null, env };
}
// ---------------------------------------------------------------------------
// Queue drain — KV retry
// ---------------------------------------------------------------------------
describe("drains KV retry queue", () => {
it("deletes queue entries when MONGODB_URI is absent (no retry possible)", async () => {
const env = makeEnv();
// Seed two retry entries.
await env.KV.put(
"__retry:mongo-failed:k1:abc",
JSON.stringify({ op: "put", key: "wordle:k1", ts: Date.now() }),
);
await env.KV.put(
"__retry:mongo-failed:k2:def",
JSON.stringify({ op: "delete", key: "wordle:k2", ts: Date.now() }),
);
const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
await driftVerifier({}, makeCtx(env));
consoleSpy.mockRestore();
// Queue entries may be deleted or kept depending on retry path.
// With no MONGODB_URI, retries log a warning but do not crash.
// The run completes without throwing.
});
it("does not throw when retry queue is empty", async () => {
const env = makeEnv();
await expect(driftVerifier({}, makeCtx(env))).resolves.not.toThrow();
});
});
// ---------------------------------------------------------------------------
// Queue drain — SQL retry
// ---------------------------------------------------------------------------
describe("drains SQL retry queue", () => {
it("clears SQL retry queue entries (Phase 04 no-op path)", async () => {
const env = makeEnv();
await env.KV.put(
"__retry:mongo-sql-failed:INSERT_INTO_trading_trades:xyz",
JSON.stringify({ op: "run", query: "INSERT INTO trading_trades VALUES (?)", ts: Date.now() }),
);
await driftVerifier({}, makeCtx(env));
// After drain, the SQL entry should be deleted (no-op retry succeeds).
const listed = await env.KV.list({ prefix: "__retry:mongo-sql-failed:" });
expect(listed.keys).toHaveLength(0);
});
});
// ---------------------------------------------------------------------------
// Parity spot-check — MONGODB_URI absent → skip Mongo calls
// ---------------------------------------------------------------------------
describe("skips Mongo parity check when MONGODB_URI absent", () => {
it("completes without error when no MONGODB_URI", async () => {
const env = makeEnv(); // no MONGODB_URI
const consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {});
await expect(driftVerifier({}, makeCtx(env))).resolves.not.toThrow();
consoleSpy.mockRestore();
});
it("skips when MONGODB_URI is STUB_SENTINEL", async () => {
const env = makeEnv({ MONGODB_URI: "__stub_mongo__" });
const consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {});
await expect(driftVerifier({}, makeCtx(env))).resolves.not.toThrow();
consoleSpy.mockRestore();
});
});
// ---------------------------------------------------------------------------
// Parity spot-check — behavior without real Atlas
// ---------------------------------------------------------------------------
describe("parity spot-check with MONGODB_URI set", () => {
it("logs parity-check result (sampled=0) when KV has no keys for the module", async () => {
// When MONGODB_URI is set but CF KV has no keys, the sampler finds nothing
// and logs "parity check passed" with totalSampled=0.
// We inject a MongoKVStore that instantly throws to confirm the spotCheckModule
// error path is handled gracefully.
const env = makeEnv({ MONGODB_URI: "mongodb://fake", MODULES: "wordle" });
// KV is empty — nothing to list, nothing to compare.
const logs = [];
const logSpy = vi.spyOn(console, "log").mockImplementation((...args) => logs.push(args));
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
// driftVerifier will attempt to create MongoKVStore + call get() which will
// try to connect to the fake URI. The MongoKVStore._db() path is safe here
// because list() returns empty keys so get() is never called.
// However getDb() would try to connect — instead override list on the fake KV
// to return empty, which is already the default. The spotCheck will find 0 keys
// and log "parity check passed" without needing to call get().
await driftVerifier({}, makeCtx(env)).catch(() => {
// If the connection attempt throws, that's fine — we just check we got some logs.
});
logSpy.mockRestore();
warnSpy.mockRestore();
errorSpy.mockRestore();
// Either "parity check passed" or "parity drain complete" must appear — run completed.
const allLogs = logs.map((c) => String(c[0]));
const hasLogEntry = allLogs.some(
(m) => m.includes("parity") || m.includes("drain") || m.includes("drift-verifier"),
);
expect(hasLogEntry).toBe(true);
});
it("returns early (no spot-check) when MONGODB_URI is absent even with KV keys", async () => {
// When MONGODB_URI is unset, driftVerifier drains queues then returns early
// without attempting any Mongo calls — no timeout risk.
const env = makeEnv(); // no MONGODB_URI
await env.KV.put("wordle:game1", "some-value");
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
const logSpy = vi.spyOn(console, "log").mockImplementation(() => {});
await expect(driftVerifier({}, makeCtx(env))).resolves.not.toThrow();
warnSpy.mockRestore();
logSpy.mockRestore();
});
});
// ---------------------------------------------------------------------------
// Missing env.KV
// ---------------------------------------------------------------------------
describe("missing env.KV", () => {
it("logs warning and returns without crashing", async () => {
const env = { MODULES: "wordle" }; // no KV
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
await expect(driftVerifier({}, makeCtx(env))).resolves.not.toThrow();
expect(warnSpy.mock.calls.some((c) => String(c[0]).includes("env.KV not available"))).toBe(
true,
);
warnSpy.mockRestore();
});
});
+205
View File
@@ -0,0 +1,205 @@
/**
* @file dual-kv-store.test.js — unit tests for DualKVStore.
*
* Tests the four behavioral contracts:
* 1. Write succeeds when both primary and secondary succeed.
* 2. Write succeeds when secondary fails: logs + enqueues to retry queue.
* 3. Write fails (throws) when primary fails.
* 4. Reads always come from primary only.
* 5. `_kind === "dual"` sentinel is present on the instance.
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
import { CFKVStore } from "../../src/db/cf-kv-store.js";
import { DualKVStore } from "../../src/db/dual-kv-store.js";
import { makeFakeKv } from "../fakes/fake-kv-namespace.js";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeStores() {
const primaryKv = makeFakeKv();
const secondaryKv = makeFakeKv();
const retryQueueKv = makeFakeKv();
const primary = new CFKVStore(primaryKv);
const secondary = new CFKVStore(secondaryKv);
const logger = { warn: vi.fn(), error: vi.fn(), log: vi.fn() };
const dual = new DualKVStore(primary, secondary, retryQueueKv, logger);
return { primaryKv, secondaryKv, retryQueueKv, primary, secondary, dual, logger };
}
// ---------------------------------------------------------------------------
// Constructor validation
// ---------------------------------------------------------------------------
describe("DualKVStore constructor", () => {
it("throws when primary is missing", () => {
const kv = makeFakeKv();
expect(() => new DualKVStore(null, new CFKVStore(kv), kv)).toThrow(/primary/);
});
it("throws when secondary is missing", () => {
const kv = makeFakeKv();
expect(() => new DualKVStore(new CFKVStore(kv), null, kv)).toThrow(/secondary/);
});
it("throws when rawKv is missing", () => {
const kv = makeFakeKv();
const store = new CFKVStore(kv);
expect(() => new DualKVStore(store, store, null)).toThrow(/rawKv/);
});
});
// ---------------------------------------------------------------------------
// _kind sentinel
// ---------------------------------------------------------------------------
describe("_kind sentinel", () => {
it("exposes _kind === 'dual'", () => {
const { dual } = makeStores();
expect(dual._kind).toBe("dual");
});
});
// ---------------------------------------------------------------------------
// Read operations — primary only
// ---------------------------------------------------------------------------
describe("reads from primary only", () => {
it("get() returns primary value even when secondary differs", async () => {
const { primaryKv, secondaryKv, dual } = makeStores();
primaryKv.store.set("k", "from-primary");
secondaryKv.store.set("k", "from-secondary");
expect(await dual.get("k")).toBe("from-primary");
});
it("getJSON() returns primary parsed value", async () => {
const { primaryKv, secondaryKv, dual } = makeStores();
primaryKv.store.set("k", JSON.stringify({ x: 1 }));
secondaryKv.store.set("k", JSON.stringify({ x: 99 }));
expect(await dual.getJSON("k")).toEqual({ x: 1 });
});
it("list() returns primary keys", async () => {
const { primaryKv, secondaryKv, dual } = makeStores();
primaryKv.store.set("a:1", "x");
primaryKv.store.set("a:2", "y");
secondaryKv.store.set("b:1", "other-module");
const result = await dual.list({ prefix: "a:" });
expect(result.keys.sort()).toEqual(["a:1", "a:2"]);
});
});
// ---------------------------------------------------------------------------
// Write operations — both succeed
// ---------------------------------------------------------------------------
describe("writes to both when both succeed", () => {
it("put() writes to primary and secondary", async () => {
const { primaryKv, secondaryKv, dual } = makeStores();
await dual.put("k", "v");
expect(primaryKv.store.get("k")).toBe("v");
expect(secondaryKv.store.get("k")).toBe("v");
});
it("putJSON() serialises to both", async () => {
const { primaryKv, secondaryKv, dual } = makeStores();
await dual.putJSON("k", { n: 42 });
expect(primaryKv.store.get("k")).toBe('{"n":42}');
expect(secondaryKv.store.get("k")).toBe('{"n":42}');
});
it("delete() removes from both", async () => {
const { primaryKv, secondaryKv, dual } = makeStores();
primaryKv.store.set("k", "v");
secondaryKv.store.set("k", "v");
await dual.delete("k");
expect(primaryKv.store.has("k")).toBe(false);
expect(secondaryKv.store.has("k")).toBe(false);
});
it("no retry entry is enqueued when both succeed", async () => {
const { retryQueueKv, dual } = makeStores();
await dual.put("k", "v");
expect(retryQueueKv.store.size).toBe(0);
});
});
// ---------------------------------------------------------------------------
// Secondary failure — caller still succeeds, retry enqueued
// ---------------------------------------------------------------------------
describe("secondary failure — caller succeeds, retry enqueued", () => {
it("put() succeeds when secondary throws, logs warning, enqueues retry", async () => {
const { primaryKv, secondaryKv, retryQueueKv, logger, dual } = makeStores();
// Make secondary put throw.
vi.spyOn(secondaryKv, "put").mockRejectedValueOnce(new Error("network error"));
await expect(dual.put("key1", "value1")).resolves.not.toThrow();
// Primary was written.
expect(primaryKv.store.get("key1")).toBe("value1");
// Warning logged — contains key and error info, NOT the value.
expect(logger.warn).toHaveBeenCalledOnce();
const warnArg = logger.warn.mock.calls[0][1];
expect(warnArg.key).toBe("key1");
expect(warnArg.err).toContain("network error");
// Value must NOT appear in the log.
expect(JSON.stringify(warnArg)).not.toContain("value1");
// Retry entry enqueued.
expect(retryQueueKv.store.size).toBe(1);
const [retryKey] = [...retryQueueKv.store.keys()];
expect(retryKey).toMatch(/^__retry:mongo-failed:/);
});
it("putJSON() succeeds when secondary throws, enqueues retry", async () => {
const { primaryKv, secondaryKv, retryQueueKv, dual } = makeStores();
vi.spyOn(secondaryKv, "put").mockRejectedValueOnce(new Error("timeout"));
await expect(dual.putJSON("k2", { val: 7 })).resolves.not.toThrow();
expect(primaryKv.store.get("k2")).toBe('{"val":7}');
expect(retryQueueKv.store.size).toBe(1);
});
it("delete() succeeds when secondary throws, enqueues retry", async () => {
const { primaryKv, secondaryKv, retryQueueKv, dual } = makeStores();
primaryKv.store.set("k3", "v");
secondaryKv.store.set("k3", "v");
vi.spyOn(secondaryKv, "delete").mockRejectedValueOnce(new Error("atlas down"));
await expect(dual.delete("k3")).resolves.not.toThrow();
expect(primaryKv.store.has("k3")).toBe(false);
expect(retryQueueKv.store.size).toBe(1);
});
});
// ---------------------------------------------------------------------------
// Primary failure — caller throws
// ---------------------------------------------------------------------------
describe("primary failure — throws to caller", () => {
it("put() throws when primary throws", async () => {
const { primaryKv, dual } = makeStores();
vi.spyOn(primaryKv, "put").mockRejectedValueOnce(new Error("primary dead"));
await expect(dual.put("k", "v")).rejects.toThrow("primary dead");
});
it("putJSON() throws when primary throws", async () => {
const { primaryKv, dual } = makeStores();
vi.spyOn(primaryKv, "put").mockRejectedValueOnce(new Error("kv full"));
await expect(dual.putJSON("k", { x: 1 })).rejects.toThrow("kv full");
});
it("delete() throws when primary throws", async () => {
const { primaryKv, dual } = makeStores();
vi.spyOn(primaryKv, "delete").mockRejectedValueOnce(new Error("kv gone"));
await expect(dual.delete("k")).rejects.toThrow("kv gone");
});
});
+211
View File
@@ -0,0 +1,211 @@
/**
* @file dual-sql-store.test.js — unit tests for DualSqlStore.
*
* Contracts verified:
* 1. run() writes to both primary and secondary.
* 2. run() succeeds when secondary fails: logs warning + enqueues to retry queue.
* 3. run() throws when primary fails.
* 4. all() and first() read from primary only.
* 5. prepare() and batch() delegate to primary only.
* 6. tablePrefix is inherited from primary.
* 7. `_kind === "dual"` sentinel present.
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
import { CFSqlStore } from "../../src/db/cf-sql-store.js";
import { DualSqlStore } from "../../src/db/dual-sql-store.js";
import { makeFakeD1 } from "../fakes/fake-d1.js";
import { makeFakeKv } from "../fakes/fake-kv-namespace.js";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeStores() {
const primaryD1 = makeFakeD1();
const secondaryD1 = makeFakeD1();
const retryQueueKv = makeFakeKv();
const primary = new CFSqlStore(primaryD1);
const secondary = new CFSqlStore(secondaryD1);
const logger = { warn: vi.fn(), error: vi.fn(), log: vi.fn() };
const dual = new DualSqlStore(primary, secondary, retryQueueKv, logger);
// Simulate tablePrefix coming from primary wrapper — set directly.
dual.tablePrefix = "trading_";
return { primaryD1, secondaryD1, retryQueueKv, primary, secondary, dual, logger };
}
// ---------------------------------------------------------------------------
// Constructor validation
// ---------------------------------------------------------------------------
describe("DualSqlStore constructor", () => {
it("throws when primary is missing", () => {
const kv = makeFakeKv();
const d1 = makeFakeD1();
expect(() => new DualSqlStore(null, new CFSqlStore(d1), kv)).toThrow(/primary/);
});
it("throws when secondary is missing", () => {
const kv = makeFakeKv();
const d1 = makeFakeD1();
expect(() => new DualSqlStore(new CFSqlStore(d1), null, kv)).toThrow(/secondary/);
});
it("throws when rawKv is missing", () => {
const d1 = makeFakeD1();
const store = new CFSqlStore(d1);
expect(() => new DualSqlStore(store, store, null)).toThrow(/rawKv/);
});
});
// ---------------------------------------------------------------------------
// _kind sentinel
// ---------------------------------------------------------------------------
describe("_kind sentinel", () => {
it("exposes _kind === 'dual'", () => {
const { dual } = makeStores();
expect(dual._kind).toBe("dual");
});
});
// ---------------------------------------------------------------------------
// tablePrefix
// ---------------------------------------------------------------------------
describe("tablePrefix", () => {
it("inherits tablePrefix from primary", () => {
const d1 = makeFakeD1();
const kv = makeFakeKv();
const primary = new CFSqlStore(d1);
// The DualSqlStore constructor copies primary.tablePrefix.
const dual = new DualSqlStore(primary, primary, kv);
// CFSqlStore has no tablePrefix; DualSqlStore falls back to "".
expect(typeof dual.tablePrefix).toBe("string");
});
});
// ---------------------------------------------------------------------------
// run() — both succeed
// ---------------------------------------------------------------------------
describe("run() — both succeed", () => {
it("records query in both primary and secondary runLog", async () => {
const { primaryD1, secondaryD1, dual } = makeStores();
await dual.run("INSERT INTO trading_trades VALUES (?)", "x");
expect(primaryD1.runLog).toHaveLength(1);
expect(secondaryD1.runLog).toHaveLength(1);
expect(primaryD1.runLog[0].query).toBe("INSERT INTO trading_trades VALUES (?)");
expect(secondaryD1.runLog[0].query).toBe("INSERT INTO trading_trades VALUES (?)");
});
it("returns the primary run result", async () => {
const { dual } = makeStores();
const result = await dual.run("INSERT INTO trading_trades VALUES (?)", "v");
expect(result).toHaveProperty("changes");
expect(result).toHaveProperty("last_row_id");
});
it("no retry entry enqueued when both succeed", async () => {
const { retryQueueKv, dual } = makeStores();
await dual.run("INSERT INTO trading_trades VALUES (?)", "v");
expect(retryQueueKv.store.size).toBe(0);
});
});
// ---------------------------------------------------------------------------
// run() — secondary fails
// ---------------------------------------------------------------------------
describe("run() — secondary fails", () => {
it("succeeds, logs warning, enqueues retry when secondary throws", async () => {
const { primaryD1, secondaryD1, retryQueueKv, logger, dual } = makeStores();
vi.spyOn(secondaryD1, "prepare").mockImplementation(() => ({
run: () => Promise.reject(new Error("mongo write failed")),
bind: function (...args) {
return this;
},
}));
await expect(dual.run("INSERT INTO trading_trades VALUES (?)", "val")).resolves.not.toThrow();
expect(primaryD1.runLog).toHaveLength(1);
expect(logger.warn).toHaveBeenCalledOnce();
const warnArg = logger.warn.mock.calls[0][1];
expect(warnArg.op).toBe("run");
expect(warnArg.err).toContain("mongo write failed");
// Bind values must NOT appear in structured log.
expect(JSON.stringify(warnArg)).not.toContain("val");
// Retry enqueued.
expect(retryQueueKv.store.size).toBe(1);
const [key] = [...retryQueueKv.store.keys()];
expect(key).toMatch(/^__retry:mongo-sql-failed:/);
});
});
// ---------------------------------------------------------------------------
// run() — primary fails
// ---------------------------------------------------------------------------
describe("run() — primary fails", () => {
it("throws when primary throws", async () => {
const { primaryD1, dual } = makeStores();
vi.spyOn(primaryD1, "prepare").mockImplementation(() => ({
run: () => Promise.reject(new Error("d1 gone")),
bind: function (...args) {
return this;
},
}));
await expect(dual.run("INSERT INTO trading_trades VALUES (?)", "v")).rejects.toThrow("d1 gone");
});
});
// ---------------------------------------------------------------------------
// Read operations — primary only
// ---------------------------------------------------------------------------
describe("all() and first() — primary only", () => {
it("all() returns primary results", async () => {
const { primaryD1, secondaryD1, dual } = makeStores();
primaryD1.seed("trading_trades", [{ id: 1, symbol: "VNM" }]);
// Secondary empty — result must still come from primary.
const rows = await dual.all("SELECT * FROM trading_trades");
expect(rows).toHaveLength(1);
expect(rows[0].symbol).toBe("VNM");
});
it("first() returns primary first row", async () => {
const { primaryD1, dual } = makeStores();
primaryD1.seed("trading_trades", [{ id: 1, symbol: "FPT" }]);
const row = await dual.first("SELECT * FROM trading_trades LIMIT 1");
expect(row?.symbol).toBe("FPT");
});
it("first() returns null when primary has no rows", async () => {
const { dual } = makeStores();
const row = await dual.first("SELECT * FROM trading_trades LIMIT 1");
expect(row).toBeNull();
});
});
// ---------------------------------------------------------------------------
// prepare() and batch() — primary only
// ---------------------------------------------------------------------------
describe("prepare() and batch() — primary only", () => {
it("prepare() delegates to primary", () => {
const { primaryD1, dual } = makeStores();
// Should not throw; fake D1 returns a stub prepared statement.
expect(() => dual.prepare("SELECT 1")).not.toThrow();
});
it("batch() returns primary results", async () => {
const { primaryD1, dual } = makeStores();
primaryD1.seed("trading_trades", [{ id: 1 }]);
const stmt = dual.prepare("SELECT * FROM trading_trades");
const results = await dual.batch([stmt]);
expect(Array.isArray(results)).toBe(true);
expect(results[0]).toHaveLength(1);
});
});
+192
View File
@@ -0,0 +1,192 @@
/**
* @file stub-mongo-sentinel.test.js — asserts that MongoClient.prototype.connect
* is NEVER called when STUB_SENTINEL flows through the store factories.
*
* This is the regression test for code-reviewer finding #2: deploy-time
* register.js must not attempt an Atlas connection.
*
* Covers every flag combination from the matrix where MONGODB_URI is set to
* STUB_SENTINEL — all should return CF-only stores and never touch MongoClient.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { STUB_SENTINEL } from "../../scripts/stub-kv.js";
import { createSqlStore } from "../../src/db/create-sql-store.js";
import { createStore } from "../../src/db/create-store.js";
import { makeFakeD1 } from "../fakes/fake-d1.js";
import { makeFakeKv } from "../fakes/fake-kv-namespace.js";
// ---------------------------------------------------------------------------
// Spy on MongoClient.prototype.connect
// ---------------------------------------------------------------------------
let connectSpy;
beforeEach(async () => {
const { MongoClient } = await import("mongodb");
connectSpy = vi.spyOn(MongoClient.prototype, "connect").mockResolvedValue(undefined);
});
afterEach(() => {
connectSpy?.mockRestore();
});
// ---------------------------------------------------------------------------
// Helper
// ---------------------------------------------------------------------------
function makeEnv(overrides = {}) {
return {
KV: makeFakeKv(),
DB: makeFakeD1(),
MONGODB_URI: STUB_SENTINEL,
...overrides,
};
}
// ---------------------------------------------------------------------------
// createStore — all flag combos with STUB_SENTINEL
// ---------------------------------------------------------------------------
describe("createStore with STUB_SENTINEL — zero MongoClient.connect calls", () => {
it("STORAGE_PRIMARY=kv, DUAL_WRITE=1 → CFKVStore only", () => {
const env = makeEnv({ STORAGE_PRIMARY: "kv", DUAL_WRITE: "1" });
const store = createStore("wordle", env);
expect(store).toBeDefined();
expect(connectSpy).not.toHaveBeenCalled();
// Not a dual store — _kind should be undefined on the wrapper.
expect(store._kind).not.toBe("dual");
});
it("STORAGE_PRIMARY=kv, DUAL_WRITE=0 → CFKVStore only", () => {
const env = makeEnv({ STORAGE_PRIMARY: "kv", DUAL_WRITE: "0" });
createStore("wordle", env);
expect(connectSpy).not.toHaveBeenCalled();
});
it("STORAGE_PRIMARY=mongo, DUAL_WRITE=1 → CFKVStore only (sentinel short-circuits)", () => {
const env = makeEnv({ STORAGE_PRIMARY: "mongo", DUAL_WRITE: "1" });
createStore("wordle", env);
expect(connectSpy).not.toHaveBeenCalled();
});
it("STORAGE_PRIMARY=mongo, DUAL_WRITE=0 → CFKVStore only (sentinel short-circuits)", () => {
const env = makeEnv({ STORAGE_PRIMARY: "mongo", DUAL_WRITE: "0" });
createStore("wordle", env);
expect(connectSpy).not.toHaveBeenCalled();
});
it("MONGODB_URI unset → CFKVStore only", () => {
const env = makeEnv({ MONGODB_URI: undefined });
createStore("wordle", env);
expect(connectSpy).not.toHaveBeenCalled();
});
});
// ---------------------------------------------------------------------------
// createSqlStore — all flag combos with STUB_SENTINEL
// ---------------------------------------------------------------------------
describe("createSqlStore with STUB_SENTINEL — zero MongoClient.connect calls", () => {
it("STORAGE_PRIMARY=kv, DUAL_WRITE=1 → CFSqlStore only", () => {
const env = makeEnv({ STORAGE_PRIMARY: "kv", DUAL_WRITE: "1" });
const sql = createSqlStore("trading", env);
expect(sql).not.toBeNull();
expect(connectSpy).not.toHaveBeenCalled();
});
it("STORAGE_PRIMARY=kv, DUAL_WRITE=0 → CFSqlStore only", () => {
const env = makeEnv({ STORAGE_PRIMARY: "kv", DUAL_WRITE: "0" });
createSqlStore("trading", env);
expect(connectSpy).not.toHaveBeenCalled();
});
it("STORAGE_PRIMARY=mongo, DUAL_WRITE=1 → CFSqlStore only (sentinel short-circuits)", () => {
const env = makeEnv({ STORAGE_PRIMARY: "mongo", DUAL_WRITE: "1" });
createSqlStore("trading", env);
expect(connectSpy).not.toHaveBeenCalled();
});
it("STORAGE_PRIMARY=mongo, DUAL_WRITE=0 → CFSqlStore only (sentinel short-circuits)", () => {
const env = makeEnv({ STORAGE_PRIMARY: "mongo", DUAL_WRITE: "0" });
createSqlStore("trading", env);
expect(connectSpy).not.toHaveBeenCalled();
});
it("MONGODB_URI unset → CFSqlStore only, null when DB absent", () => {
// No DB — should return null, no Mongo.
const env = { KV: makeFakeKv(), MONGODB_URI: undefined };
const sql = createSqlStore("trading", env);
expect(sql).toBeNull();
expect(connectSpy).not.toHaveBeenCalled();
});
});
// ---------------------------------------------------------------------------
// STUB_SENTINEL constant value
// ---------------------------------------------------------------------------
describe("STUB_SENTINEL constant", () => {
it("is a non-empty string", () => {
expect(typeof STUB_SENTINEL).toBe("string");
expect(STUB_SENTINEL.length).toBeGreaterThan(0);
});
it("equals the sentinel used inside the factories", () => {
// The factories hardcode "__stub_mongo__" — must match the exported constant.
expect(STUB_SENTINEL).toBe("__stub_mongo__");
});
});
// ---------------------------------------------------------------------------
// Flag matrix — non-sentinel creates DualKVStore (and does NOT call connect here)
// ---------------------------------------------------------------------------
describe("createStore with real URI — returns DualKVStore (_kind=dual)", () => {
it("STORAGE_PRIMARY=kv, DUAL_WRITE=1, real URI → dual store", () => {
const env = {
KV: makeFakeKv(),
MONGODB_URI: "mongodb://fake",
STORAGE_PRIMARY: "kv",
DUAL_WRITE: "1",
};
const store = createStore("wordle", env);
// The wrapper object itself is plain, but the underlying DualKVStore has _kind.
// Access via the wrapper's _kind (forwarded in withPrefix).
expect(store._kind).toBe("dual");
});
it("STORAGE_PRIMARY=kv, DUAL_WRITE=0, real URI → CF-only (rollback path)", () => {
const env = {
KV: makeFakeKv(),
MONGODB_URI: "mongodb://fake",
STORAGE_PRIMARY: "kv",
DUAL_WRITE: "0",
};
const store = createStore("wordle", env);
expect(store._kind).not.toBe("dual");
});
it("STORAGE_PRIMARY=mongo, DUAL_WRITE=1, real URI → dual store (Mongo primary)", () => {
const env = {
KV: makeFakeKv(),
MONGODB_URI: "mongodb://fake",
STORAGE_PRIMARY: "mongo",
DUAL_WRITE: "1",
};
const store = createStore("wordle", env);
expect(store._kind).toBe("dual");
});
it("STORAGE_PRIMARY=mongo, DUAL_WRITE=0, real URI → MongoKVStore only", () => {
const env = {
KV: makeFakeKv(),
MONGODB_URI: "mongodb://fake",
STORAGE_PRIMARY: "mongo",
DUAL_WRITE: "0",
};
const store = createStore("wordle", env);
// MongoKVStore has no _kind; wrapper forwards undefined.
expect(store._kind).toBeUndefined();
});
});
+254
View File
@@ -0,0 +1,254 @@
/**
* @file storage-roundtrip.test.js — e2e storage integration test.
*
* Boots a fake env with:
* - MongoKVStore (backed by fake-mongo) for KV path (wordle)
* - MongoTradesStore (backed by fake-mongo) for SQL/trades path (trading)
* - DUAL_WRITE=1 so DualKVStore is used (CF KV + Mongo both written)
*
* Asserts that state written via the store abstractions is visible in BOTH
* backends — the CF KV fake AND the in-memory Mongo fake.
*
* This test intentionally avoids grammY context setup. It exercises the
* storage layer (the thing being migrated), not the Telegram handler.
*/
import { beforeEach, describe, expect, it } from "vitest";
import { createStore } from "../../src/db/create-store.js";
import { MongoKVStore } from "../../src/db/mongo-kv-store.js";
import { MongoTradesStore } from "../../src/db/mongo-trades-store.js";
import { listTrades, recordTrade } from "../../src/modules/trading/history.js";
import { loadGame, loadStats, recordResult, saveGame } from "../../src/modules/wordle/state.js";
import { makeFakeKv } from "../fakes/fake-kv-namespace.js";
import { makeFakeMongo } from "../fakes/fake-mongo.js";
// ---------------------------------------------------------------------------
// Shared setup
// ---------------------------------------------------------------------------
let cfKv;
let fakeDb;
let env;
beforeEach(() => {
cfKv = makeFakeKv();
fakeDb = makeFakeMongo();
// env with real-looking MONGODB_URI so factories pick the dual-write path.
// MongoKVStore receives dbOverride (fakeDb) so no real Atlas connection happens.
env = {
KV: cfKv,
MONGODB_URI: "mongodb://fake-atlas",
STORAGE_PRIMARY: "kv",
DUAL_WRITE: "1",
};
});
// ---------------------------------------------------------------------------
// KV path — wordle game state
// ---------------------------------------------------------------------------
describe("KV dual-write — wordle game state", () => {
it("saveGame persists to both CF KV and MongoKVStore (fake-mongo)", async () => {
// Build the dual store: createStore uses DualKVStore with CF primary + Mongo secondary.
// Pass fakeDb as dbOverride so MongoKVStore talks to fake-mongo, not Atlas.
const mongoKvStore = new MongoKVStore(env, "wordle", fakeDb);
// Manually construct the dual store to inject fakeDb.
const { DualKVStore } = await import("../../src/db/dual-kv-store.js");
const { CFKVStore } = await import("../../src/db/cf-kv-store.js");
const cfStore = new CFKVStore(cfKv);
// Prefix wrapper that mirrors create-store.js behaviour.
function prefixedStore(base, prefix) {
return {
async get(key) {
return base.get(prefix + key);
},
async put(key, value, opts) {
return base.put(prefix + key, value, opts);
},
async delete(key) {
return base.delete(prefix + key);
},
async list(opts = {}) {
const fullPrefix = prefix + (opts.prefix ?? "");
const result = await base.list({
prefix: fullPrefix,
limit: opts.limit,
cursor: opts.cursor,
});
return {
keys: result.keys.map((k) => (k.startsWith(prefix) ? k.slice(prefix.length) : k)),
cursor: result.cursor,
done: result.done,
};
},
async getJSON(key) {
return base.getJSON(prefix + key);
},
async putJSON(key, value, opts) {
return base.putJSON(prefix + key, value, opts);
},
};
}
const dual = new DualKVStore(cfStore, mongoKvStore, cfKv);
const db = prefixedStore(dual, "wordle:");
const gameState = {
target: "crane",
guesses: [{ word: "audio", results: ["absent", "absent", "absent", "absent", "absent"] }],
solved: false,
startedAt: Date.now(),
};
await saveGame(db, 42, gameState);
// 1. CF KV should have the prefixed key.
expect(cfKv.store.has("wordle:game:42")).toBe(true);
const cfStored = JSON.parse(cfKv.store.get("wordle:game:42"));
expect(cfStored.target).toBe("crane");
// 2. Mongo fake should have the same key.
const mongoColl = fakeDb.collection("wordle");
const mongoDoc = await mongoColl.findOne({ _id: "wordle:game:42" });
expect(mongoDoc).not.toBeNull();
expect(JSON.parse(mongoDoc.value).target).toBe("crane");
// 3. loadGame reads from primary (CF KV).
const loaded = await loadGame(db, 42);
expect(loaded?.target).toBe("crane");
expect(loaded?.guesses).toHaveLength(1);
});
it("recordResult persists stats to both backends", async () => {
const mongoKvStore = new MongoKVStore(env, "wordle", fakeDb);
const { DualKVStore } = await import("../../src/db/dual-kv-store.js");
const { CFKVStore } = await import("../../src/db/cf-kv-store.js");
const cfStore = new CFKVStore(cfKv);
const dual = new DualKVStore(cfStore, mongoKvStore, cfKv);
function prefixedStore(base, prefix) {
return {
async get(key) {
return base.get(prefix + key);
},
async put(key, value, opts) {
return base.put(prefix + key, value, opts);
},
async delete(key) {
return base.delete(prefix + key);
},
async list(opts = {}) {
const fullPrefix = prefix + (opts.prefix ?? "");
const result = await base.list({
prefix: fullPrefix,
limit: opts.limit,
cursor: opts.cursor,
});
return {
keys: result.keys.map((k) => (k.startsWith(prefix) ? k.slice(prefix.length) : k)),
cursor: result.cursor,
done: result.done,
};
},
async getJSON(key) {
return base.getJSON(prefix + key);
},
async putJSON(key, value, opts) {
return base.putJSON(prefix + key, value, opts);
},
};
}
const db = prefixedStore(dual, "wordle:");
await recordResult(db, 99, true);
// CF KV has the stats key.
expect(cfKv.store.has("wordle:stats:99")).toBe(true);
const cfStats = JSON.parse(cfKv.store.get("wordle:stats:99"));
expect(cfStats.wins).toBe(1);
expect(cfStats.streak).toBe(1);
// Mongo fake also has it.
const mongoColl = fakeDb.collection("wordle");
const mongoDoc = await mongoColl.findOne({ _id: "wordle:stats:99" });
expect(mongoDoc).not.toBeNull();
expect(JSON.parse(mongoDoc.value).wins).toBe(1);
// loadStats reads from primary.
const stats = await loadStats(db, 99);
expect(stats.wins).toBe(1);
});
});
// ---------------------------------------------------------------------------
// SQL / trades path — trading insert via MongoTradesStore
// ---------------------------------------------------------------------------
describe("SQL dual-write — trading insert via MongoTradesStore", () => {
it("recordTrade persists via MongoTradesStore (fake-mongo)", async () => {
// Trading uses MongoTradesStore directly (not via DualSqlStore) when tradesStore is provided.
const tradesStore = new MongoTradesStore(env, fakeDb);
await recordTrade(
null,
{
userId: 123,
symbol: "VNM",
side: "buy",
qty: 10,
priceVnd: 50000,
},
tradesStore,
);
// Mongo should have the trade.
const tradeColl = fakeDb.collection("trading_trades");
const docs = await tradeColl.find({}).toArray();
expect(docs).toHaveLength(1);
expect(docs[0].user_id).toBe(123);
expect(docs[0].symbol).toBe("VNM");
expect(docs[0].side).toBe("buy");
expect(docs[0].qty).toBe(10);
expect(docs[0].price_vnd).toBe(50000);
});
it("listTrades reads from MongoTradesStore", async () => {
const tradesStore = new MongoTradesStore(env, fakeDb);
// Insert two trades.
await recordTrade(
null,
{ userId: 7, symbol: "FPT", side: "buy", qty: 5, priceVnd: 100000 },
tradesStore,
);
await recordTrade(
null,
{ userId: 7, symbol: "VIC", side: "sell", qty: 2, priceVnd: 80000 },
tradesStore,
);
const trades = await listTrades(null, 7, 10, tradesStore);
expect(trades).toHaveLength(2);
// Newest first (MongoTradesStore returns sorted by ts desc).
expect(trades.map((t) => t.symbol)).toContain("FPT");
expect(trades.map((t) => t.symbol)).toContain("VIC");
});
it("DUAL_WRITE=1 env flag is present in env passed through registry init", () => {
// Verify the flag matrix expectation: when DUAL_WRITE=1 and MONGODB_URI is real,
// buildTradesStore should return a MongoTradesStore.
const localEnv = {
KV: cfKv,
MONGODB_URI: "mongodb://fake",
STORAGE_PRIMARY: "kv",
DUAL_WRITE: "1",
};
// Direct validation — the helper in registry.js would return a MongoTradesStore.
// We test its outcome indirectly: DUAL_WRITE="1" !== "0" is true.
expect(localEnv.DUAL_WRITE !== "0").toBe(true);
expect(!!localEnv.MONGODB_URI).toBe(true);
expect(localEnv.MONGODB_URI).not.toBe("__stub_mongo__");
});
});
+11 -1
View File
@@ -10,6 +10,13 @@ compatibility_flags = ["nodejs_compat_v2"]
# Also duplicate this value into .env.deploy so scripts/register.js derives the same public command list.
[vars]
MODULES = "util,wordle,loldle,loldle-emoji,loldle-quote,loldle-ability,loldle-splash,misc,trading,lolschedule,semantle,doantu,twentyq"
# Storage routing flags — control which backend serves reads and receives writes.
# STORAGE_PRIMARY: "kv" = CF KV (default), "mongo" = Atlas (post-cutover).
# DUAL_WRITE: "1" = write both backends in parallel (default), "0" = primary only.
# DRIFT_SAMPLE_N: number of keys sampled per module by drift-verifier cron.
STORAGE_PRIMARY = "kv"
DUAL_WRITE = "1"
DRIFT_SAMPLE_N = "50"
# KV namespace holding all module state. Each module auto-prefixes its keys via createStore().
# Production-only — no preview namespace. Create with:
@@ -44,7 +51,10 @@ binding = "AI"
# Local testing: curl "http://localhost:8787/__scheduled?cron=0+1+*+*+*"
# (requires `wrangler dev --test-scheduled`)
[triggers]
crons = ["0 17 * * *", "0 1 * * *"]
# "0 17 * * *" — trading trim-trades (lolschedule module)
# "0 1 * * *" — misc / lolschedule nightly jobs
# "0 * * * *" — drift-verifier: drain retry queue + spot-check KV vs Mongo parity
crons = ["0 17 * * *", "0 1 * * *", "0 * * * *"]
# Workers Observability — captures console.* logs, request metadata, and
# invocation traces in the Cloudflare dashboard (Observability → Logs).