mirror of
https://github.com/tiennm99/miti99bot-js.git
synced 2026-05-23 02:25:41 +00:00
0859356ec7
Operator-run migration scripts for KV→Mongo and D1→trading_trades, plus a parity verifier and a rollback wiper. Pure local Node — no Worker code, no /__admin/* routes, no new Worker secrets. Complies with docs/architecture.md §10.

Scripts
- backfill-kv-to-mongo.js: paginates the CF KV REST API per module, fetches values, and does a $setOnInsert upsert into a per-module Mongo collection. Resumes from .backfill-cursor-<module>.json on restart. Throttles to 50 ops/sec. expiresAt is derived from KV metadata.expiration (debugger #10). --dry-run and --module flags allow incremental work.
- backfill-d1-to-mongo.js: wrangler d1 execute --remote --json → parse → insertMany batches into trading_trades, preserving the original integer id as legacy_id (code-reviewer #13). Pre-flight aborts if the collection is non-empty unless --force is given.
- verify-mongo-parity.js: count parity ±1%, SHA256 value compare, expiresAt ±5 min bucket. Full scan when <10K docs, otherwise a sqrt sample capped at 500 (code-reviewer #21). Trading: full scan on legacy_id/ts/user_id/symbol/qty.
- wipe-mongo.js: rollback helper. deleteMany across all collections with a readline confirm; --yes for CI.
- lib/migration-helpers.js: shared sleep, sha256, checkpoint I/O, cfKvList/cfKvGet, MongoClient singleton, sample strategy.

Surface updates
- .env.deploy.example: CF account/token/namespace placeholders.
- package.json: backfill:kv[:dry], backfill:d1[:dry], verify:mongo, wipe:mongo scripts.
- check-secret-leaks.js: SECRETS array gains CLOUDFLARE_API_TOKEN + CLOUDFLARE_ACCOUNT_ID for defense-in-depth.
- .gitignore: .backfill-cursor-*.json excluded.

Tests: 638 → 667 (+29 pure-logic tests for sha256, checkpoint round-trip, count-diff, sample-size, fetch-mocked CF REST). Lint clean.

Operator-run sequence (after Phase 06 deploy):
  npm run backfill:kv:dry   # preview
  npm run backfill:kv
  npm run backfill:d1:dry
  npm run backfill:d1
  npm run verify:mongo      # exit 0 = parity ok
150 lines
5.7 KiB
JavaScript
150 lines
5.7 KiB
JavaScript
#!/usr/bin/env node
|
|
/**
|
|
* @file backfill-d1-to-mongo — copy trading_trades from D1 into MongoDB Atlas.
|
|
*
|
|
* Uses `npx wrangler d1 execute --remote --json` (mirrors scripts/migrate.js pattern)
|
|
* to read all rows, then insertMany in batches of 100 into the `trading_trades`
|
|
* Mongo collection.
|
|
*
|
|
* Flags:
|
|
* --dry-run Count rows + log first 5 without writing.
|
|
* --force Bypass pre-flight abort when trading_trades collection is non-empty.
|
|
*
|
|
* Required env (loaded via --env-file-if-exists=.env.deploy):
|
|
* MONGODB_URI
|
|
*
|
|
* Usage:
|
|
* node --env-file-if-exists=.env.deploy scripts/backfill-d1-to-mongo.js
|
|
* node --env-file-if-exists=.env.deploy scripts/backfill-d1-to-mongo.js --dry-run
|
|
* node --env-file-if-exists=.env.deploy scripts/backfill-d1-to-mongo.js --force
|
|
*/
|
|
|
|
import { execSync } from "node:child_process";
|
|
import { ObjectId } from "mongodb";
|
|
import { closeMongoClient, getMongoClient, sleep } from "./lib/migration-helpers.js";
|
|
|
|
// Source D1 database (as addressed by `wrangler d1 execute`) and the
// destination Mongo collection.
const DB_NAME = "miti99bot-db";
const COLLECTION = "trading_trades";

// insertMany chunking: rows per batch, and the pause (ms) between batches.
const BATCH_SIZE = 100;
const BATCH_SLEEP_MS = 100;

// CLI flags — presence anywhere in argv switches the mode on.
const [dryRun, force] = ["--dry-run", "--force"].map((flag) => process.argv.includes(flag));
|
|
|
|
// ─── Preflight ────────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Ensure every required environment variable is set (non-empty).
 * On any miss, prints one line per missing variable plus a setup hint,
 * then terminates the process with exit code 1.
 */
