Files
tiennm99 3f03521e84 feat(scripts): phase 07 — reverse-backfill scripts + delete guard
Pre-execution prerequisites for the Phase 07 cutover. Stage 2 of the
cutover keeps DUAL_WRITE=0 for ~6 days; if anything regresses during
that window the operator MUST be able to roll back to KV/D1 with the
last N days of Mongo-only writes recovered. Pre-building these scripts
(per code-reviewer #4) eliminates "draft a backfill under outage
pressure" — the anti-pattern of writing untested code at 4am.

Reverse-backfill
- scripts/backfill-mongo-to-kv.js: full-scan Mongo collection per module,
  PUT each doc back to CF KV via REST. expiresAt → expirationTtl (clamped
  to 60s minimum per CF KV); already-expired docs are skipped (won't
  resurrect dead state). 50 ops/sec throttle. --dry-run + --module flags.
- scripts/backfill-mongo-to-d1.js: full-scan trading_trades, build INSERT
  SQL preserving legacy_id where present (round-trips D1 autoincrement IDs
  preserved by phase-05 forward backfill). Sequential int generation for
  any docs without legacy_id. Pipes through wrangler d1 execute.
- scripts/lib/migration-helpers.js: cfKvPut helper added.

Delete guard (debugger #12)
- scripts/wrangler-delete-guard.sh: interactive CONFIRM wrapper around
  wrangler kv namespace delete + wrangler d1 delete. Exits 3 when stdin
  is not a tty so it cannot run in CI. Documented: never run in CI.

package.json: wired the backfill:mongo:kv[:dry] and backfill:mongo:d1[:dry]
scripts.

Tests: 697 → 733 (+36), including:
- 7 cfKvPut tests (REST URL, querystring, body, expiration_ttl param).
- 10 reverse-KV TTL math tests (expired sentinel, future seconds, no-TTL,
  CF 60s minimum clamp).
- 9 reverse-D1 SQL construction tests (escaping, legacy_id preservation,
  sequential generation).

Lint clean. No Worker code touched. Stage 1 cutover, 7-day soak,
snapshots, and Stage 3 cleanup (delete CFKVStore + simplify factories +
edit package.json deploy chain) remain operator-driven and will be
committed separately after binding deletion.
2026-04-26 09:29:14 +07:00

230 lines
7.2 KiB
JavaScript

#!/usr/bin/env node
/**
* @file migration-helpers — shared utilities for the Phase 05 forward and
* Phase 07 reverse backfill scripts.
*
* Exports:
* - sleep, sha256
* - loadCheckpoint, saveCheckpoint, clearCheckpoint
* - cfKvList, cfKvGet, cfKvPut
* - getMongoClient, closeMongoClient
* - countDiffRatio, sampleStrategy
*/
import { createHash } from "node:crypto";
import { existsSync, readFileSync, renameSync, unlinkSync, writeFileSync } from "node:fs";
import { resolve } from "node:path";
import { MongoClient } from "mongodb";
// Cloudflare REST API v4 root; the cfKvList / cfKvGet / cfKvPut helpers build their URLs from it.
const CF_API_BASE = "https://api.cloudflare.com/client/v4";
// ─── Primitives ───────────────────────────────────────────────────────────────
/**
 * Promise-based delay.
 *
 * @param {number} ms - milliseconds to wait
 * @returns {Promise<void>} resolves (with no value) once the timer fires
 */
export function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * SHA-256 digest of a string, encoded as UTF-8 before hashing.
 *
 * @param {string} text
 * @returns {string} lowercase hex digest
 */
export function sha256(text) {
  const hasher = createHash("sha256");
  hasher.update(text, "utf8");
  return hasher.digest("hex");
}
// ─── Checkpoint ───────────────────────────────────────────────────────────────
/** Absolute path of the checkpoint file for module `mod`, under the current working directory. */
function cpPath(mod) {
  const fileName = `.backfill-cursor-${mod}.json`;
  return resolve(process.cwd(), fileName);
}
/**
 * Read the saved cursor state for a module.
 *
 * A missing file, an unreadable file, or invalid JSON all yield `null`
 * (i.e. "start from the beginning").
 *
 * @param {string} moduleName
 * @returns {{ cursor: string|null, lastKey: string|null }|null}
 */
export function loadCheckpoint(moduleName) {
  const checkpointFile = cpPath(moduleName);
  if (!existsSync(checkpointFile)) {
    return null;
  }
  let parsed = null;
  try {
    const raw = readFileSync(checkpointFile, "utf8");
    parsed = JSON.parse(raw);
  } catch {
    parsed = null;
  }
  return parsed;
}
/**
 * Persist cursor state so a crash can resume from this page.
 *
 * The JSON is first written to a sibling `.tmp` file and then renamed over
 * the checkpoint. A crash mid-write therefore leaves the previous checkpoint
 * intact instead of a truncated file. loadCheckpoint would parse a truncated
 * file as "no checkpoint", so resume would presumably restart from the first
 * page.
 *
 * @param {string} moduleName
 * @param {{ cursor: string|null, lastKey: string|null }} state
 */
export function saveCheckpoint(moduleName, state) {
  const target = cpPath(moduleName);
  const staging = `${target}.tmp`;
  writeFileSync(staging, JSON.stringify(state), "utf8");
  // rename(2) replaces the destination in one step on the same filesystem.
  renameSync(staging, target);
}
/**
 * Remove the checkpoint file after a module completes successfully.
 *
 * A missing file (ENOENT) is expected and ignored. Any other failure is still
 * non-fatal, because the module itself finished, but it is reported: a
 * checkpoint left on disk would be picked up by loadCheckpoint on the next run.
 *
 * @param {string} moduleName
 */
export function clearCheckpoint(moduleName) {
  const p = cpPath(moduleName);
  // Unlink directly instead of existsSync-then-unlink to avoid the check/use race.
  try {
    unlinkSync(p);
  } catch (err) {
    if (err?.code !== "ENOENT") {
      console.warn(`clearCheckpoint: failed to remove ${p}: ${err?.message ?? err}`);
    }
  }
}
// ─── CF KV REST ───────────────────────────────────────────────────────────────
/**
 * List one page of KV keys for a given prefix.
 *
 * Cloudflare marks the final page with an empty (or absent)
 * `result_info.cursor`. Both cases are normalised to `null`, so `cursor` is
 * always either a usable cursor string or `null`, and `list_complete` is true
 * exactly when it is `null`.
 *
 * @param {string} accountId
 * @param {string} nsId
 * @param {string} token
 * @param {string} prefix - key prefix filter; null/undefined lists the whole namespace
 * @param {string|null} cursor - cursor from the previous page, or null for the first page
 * @returns {Promise<{
 *   keys: Array<{name: string, metadata?: {expiration?: number}}>,
 *   cursor: string|null,
 *   list_complete: boolean
 * }>}
 * @throws {Error} on a non-2xx HTTP status or a `success: false` API envelope
 */
export async function cfKvList(accountId, nsId, token, prefix, cursor) {
  const url = new URL(`${CF_API_BASE}/accounts/${accountId}/storage/kv/namespaces/${nsId}/keys`);
  // Without this guard an undefined prefix would be sent as the literal string "undefined".
  if (prefix != null) url.searchParams.set("prefix", prefix);
  url.searchParams.set("limit", "1000");
  if (cursor) url.searchParams.set("cursor", cursor);
  const res = await fetch(url.toString(), {
    headers: { Authorization: `Bearer ${token}`, "Content-Type": "application/json" },
  });
  if (!res.ok) {
    const body = await res.text().catch(() => "");
    throw new Error(`CF KV list ${res.status}: ${body}`);
  }
  const json = await res.json();
  if (!json.success) throw new Error(`CF KV list error: ${JSON.stringify(json.errors ?? json)}`);
  // `||` is intentional: an empty-string cursor means "no more pages", same as absent.
  const nextCursor = json.result_info?.cursor || null;
  return {
    keys: json.result ?? [],
    cursor: nextCursor,
    list_complete: nextCursor === null,
  };
}
/**
 * Read the raw string value stored under one KV key via the CF REST API.
 *
 * @param {string} accountId
 * @param {string} nsId
 * @param {string} token
 * @param {string} key - URI-encoded before being placed in the path
 * @returns {Promise<string>} the response body as text
 * @throws {Error} on any non-2xx status; the message includes the key, status and body
 */
export async function cfKvGet(accountId, nsId, token, key) {
  const encodedKey = encodeURIComponent(key);
  const valueUrl = `${CF_API_BASE}/accounts/${accountId}/storage/kv/namespaces/${nsId}/values/${encodedKey}`;
  const authHeaders = { Authorization: `Bearer ${token}` };
  const response = await fetch(valueUrl, { headers: authHeaders });
  if (response.ok) {
    return response.text();
  }
  const detail = await response.text().catch(() => "");
  throw new Error(`CF KV get "${key}" ${response.status}: ${detail}`);
}
/**
 * Store a string value in CF KV through the REST API.
 *
 * Issues `PUT /accounts/{id}/storage/kv/namespaces/{nsid}/values/{key}` with
 * the value as a `text/plain` body. When `opts.expirationTtl` is provided
 * (not null/undefined), it is forwarded unchanged as the `expiration_ttl`
 * query parameter. Cloudflare requires at least 60 seconds, and this
 * function does not clamp.
 *
 * @param {string} accountId
 * @param {string} nsId
 * @param {string} token
 * @param {string} key
 * @param {string} value
 * @param {{ expirationTtl?: number }} [opts]
 * @returns {Promise<void>}
 * @throws {Error} on any non-2xx status; the message includes the key, status and body
 */
export async function cfKvPut(accountId, nsId, token, key, value, opts = {}) {
  const target = new URL(
    `${CF_API_BASE}/accounts/${accountId}/storage/kv/namespaces/${nsId}/values/${encodeURIComponent(key)}`,
  );
  const { expirationTtl } = opts;
  const hasTtl = expirationTtl !== undefined && expirationTtl !== null;
  if (hasTtl) {
    target.searchParams.set("expiration_ttl", String(expirationTtl));
  }
  const requestInit = {
    method: "PUT",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "text/plain",
    },
    body: value,
  };
  const response = await fetch(target.toString(), requestInit);
  if (response.ok) return;
  const detail = await response.text().catch(() => "");
  throw new Error(`CF KV put "${key}" ${response.status}: ${detail}`);
}
// ─── MongoDB singleton ────────────────────────────────────────────────────────
/** @type {MongoClient|null} */
let _client = null;
/** True once installShutdownHooks has registered its process listeners. */
let _shutdownHooksInstalled = false;

/**
 * Register process shutdown hooks, at most once per process.
 *
 * - `exit`: listeners must be synchronous, so the close is fire-and-forget.
 * - `SIGINT` / `SIGTERM`: await the close, then exit with the conventional
 *   128 + signal status (130 / 143). An interrupted backfill is therefore not
 *   reported as success to a calling shell or an `npm run … && …` chain. The
 *   listeners are `once`, so unless other listeners are attached, a second
 *   Ctrl-C falls back to Node's default handling if close() hangs.
 */
function installShutdownHooks() {
  if (_shutdownHooksInstalled) return;
  _shutdownHooksInstalled = true;
  process.once("exit", () => {
    _client?.close().catch(() => {});
    _client = null;
  });
  for (const [signal, exitCode] of [["SIGINT", 130], ["SIGTERM", 143]]) {
    process.once(signal, () => {
      closeMongoClient()
        .catch(() => {})
        .finally(() => process.exit(exitCode));
    });
  }
}

/**
 * Return a shared MongoClient, connecting on first call.
 * Registers process exit / signal handlers (once) to close it cleanly.
 *
 * @param {string} uri
 * @returns {Promise<MongoClient>}
 * @throws {Error} when `uri` is empty, or if `connect()` rejects
 */
export async function getMongoClient(uri) {
  if (!uri) throw new Error("MongoDB URI is required — set it in .env.deploy and re-run");
  if (_client) return _client;
  const client = new MongoClient(uri);
  await client.connect();
  _client = client;
  installShutdownHooks();
  return _client;
}

/** Close the shared client (test teardown / forced close). No-op when none is open. */
export async function closeMongoClient() {
  const client = _client;
  if (!client) return;
  // Clear the cache first so a rejected close() doesn't leave a dead client behind.
  _client = null;
  await client.close();
}
// ─── Parity math ─────────────────────────────────────────────────────────────
/**
 * Relative difference between two counts: |a - b| divided by the larger of
 * a, b and 1. Identical counts give 0. The floor of 1 keeps (0, 0) from
 * dividing by zero.
 *
 * @param {number} a
 * @param {number} b
 * @returns {number} value in [0, 1] for non-negative inputs
 */
export function countDiffRatio(a, b) {
  const denominator = Math.max(a, b, 1);
  const gap = Math.abs(a - b);
  return gap / denominator;
}
/**
 * Pick the verification strategy for a collection of `total` items.
 *
 * Below 10 000 items every item is checked (full scan). Otherwise a random
 * sample of min(500, ceil(sqrt(total))) items is used.
 *
 * @param {number} total
 * @returns {{ fullScan: boolean, sampleSize: number }}
 */
export function sampleStrategy(total) {
  const fullScanThreshold = 10000;
  const fullScan = total < fullScanThreshold;
  const sampleSize = fullScan ? total : Math.min(500, Math.ceil(Math.sqrt(total)));
  return { fullScan, sampleSize };
}