feat: add D1 storage layer with per-module migration runner

- SqlStore interface + CF D1 wrapper + per-module factory (table prefix convention)
- init signature extended to ({ db, sql, env }); sql is null when DB binding absent
- custom migration runner walks src/modules/*/migrations/*.sql, tracks applied in _migrations table
- npm run db:migrate with --dry-run and --local flags; chained into deploy
- fake-d1 test helper with subset of SQL semantics for retention and history tests
This commit is contained in:
2026-04-15 13:21:53 +07:00
parent fb8c7518f7
commit 83c6892d6e
15 changed files with 1879 additions and 15 deletions

View File

@@ -9,12 +9,29 @@
import { Bot } from "grammy";
import { installDispatcher } from "./modules/dispatcher.js";
import { getCurrentRegistry } from "./modules/registry.js";
/** @type {Bot | null} */
let botInstance = null;
/** @type {Promise<Bot> | null} */
let botInitPromise = null;
/**
* Returns the memoized registry, building it (and the bot) if needed.
* Shares the same instance used by the fetch handler so scheduled() and
* fetch() operate on identical registry state within a warm instance.
*
* @param {any} env
* @returns {Promise<import("./modules/registry.js").Registry>}
*/
export async function getRegistry(env) {
// If the bot is already initialised the registry was built as a side effect.
if (botInstance) return getCurrentRegistry();
// Otherwise bootstrap via getBot (which calls buildRegistry internally).
await getBot(env);
return getCurrentRegistry();
}
/**
* Fail fast if any required env var is missing — better a 500 on first webhook
* than a confusing runtime error inside grammY.

80
src/db/cf-sql-store.js Normal file
View File

@@ -0,0 +1,80 @@
/**
* @file cf-sql-store — thin wrapper around a Cloudflare D1 database binding.
*
* Exposes `prepare`, `run`, `all`, `first`, and `batch` using the D1
* prepared-statement API. This is the production implementation of SqlStore.
* Tests use `fake-d1.js` instead.
*/
/**
* @typedef {import("./sql-store-interface.js").SqlStore} SqlStore
* @typedef {import("./sql-store-interface.js").SqlRunResult} SqlRunResult
*/
export class CFSqlStore {
/** @param {D1Database} db */
constructor(db) {
this._db = db;
}
/**
* Returns a bound D1PreparedStatement for advanced use (e.g. batch()).
*
* @param {string} query
* @param {...any} binds
* @returns {D1PreparedStatement}
*/
prepare(query, ...binds) {
const stmt = this._db.prepare(query);
return binds.length > 0 ? stmt.bind(...binds) : stmt;
}
/**
* Execute a write statement (INSERT/UPDATE/DELETE/CREATE).
*
* @param {string} query
* @param {...any} binds
* @returns {Promise<SqlRunResult>}
*/
async run(query, ...binds) {
const result = await this.prepare(query, ...binds).run();
return {
changes: result.meta?.changes ?? 0,
last_row_id: result.meta?.last_row_id ?? 0,
};
}
/**
* Execute a SELECT and return all matching rows.
*
* @param {string} query
* @param {...any} binds
* @returns {Promise<any[]>}
*/
async all(query, ...binds) {
const result = await this.prepare(query, ...binds).all();
return result.results ?? [];
}
/**
* Execute a SELECT and return the first row, or null.
*
* @param {string} query
* @param {...any} binds
* @returns {Promise<any|null>}
*/
async first(query, ...binds) {
return this.prepare(query, ...binds).first() ?? null;
}
/**
* Execute multiple prepared statements in a single round-trip.
*
* @param {D1PreparedStatement[]} statements
* @returns {Promise<any[]>}
*/
async batch(statements) {
const results = await this._db.batch(statements);
return results.map((r) => r.results ?? []);
}
}

View File

@@ -0,0 +1,64 @@
/**
* @file create-sql-store — factory returning a namespaced SqlStore for a module.
*
* Table naming is by convention: `{moduleName}_{table}`. Authors write the
* full prefixed name directly in SQL (e.g. `trading_trades`). `tablePrefix`
* is exposed for authors who want to interpolate the prefix dynamically.
*
* Returns null when `env.DB` is absent so modules that don't use D1 have
* zero overhead — the registry passes `sql: null` and modules check for it.
*/
import { CFSqlStore } from "./cf-sql-store.js";
/**
* @typedef {import("./sql-store-interface.js").SqlStore} SqlStore
*/
const MODULE_NAME_RE = /^[a-z0-9_-]+$/;
/**
* @param {string} moduleName — must match `[a-z0-9_-]+`.
* @param {{ DB?: D1Database }} env — worker env (or test double).
* @returns {SqlStore | null} null when env.DB is not bound.
*/
export function createSqlStore(moduleName, env) {
if (!moduleName || typeof moduleName !== "string") {
throw new Error("createSqlStore: moduleName is required");
}
if (!MODULE_NAME_RE.test(moduleName)) {
throw new Error(
`createSqlStore: invalid moduleName "${moduleName}" — must match ${MODULE_NAME_RE}`,
);
}
// D1 is optional — workers without a DB binding still work fine.
if (!env?.DB) return null;
const base = new CFSqlStore(env.DB);
const tablePrefix = `${moduleName}_`;
return {
tablePrefix,
prepare(query, ...binds) {
return base.prepare(query, ...binds);
},
async run(query, ...binds) {
return base.run(query, ...binds);
},
async all(query, ...binds) {
return base.all(query, ...binds);
},
async first(query, ...binds) {
return base.first(query, ...binds);
},
async batch(statements) {
return base.batch(statements);
},
};
}

