mirror of
https://github.com/tiennm99/miti99bot-js.git
synced 2026-05-16 07:53:19 +00:00
0859356ec7
Operator-run migration scripts for KV→Mongo and D1→trading_trades, plus a parity verifier and a rollback wiper. Pure local Node — no Worker code, no /__admin/* routes, no new Worker secrets. Complies with docs/architecture.md §10. Scripts - backfill-kv-to-mongo.js: paginates CF KV REST API per module, fetches values, $setOnInsert upsert into per-module Mongo collection. Resumes from .backfill-cursor-<module>.json on restart. Throttles 50 ops/sec. expiresAt derived from KV metadata.expiration (debugger #10). --dry-run and --module flags for incremental work. - backfill-d1-to-mongo.js: wrangler d1 execute --remote --json → parse → insertMany batches into trading_trades, preserving original integer id as legacy_id (code-reviewer #13). Pre-flight aborts if collection non-empty unless --force. - verify-mongo-parity.js: count parity ±1%, SHA256 value compare, expiresAt ±5min bucket. Full-scan when <10K docs, sqrt-sample capped at 500 otherwise (code-reviewer #21). Trading: full-scan on legacy_id/ts/user_id/symbol/qty. - wipe-mongo.js: rollback helper. deleteMany across all collections with readline confirm. --yes for CI. - lib/migration-helpers.js: shared sleep, sha256, checkpoint I/O, cfKvList/cfKvGet, MongoClient singleton, sample strategy. Surface updates - .env.deploy.example: CF account/token/namespace placeholders. - package.json: backfill:kv[:dry], backfill:d1[:dry], verify:mongo, wipe:mongo scripts. - check-secret-leaks.js: SECRETS array gains CLOUDFLARE_API_TOKEN + CLOUDFLARE_ACCOUNT_ID for defense-in-depth. - .gitignore: .backfill-cursor-*.json excluded. Tests: 638 → 667 (+29 pure-logic tests for sha256, checkpoint round-trip, count-diff, sample-size, fetch-mocked CF REST). Lint clean. Operator-run sequence (after Phase 06 deploy): npm run backfill:kv:dry # preview npm run backfill:kv npm run backfill:d1:dry npm run backfill:d1 npm run verify:mongo # exit 0 = parity ok
216 lines
7.0 KiB
JavaScript
216 lines
7.0 KiB
JavaScript
#!/usr/bin/env node
|
|
/**
|
|
* @file verify-mongo-parity — cross-check KV / D1 data against MongoDB Atlas.
|
|
*
|
|
* Per KV module: count via CF REST vs Mongo countDocuments (±1% tolerance),
|
|
* then value-compare via SHA256. Full-scan when total < 10 000; random-sample
|
|
* N = min(500, ceil(sqrt(N))) otherwise. expiresAt checked within ±5 min.
|
|
* For trading_trades: full-scan on legacy_id, ts, user_id, symbol, qty.
|
|
*
|
|
* Flags: --dry-run (counts only, no value compare)
|
|
* Required env: MONGODB_URI, CLOUDFLARE_ACCOUNT_ID, CLOUDFLARE_API_TOKEN,
|
|
* KV_NAMESPACE_ID, MODULES
|
|
* Exit: 0 all-pass | 1 any failure
|
|
*/
|
|
|
|
import { execSync } from "node:child_process";
|
|
import {
|
|
cfKvGet,
|
|
cfKvList,
|
|
closeMongoClient,
|
|
countDiffRatio,
|
|
getMongoClient,
|
|
sampleStrategy,
|
|
sha256,
|
|
sleep,
|
|
} from "./lib/migration-helpers.js";
|
|
|
|
// Wrangler D1 database name (used as `wrangler d1 execute <DB_NAME>`).
const DB_NAME = "miti99bot-db";
// Allowed drift between KV expiration and Mongo expiresAt.
const EXPIRES_TOL_MS = 5 * 60 * 1000; // ±5 minutes
// --dry-run: compare counts only; skip the per-key value comparison.
const dryRun = process.argv.includes("--dry-run");
// Operator-supplied configuration; presence is enforced by validateEnv().
const {
  MONGODB_URI,
  CLOUDFLARE_ACCOUNT_ID,
  CLOUDFLARE_API_TOKEN,
  KV_NAMESPACE_ID,
  MODULES: MODULES_ENV, // comma-separated module list (split/trimmed in main())
} = process.env;
|
|
// Map a KV module name to its Mongo collection name (dashes → underscores).
const toCollName = (moduleName) => moduleName.split("-").join("_");
|
|
// Redact a KV key for log output: keep the "module:" prefix (if any),
// replace the rest with a short SHA256 fingerprint so raw keys never hit logs.
const redactKey = (key) => {
  const sep = key.indexOf(":");
  const prefix = sep >= 0 ? key.slice(0, sep + 1) : "";
  const fingerprint = sha256(key).slice(0, 8);
  return `${prefix}sha256:${fingerprint}…`;
};
|
|
|
|
/**
 * Ensure all required environment variables are set.
 * Logs the missing names and exits with code 1 when any are absent.
 */