function validateEnv() {
  const required = { MONGODB_URI: "Atlas connection string" };
  const absent = Object.entries(required).filter(([name]) => !process.env[name]);

  if (absent.length === 0) return;

  for (const [name, description] of absent) {
    console.error(`[backfill-d1] Missing: ${name} (${description})`);
  }
  console.error(" Copy .env.deploy.example to .env.deploy and fill in values.");
  process.exit(1);
}
|
|
|
|
// ─── D1 query ─────────────────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Execute a SQL query against the remote D1 database via wrangler.
|
|
* Mirrors the wranglerExecute pattern in scripts/migrate.js.
|
|
*
|
|
* @param {string} sql
|
|
* @returns {any[]} rows from D1
|
|
*/
|
|
function queryD1(sql) {
|
|
const cmd = `npx wrangler d1 execute ${DB_NAME} --remote --command "${sql.replace(/"/g, '\\"')}" --json`;
|
|
let stdout;
|
|
try {
|
|
stdout = execSync(cmd, { stdio: ["ignore", "pipe", "pipe"] }).toString();
|
|
} catch (err) {
|
|
const stderr = err.stderr?.toString() ?? "";
|
|
throw new Error(`wrangler d1 execute failed:\n${stderr || err.stdout?.toString()}`);
|
|
}
|
|
// wrangler --json wraps results: [{results: [...], success: bool}]
|
|
const parsed = JSON.parse(stdout);
|
|
const results = Array.isArray(parsed) ? (parsed[0]?.results ?? []) : [];
|
|
return results;
|
|
}
|
|
|
|
// ─── Row mapping ──────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Convert one D1 trading_trades row into the Mongo document shape.
 * A fresh ObjectId becomes `_id`; the D1 integer primary key is kept as
 * `legacy_id` so parity checks can pair records across both stores.
 * Columns outside the listed set are dropped.
 *
 * @param {{ id: number, user_id: string, symbol: string, side: string,
 *           qty: number, price_vnd: number, ts: number }} row
 * @returns {object}
 */
function rowToDoc(row) {
  const { id, user_id, symbol, side, qty, price_vnd, ts } = row;
  const _id = new ObjectId();
  return { _id, legacy_id: id, user_id, symbol, side, qty, price_vnd, ts };
}
|
|
|
|
// ─── Main ─────────────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Print a preview of the first few rows for --dry-run.
 * Logs structural info only — user_id and prices are deliberately omitted.
 *
 * @param {any[]} rows rows returned from D1
 */
function printDrySample(rows) {
  console.log("[backfill-d1] Sample (first 5 rows):");
  for (const row of rows.slice(0, 5)) {
    console.log(` id=${row.id} symbol=${row.symbol} side=${row.side} ts=${row.ts}`);
  }
}

/**
 * Insert rows into `coll` in chunks of BATCH_SIZE, pausing BATCH_SLEEP_MS
 * between chunks and logging progress after each one.
 *
 * @param {import("mongodb").Collection} coll destination collection
 * @param {any[]} rows D1 rows (mapped through rowToDoc before insert)
 * @returns {Promise<number>} number of documents inserted
 * @throws {Error} when a batch fails; the message reports how far the run got,
 *   because a blind `--force` re-run would otherwise duplicate earlier batches
 */
async function insertInBatches(coll, rows) {
  let inserted = 0;
  for (let i = 0; i < rows.length; i += BATCH_SIZE) {
    const batch = rows.slice(i, i + BATCH_SIZE).map(rowToDoc);
    try {
      await coll.insertMany(batch);
    } catch (err) {
      const detail = [
        `insertMany failed for D1 rows ${i + 1}-${i + batch.length} of ${rows.length}: ${err?.message ?? err}`,
        `  ${inserted} document(s) from earlier batches are already in ${COLLECTION}; the failing batch may be partially written.`,
        `  Clean up ${COLLECTION} before re-running with --force, otherwise rows will be duplicated.`,
      ].join("\n");
      throw new Error(detail, { cause: err });
    }
    inserted += batch.length;
    console.log(`[backfill-d1] Inserted ${inserted}/${rows.length}`);
    if (i + BATCH_SIZE < rows.length) await sleep(BATCH_SLEEP_MS);
  }
  return inserted;
}

/**
 * Entry point: read every trading_trades row from D1 and copy it into Mongo.
 *
 * --dry-run stops after the D1 read and prints a sample.
 * Without --force, aborts (exit code 1) when the Mongo collection already has data.
 * The Mongo client is closed on every path, including insert failures.
 */
async function main() {
  validateEnv();

  if (dryRun) console.log("[backfill-d1] DRY RUN — no writes will be made");

  // Fetch all rows from D1.
  console.log("[backfill-d1] Querying D1 trading_trades...");
  const rows = queryD1(
    "SELECT id, user_id, symbol, side, qty, price_vnd, ts FROM trading_trades ORDER BY id",
  );
  console.log(`[backfill-d1] Found ${rows.length} rows in D1`);

  if (dryRun) {
    printDrySample(rows);
    console.log("[backfill-d1] DRY RUN complete. No writes made.");
    return;
  }

  const client = await getMongoClient(process.env.MONGODB_URI);
  let inserted;
  try {
    const coll = client.db().collection(COLLECTION);

    // Pre-flight: abort if collection already has data (prevents double-insert).
    const existingCount = await coll.countDocuments();
    if (existingCount > 0 && !force) {
      console.error(`[backfill-d1] ABORT: ${COLLECTION} already has ${existingCount} document(s).`);
      console.error(" Run with --force to bypass this check (e.g. after wipe-mongo).");
      // Let the finally block close the client; the process then exits with 1.
      process.exitCode = 1;
      return;
    }
    if (existingCount > 0) {
      console.log(`[backfill-d1] --force: proceeding despite ${existingCount} existing doc(s).`);
    }

    inserted = await insertInBatches(coll, rows);
  } finally {
    await closeMongoClient();
  }

  console.log(`[backfill-d1] Done — ${inserted} documents inserted into ${COLLECTION}.`);
}
|
|
|
|
/**
 * Last-resort handler for anything main() rejects with: print the message
 * (or the raw value when there is no message) and exit non-zero.
 *
 * @param {unknown} err
 */
function reportFatal(err) {
  console.error("[backfill-d1] Fatal:", err.message ?? err);
  process.exit(1);
}

main().catch(reportFatal);
|