View File

@@ -10,26 +10,26 @@
*/
/**
* @typedef {Object} KVStorePutOptions
* @typedef {object} KVStorePutOptions
* @property {number} [expirationTtl] seconds — value auto-deletes after this many seconds.
*/
/**
* @typedef {Object} KVStoreListOptions
* @typedef {object} KVStoreListOptions
* @property {string} [prefix] additional prefix (appended AFTER the module namespace).
* @property {number} [limit]
* @property {string} [cursor] pagination cursor from a previous list() call.
*/
/**
* @typedef {Object} KVStoreListResult
* @typedef {object} KVStoreListResult
* @property {string[]} keys — module namespace already stripped.
* @property {string} [cursor] — present if more pages available.
* @property {boolean} done — true when list_complete.
*/
/**
* @typedef {Object} KVStore
* @typedef {object} KVStore
* @property {(key: string) => Promise<string|null>} get
* @property {(key: string, value: string, opts?: KVStorePutOptions) => Promise<void>} put
* @property {(key: string) => Promise<void>} delete

View File

@@ -0,0 +1,40 @@
/**
* @file SqlStore interface — JSDoc typedefs only, no runtime code.
*
* This is the contract every SQL storage backend must satisfy. Modules
* receive a prefixed `SqlStore` (via {@link module:db/create-sql-store}) and
* must NEVER touch the underlying `env.DB` binding directly.
*
* Table naming convention: `{moduleName}_{table}` (e.g. `trading_trades`).
* Enforced by convention — `tablePrefix` is exposed so authors can interpolate
* it when building dynamic table names, but most authors hard-code the full
* prefixed table name directly in their SQL.
*/
/**
* Raw D1 run result.
*
* @typedef {object} SqlRunResult
* @property {number} changes — rows affected by INSERT/UPDATE/DELETE.
* @property {number} last_row_id — rowid of the last inserted row (0 if none).
*/
/**
* @typedef {object} SqlStore
* @property {string} tablePrefix
* Convenience prefix `"${moduleName}_"`. Authors may interpolate this when
* constructing dynamic table names.
* @property {(query: string, ...binds: any[]) => Promise<SqlRunResult>} run
* Execute a write statement (INSERT/UPDATE/DELETE/CREATE). Returns metadata.
* @property {(query: string, ...binds: any[]) => Promise<any[]>} all
* Execute a SELECT and return all matching rows as plain objects.
* @property {(query: string, ...binds: any[]) => Promise<any|null>} first
* Execute a SELECT and return the first row, or null if no rows match.
* @property {(query: string, ...binds: any[]) => D1PreparedStatement} prepare
* Expose the underlying prepared statement for advanced use (e.g. batch()).
* @property {(statements: D1PreparedStatement[]) => Promise<any[]>} batch
* Execute multiple prepared statements in a single round-trip.
*/
// JSDoc-only module. No runtime exports.
export {};

View File