function validateEnv() {
  const required = [
    "MONGODB_URI",
    "CLOUDFLARE_ACCOUNT_ID",
    "CLOUDFLARE_API_TOKEN",
    "KV_NAMESPACE_ID",
    "MODULES",
  ];
  const missing = required.filter((name) => !process.env[name]);
  if (missing.length > 0) {
    console.error(`[verify] Missing env vars: ${missing.join(", ")}`);
    process.exit(1);
  }
}
|
|
|
|
/**
 * Run a read-only SQL statement against the remote D1 database via wrangler.
 * Double quotes in the SQL are escaped for the shell command line.
 *
 * @param {string} sql - SQL to execute remotely.
 * @returns {Array<object>} result rows, or [] when the JSON shape is unexpected.
 */
function queryD1(sql) {
  const escaped = sql.replace(/"/g, '\\"');
  const cmd = `npx wrangler d1 execute ${DB_NAME} --remote --command "${escaped}" --json`;
  const stdout = execSync(cmd, { stdio: ["ignore", "pipe", "pipe"] }).toString();
  const parsed = JSON.parse(stdout);
  if (!Array.isArray(parsed)) return [];
  return parsed[0]?.results ?? [];
}
|
|
|
|
/**
 * Fisher–Yates shuffle on a copy; the input array is never mutated.
 *
 * @param {Array} arr
 * @returns {Array} new array with the same elements in random order.
 */
function shuffle(arr) {
  const copy = Array.from(arr);
  let i = copy.length;
  while (i > 1) {
    const j = Math.floor(Math.random() * i);
    i -= 1;
    const tmp = copy[i];
    copy[i] = copy[j];
    copy[j] = tmp;
  }
  return copy;
}
|
|
|
|
/** Enumerate ALL key objects for a module prefix via paginated CF REST. */
async function allKvKeys(mod) {
  const collected = [];
  let cursor = null;
  for (;;) {
    const page = await cfKvList(
      CLOUDFLARE_ACCOUNT_ID,
      KV_NAMESPACE_ID,
      CLOUDFLARE_API_TOKEN,
      `${mod}:`,
      cursor,
    );
    collected.push(...page.keys);
    // Stop when CF says the listing is complete (or no cursor was returned).
    if (page.list_complete || !page.cursor) break;
    cursor = page.cursor;
    await sleep(250); // throttle between pages to be gentle on the CF REST API
  }
  return collected;
}
|
|
|
|
/**
 * Compare one KV key against its Mongo doc.
 * Returns null on match, mismatch description string on failure.
 *
 * @param {import("mongodb").Collection} coll
 * @param {{ name: string, metadata?: { expiration?: number } }} keyObj
 * @returns {Promise<string|null>}
 */
async function compareKey(coll, keyObj) {
  const kvValue = await cfKvGet(
    CLOUDFLARE_ACCOUNT_ID,
    KV_NAMESPACE_ID,
    CLOUDFLARE_API_TOKEN,
    keyObj.name,
  );
  await sleep(5); // small delay between CF reads to stay under rate limits

  const mongoDoc = await coll.findOne({ _id: keyObj.name });
  if (!mongoDoc) return "missing in Mongo";

  // Value parity via hash comparison (avoids holding both raw values in logs).
  if (sha256(kvValue) !== sha256(mongoDoc.value ?? "")) return "value hash mismatch";

  // KV expiration is unix seconds; Mongo stores expiresAt as a Date (or absent).
  const expiration = keyObj.metadata?.expiration;
  const kvExpMs = expiration ? expiration * 1000 : null;
  const mgExpMs = mongoDoc.expiresAt ? new Date(mongoDoc.expiresAt).getTime() : null;

  const kvHasExp = kvExpMs !== null;
  const mgHasExp = mgExpMs !== null;
  if (kvHasExp !== mgHasExp) return "expiresAt presence mismatch";
  if (kvHasExp) {
    const deltaMs = Math.abs(kvExpMs - mgExpMs);
    if (deltaMs > EXPIRES_TOL_MS) return `expiresAt diff ${deltaMs}ms`;
  }
  return null;
}
|
|
|
|
/**
 * Verify one KV module against its Mongo collection.
 * Count parity first (±1%); then value comparison on a full scan or a
 * random sample, per sampleStrategy. Skips value compare under --dry-run.
 *
 * @param {import("mongodb").Db} db
 * @param {string} mod - KV module prefix.
 * @returns {Promise<{ pass: boolean, mismatches: string[] }>}
 */
async function verifyModule(db, mod) {
  const coll = db.collection(toCollName(mod));
  const kvKeys = await allKvKeys(mod);
  const kvCount = kvKeys.length;
  const mongoCount = await coll.countDocuments();

  const diffRatio = countDiffRatio(kvCount, mongoCount);
  const diffPct = (diffRatio * 100).toFixed(2);
  const countOk = diffRatio < 0.01;
  console.log(
    `[${mod}] KV=${kvCount} Mongo=${mongoCount} diff=${diffPct}% ${countOk ? "OK" : "FAIL"}`,
  );
  if (!countOk) {
    return { pass: false, mismatches: [`count diff ${diffPct}% > 1%`] };
  }
  if (dryRun) return { pass: true, mismatches: [] };

  // Decide full scan vs random sample based on total key count.
  const { fullScan, sampleSize } = sampleStrategy(kvCount);
  const toCheck = fullScan ? kvKeys : shuffle(kvKeys).slice(0, sampleSize);
  console.log(`[${mod}] ${fullScan ? "full-scan" : `sample ${sampleSize}/${kvCount}`}`);

  const mismatches = [];
  for (const keyObj of toCheck) {
    const problem = await compareKey(coll, keyObj);
    if (problem) mismatches.push(`${redactKey(keyObj.name)}: ${problem}`);
  }

  const ok = mismatches.length === 0;
  console.log(
    `[${mod}] value compare: ${ok ? "PASS" : `${mismatches.length} mismatch(es)`}`,
  );
  return { pass: ok, mismatches };
}
|
|
|
|
/**
 * Verify D1 trading_trades against the Mongo trading_trades collection.
 * Count parity (±1%) first, then an always-full scan matching each D1 row
 * to its Mongo doc by legacy_id and comparing user_id/symbol/qty/ts.
 * Skips the row scan under --dry-run.
 *
 * @param {import("mongodb").Db} db
 * @returns {Promise<{ pass: boolean, mismatches: string[] }>}
 */
async function verifyTrading(db) {
  const coll = db.collection("trading_trades");
  const rows = queryD1(
    "SELECT id, user_id, symbol, side, qty, price_vnd, ts FROM trading_trades ORDER BY id",
  );
  const mongoCount = await coll.countDocuments();

  const diffRatio = countDiffRatio(rows.length, mongoCount);
  const diffPct = (diffRatio * 100).toFixed(2);
  const countOk = diffRatio < 0.01;
  console.log(
    `[trading] D1=${rows.length} Mongo=${mongoCount} diff=${diffPct}% ${countOk ? "OK" : "FAIL"}`,
  );
  if (!countOk) return { pass: false, mismatches: [`count diff ${diffPct}%`] };
  if (dryRun) return { pass: true, mismatches: [] };

  const checkedFields = ["user_id", "symbol", "qty", "ts"];
  const mismatches = [];
  for (const row of rows) {
    const doc = await coll.findOne({ legacy_id: row.id });
    if (!doc) {
      mismatches.push(`legacy_id=${row.id} missing`);
      continue;
    }
    for (const field of checkedFields) {
      // String() normalizes D1 integers vs however Mongo stored the value.
      if (String(doc[field]) !== String(row[field])) {
        mismatches.push(`legacy_id=${row.id} ${field} mismatch`);
      }
    }
  }

  const ok = mismatches.length === 0;
  console.log(
    `[trading] full-scan: ${ok ? "PASS" : `${mismatches.length} mismatch(es)`}`,
  );
  return { pass: ok, mismatches };
}
|
|
|
|
/**
 * Entry point: validate env, verify every KV module plus trading_trades,
 * print a parity report, and exit 1 on any failure.
 *
 * Fix: the Mongo client is now closed in a `finally` block. Previously, if
 * verifyModule/verifyTrading threw, closeMongoClient() was never reached and
 * the open connection pool leaked (and could keep the process alive).
 */
async function main() {
  validateEnv();
  if (dryRun) console.log("[verify] DRY RUN — counts only");
  const modules = MODULES_ENV.split(",")
    .map((m) => m.trim())
    .filter(Boolean);
  const db = (await getMongoClient(MONGODB_URI)).db();

  const report = [];
  let anyFail = false;
  try {
    for (const mod of modules) {
      const r = await verifyModule(db, mod);
      report.push({ mod, ...r });
      if (!r.pass) anyFail = true;
    }
    const tr = await verifyTrading(db);
    report.push({ mod: "trading", ...tr });
    if (!tr.pass) anyFail = true;
  } finally {
    // Always release the Mongo connection, even when a verifier throws.
    await closeMongoClient();
  }

  console.log("\n── Parity Report ─────────────────────────────");
  for (const r of report) {
    console.log(`  ${r.pass ? "PASS" : "FAIL"}  ${r.mod}`);
    for (const m of r.mismatches) console.log(`    mismatch: ${m}`);
  }
  console.log("──────────────────────────────────────────────");
  if (anyFail) {
    console.error("[verify] FAILED.");
    process.exit(1);
  }
  console.log("[verify] All modules PASS.");
}
|
|
|
|
// Top-level runner: any unhandled rejection becomes a fatal, non-zero exit.
main().catch((fatalErr) => {
  const detail = fatalErr.message ?? fatalErr;
  console.error("[verify] Fatal:", detail);
  process.exit(1);
});
|