@@ -13,7 +13,8 @@
*/
import { webhookCallback } from "grammy";
import { getBot } from "./bot.js";
import { getBot, getRegistry } from "./bot.js";
import { dispatchScheduled } from "./modules/cron-dispatcher.js";
/** @type {ReturnType<typeof webhookCallback> | null} */
let cachedWebhookHandler = null;
@@ -31,6 +32,24 @@ async function getWebhookHandler(env) {
}
export default {
/**
* Cloudflare Cron Trigger handler.
* Dispatches the scheduled event to all module cron handlers whose
* schedule matches event.cron.
*
* @param {any} event — ScheduledEvent ({ cron: string, scheduledTime: number })
* @param {any} env
* @param {{ waitUntil: (p: Promise<any>) => void }} ctx
*/
async scheduled(event, env, ctx) {
try {
const registry = await getRegistry(env);
dispatchScheduled(event, env, ctx, registry);
} catch (err) {
console.error("scheduled handler failed", err);
}
},
/**
* @param {Request} request
* @param {any} env

View File

@@ -3,7 +3,7 @@
*
* wrangler bundles statically — dynamic `import(variablePath)` defeats
* tree-shaking and can fail at bundle time. So we enumerate every module here
* as a lazy loader, and {@link loadModules} filters the list at runtime
* as a lazy loader, and loadModules filters the list at runtime
* against `env.MODULES` (comma-separated). Adding a new module is a two-step
* edit: create the folder, then add one line here.
*/

View File

@@ -12,28 +12,40 @@
* - `resetRegistry()` exists for tests.
*/
import { createSqlStore } from "../db/create-sql-store.js";
import { createStore } from "../db/create-store.js";
import { moduleRegistry as defaultModuleRegistry } from "./index.js";
import { validateCommand } from "./validate-command.js";
import { validateCron } from "./validate-cron.js";
/**
* @typedef {import("./validate-command.js").ModuleCommand} ModuleCommand
*
* @typedef {Object} BotModule
* @typedef {import("./validate-cron.js").ModuleCron} ModuleCron
*
* @typedef {object} BotModule
* @property {string} name
* @property {ModuleCommand[]} commands
* @property {({ db, env }: { db: any, env: any }) => Promise<void>|void} [init]
* @property {ModuleCron[]} [crons]
* @property {(ctx: { db: any, sql: any, env: any }) => Promise<void>} [init]
*
* @typedef {Object} RegistryEntry
* @typedef {object} RegistryEntry
* @property {BotModule} module
* @property {ModuleCommand} cmd
* @property {"public"|"protected"|"private"} [visibility]
*
* @typedef {Object} Registry
* @typedef {object} CronEntry
* @property {BotModule} module
* @property {string} schedule
* @property {string} name
* @property {ModuleCron["handler"]} handler
*
* @typedef {object} Registry
* @property {Map<string, RegistryEntry>} publicCommands
* @property {Map<string, RegistryEntry>} protectedCommands
* @property {Map<string, RegistryEntry>} privateCommands
* @property {Map<string, RegistryEntry>} allCommands
* @property {CronEntry[]} crons — flat list of all validated cron entries across modules.
* @property {BotModule[]} modules — ordered per env.MODULES for /help rendering.
*/
@@ -97,6 +109,21 @@ export async function loadModules(env, importMap = defaultModuleRegistry) {
}
for (const cmd of mod.commands) validateCommand(cmd, name);
// Validate crons if present (optional field).
if (mod.crons !== undefined) {
if (!Array.isArray(mod.crons)) {
throw new Error(`module "${name}" crons must be an array`);
}
const cronNames = new Set();
for (const cron of mod.crons) {
validateCron(cron, name);
if (cronNames.has(cron.name)) {
throw new Error(`module "${name}" has duplicate cron name "${cron.name}"`);
}
cronNames.add(cron.name);
}
}
modules.push(mod);
}
@@ -122,11 +149,13 @@ export async function buildRegistry(env, importMap) {
const privateCommands = new Map();
/** @type {Map<string, RegistryEntry>} */
const allCommands = new Map();
/** @type {CronEntry[]} */
const crons = [];
for (const mod of modules) {
if (typeof mod.init === "function") {
try {
await mod.init({ db: createStore(mod.name, env), env });
await mod.init({ db: createStore(mod.name, env), sql: createSqlStore(mod.name, env), env });
} catch (err) {
throw new Error(
`module "${mod.name}" init failed: ${err instanceof Error ? err.message : String(err)}`,
@@ -149,6 +178,18 @@ export async function buildRegistry(env, importMap) {
else if (cmd.visibility === "protected") protectedCommands.set(cmd.name, entry);
else privateCommands.set(cmd.name, entry);
}
// Collect cron entries (validated during loadModules).
if (Array.isArray(mod.crons)) {
for (const cron of mod.crons) {
crons.push({
module: mod,
schedule: cron.schedule,
name: cron.name,
handler: cron.handler,
});
}
}
}
const registry = {
@@ -156,6 +197,7 @@ export async function buildRegistry(env, importMap) {
protectedCommands,
privateCommands,
allCommands,
crons,
modules,
};
currentRegistry = registry;

View File

@@ -17,7 +17,7 @@ export const COMMAND_NAME_RE = /^[a-z0-9_]{1,32}$/;
export const MAX_DESCRIPTION_LENGTH = 256;
/**
* @typedef {Object} ModuleCommand
* @typedef {object} ModuleCommand
* @property {string} name — without leading slash; matches COMMAND_NAME_RE.
* @property {"public"|"protected"|"private"} visibility
* @property {string} description — ≤256 chars; required for all visibilities.