From b728ae7d38d327c9946acb7926ae6e61dcdab6f9 Mon Sep 17 00:00:00 2001 From: tiennm99 Date: Thu, 9 Apr 2026 00:43:07 +0700 Subject: [PATCH] refactor: per-subscriber KV keys, HMAC verification, cron trigger MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major refactor addressing scalability, security, and reliability: - KV schema: single-key → per-subscriber keys (sub:{chatId}:{threadId}) eliminates read-modify-write race conditions - Component-specific subscriptions: /subscribe component - HMAC-SHA256 webhook verification with URL secret fallback - Cron trigger (every 5 min) polls status.claude.com as safety net - Shared telegram-api.js module (DRY fix) - Error logging in all catch blocks - Migration endpoint for existing subscribers - Setup moved to standalone script (scripts/setup-bot.js) - Removed setup HTTP route to reduce attack surface --- CLAUDE.md | 34 +++-- README.md | 36 ++++- src/bot-setup.js => scripts/setup-bot.js | 36 ++--- src/bot-commands.js | 69 +++++++-- src/bot-info-commands.js | 18 ++- src/cron-status-check.js | 77 ++++++++++ src/index.js | 18 ++- src/kv-store.js | 178 ++++++++++++++++------- src/queue-consumer.js | 14 +- src/statuspage-webhook.js | 68 +++++++-- src/telegram-api.js | 8 + wrangler.jsonc | 4 + 12 files changed, 443 insertions(+), 117 deletions(-) rename src/bot-setup.js => scripts/setup-bot.js (63%) create mode 100644 src/cron-status-check.js create mode 100644 src/telegram-api.js diff --git a/CLAUDE.md b/CLAUDE.md index 5e800f7..df0c114 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,7 +10,8 @@ Telegram bot that forwards [status.claude.com](https://status.claude.com/) (Atla - `npm run dev` — Start local dev server (wrangler dev, emulates KV + Queues locally) - `npm run deploy` — Deploy to Cloudflare Workers -- `npx wrangler deploy --dry-run --outdir=dist` — Verify build without deploying +- `npx wrangler deploy --dry-run` — Verify build without deploying +- `node scripts/setup-bot.js` — One-time: register bot commands + set Telegram webhook (requires BOT_TOKEN and WORKER_URL env vars) No test framework configured yet. No linter configured. @@ -18,35 +19,45 @@ No test framework configured yet. No linter configured. - `BOT_TOKEN` — Telegram bot token - `WEBHOOK_SECRET` — Secret token in Statuspage webhook URL path +- `STATUSPAGE_HMAC_KEY` — HMAC key from Statuspage webhook settings (optional, for signature verification) ## Architecture -Cloudflare Workers with two entry points exported from `src/index.js`: +Cloudflare Workers with three entry points exported from `src/index.js`: - **`fetch`** — Hono.js HTTP handler (routes below) - **`queue`** — CF Queues consumer for fan-out message delivery +- **`scheduled`** — CF Cron Trigger (every 5 min) for status polling safety net ### Routes | Method | Path | Handler | Purpose | |--------|------|---------|---------| | GET | `/` | inline | Health check | -| GET | `/webhook/setup/:secret` | `bot-setup.js` | One-time: register bot commands + set Telegram webhook | | POST | `/webhook/telegram` | `bot-commands.js` | grammY `webhookCallback("cloudflare-mod")` | -| POST | `/webhook/status/:secret` | `statuspage-webhook.js` | Receives Statuspage webhooks | +| POST | `/webhook/status/:secret` | `statuspage-webhook.js` | Receives Statuspage webhooks (HMAC + URL secret) | +| GET | `/migrate/:secret` | inline | One-time KV migration (remove after use) | ### Data Flow -1. **Statuspage → Worker**: Webhook POST → validate secret (timing-safe) → parse incident/component event → filter subscribers by preference → `sendBatch` to CF Queue -2. **Queue → Telegram**: Consumer processes batches of 30 → `sendMessage` via raw fetch → auto-removes blocked subscribers (403/400), retries on 429 -3. **User → Bot**: Telegram webhook → grammY handles `/help`, `/start`, `/stop`, `/status`, `/subscribe`, `/history`, `/uptime` commands → reads/writes KV +1. **Statuspage → Worker**: Webhook POST → verify HMAC signature (fallback: URL secret) → parse incident/component event → filter subscribers by type + component → `sendBatch` to CF Queue +2. **Cron → Worker**: Every 5 min → fetch summary → compare with `last-status` KV → notify on changes → update stored state +3. **Queue → Telegram**: Consumer processes batches of 30 → `sendMessage` via `telegram-api.js` helper → auto-removes blocked subscribers (403/400), retries on 429 +4. **User → Bot**: Telegram webhook → grammY handles `/help`, `/start`, `/stop`, `/status`, `/subscribe`, `/history`, `/uptime` commands → reads/writes KV ### KV Storage -Single key `subscribers` stores a JSON object keyed by composite subscriber ID: -- DM/group: `"chatId"` → `{ types: ["incident", "component"] }` -- Supergroup topic: `"chatId:threadId"` → `{ types: ["incident"] }` +Per-subscriber keys (no read-modify-write races): +- `sub:{chatId}` → `{ types: ["incident", "component"], components: [] }` +- `sub:{chatId}:{threadId}` → `{ types: ["incident"], components: ["API"] }` -`kv-store.js` handles key building/parsing — `threadId` can be `0` (General topic), so null checks use `!= null` not truthiness. +Special keys: +- `last-status` — JSON snapshot of component statuses for cron comparison + +`kv-store.js` handles key building/parsing with `kv.list({ prefix: "sub:" })` pagination. `threadId` can be `0` (General topic), so null checks use `!= null`. + +### Component-Specific Subscriptions + +Subscribers can filter to specific components via `/subscribe component `. Empty `components` array = all components (default). Filtering applies to both webhook and cron notifications. ### Supergroup Topic Support @@ -62,3 +73,4 @@ Bot stores `message_thread_id` from the topic where `/start` was sent. Notificat - `claude_status` — KV namespace - `claude-status` — Queue producer/consumer (batch size 30, max retries 3) +- Cron: `*/5 * * * *` — triggers `scheduled` export every 5 minutes diff --git a/README.md b/README.md index 33c2956..b7f09c8 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,11 @@ Hosted on [Cloudflare Workers](https://workers.cloudflare.com/) with KV for stor - **Incident notifications** — new incidents, updates, and resolutions with impact severity - **Component status changes** — e.g., API goes from Operational → Degraded Performance - **Per-user subscription preferences** — subscribe to incidents only, components only, or both +- **Component-specific filtering** — subscribe to specific components (e.g., API only) - **Supergroup topic support** — send `/start` in a specific topic and notifications go to that topic - **On-demand status check** — `/status` fetches live data from status.claude.com +- **Automatic status monitoring** — cron checks every 5 minutes as a safety net +- **HMAC webhook verification** — validates Statuspage webhook signatures - **Self-healing** — automatically removes subscribers who block the bot ## Bot Commands @@ -24,7 +27,9 @@ Hosted on [Cloudflare Workers](https://workers.cloudflare.com/) with KV for stor | `/status ` | Show status of a specific component (fuzzy match) | | `/subscribe incident` | Receive incident notifications only | | `/subscribe component` | Receive component update notifications only | -| `/subscribe all` | Receive both (default) | +| `/subscribe component ` | Filter to a specific component (e.g., `/subscribe component api`) | +| `/subscribe component all` | Clear component filter (receive all) | +| `/subscribe all` | Receive both incidents and components (default) | | `/history` | Show 5 most recent incidents with impact and links | | `/history ` | Show up to 10 recent incidents | | `/uptime` | Component health overview with last status change time | @@ -74,6 +79,10 @@ npx wrangler secret put BOT_TOKEN npx wrangler secret put WEBHOOK_SECRET # Choose a random secret string for the Statuspage webhook URL + +# Optional: HMAC verification for Statuspage webhooks +npx wrangler secret put STATUSPAGE_HMAC_KEY +# Paste the HMAC key from Statuspage webhook settings ``` ### 5. Deploy @@ -86,13 +95,13 @@ Note the worker URL from the output (e.g., `https://claude-status-webhook./webhook/setup/ +```bash +BOT_TOKEN=your-token WORKER_URL=https://your-worker.workers.dev node scripts/setup-bot.js ``` -You should see a JSON response with `{"ok":true}` for both `webhook` and `commands`. +You should see `{"ok":true}` for both webhook and commands. ### 7. Configure Statuspage webhook @@ -102,6 +111,22 @@ You should see a JSON response with `{"ok":true}` for both `webhook` and `comman Replace `` with the secret you set in step 4. +### 8. Run migration (if upgrading) + +If you have existing subscribers from an older version, run the migration endpoint once: + +``` +https:///migrate/ +``` + +This converts the old single-key format to per-subscriber KV keys. Remove the `/migrate` route from `src/index.js` after confirming success. + +## Automatic Status Monitoring + +The bot checks status.claude.com every 5 minutes via Cloudflare Cron Triggers (free tier). If a component status changes between checks, subscribers are notified automatically. This acts as a safety net in case Statuspage webhooks are delayed or missed. + +Cron-detected changes are tagged with "(detected by status check)" to distinguish from webhook notifications. + ## Local Development ```bash @@ -140,6 +165,7 @@ curl -X POST http://localhost:8787/webhook/status/your-test-secret \ - **Runtime**: [Cloudflare Workers](https://workers.cloudflare.com/) - **Storage**: [Cloudflare KV](https://developers.cloudflare.com/kv/) - **Queue**: [Cloudflare Queues](https://developers.cloudflare.com/queues/) +- **Cron**: [Cloudflare Cron Triggers](https://developers.cloudflare.com/workers/configuration/cron-triggers/) - **HTTP framework**: [Hono](https://hono.dev/) - **Telegram framework**: [grammY](https://grammy.dev/) diff --git a/src/bot-setup.js b/scripts/setup-bot.js similarity index 63% rename from src/bot-setup.js rename to scripts/setup-bot.js index 61a61ce..6ee436f 100644 --- a/src/bot-setup.js +++ b/scripts/setup-bot.js @@ -1,3 +1,5 @@ +#!/usr/bin/env node + const TELEGRAM_API = "https://api.telegram.org/bot"; const BOT_COMMANDS = [ @@ -10,37 +12,31 @@ const BOT_COMMANDS = [ { command: "uptime", description: "Component health overview" }, ]; -/** - * One-time setup: register bot commands and set Telegram webhook. - * GET /webhook/setup/:secret - */ -export async function setupBot(c) { - const secret = c.req.param("secret"); - if (secret !== c.env.WEBHOOK_SECRET) { - return c.text("Unauthorized", 401); +async function main() { + const token = process.env.BOT_TOKEN; + const workerUrl = process.env.WORKER_URL; + + if (!token || !workerUrl) { + console.error("Required env vars: BOT_TOKEN, WORKER_URL"); + console.error("Usage: BOT_TOKEN=xxx WORKER_URL=https://your-worker.workers.dev node scripts/setup-bot.js"); + process.exit(1); } - const token = c.env.BOT_TOKEN; - const workerUrl = new URL(c.req.url).origin; - - // Set webhook URL + // Set webhook const webhookRes = await fetch(`${TELEGRAM_API}${token}/setWebhook`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ url: `${workerUrl}/webhook/telegram` }), }); - const webhookData = await webhookRes.json(); + console.log("Webhook:", await webhookRes.json()); - // Register bot commands + // Register commands const commandsRes = await fetch(`${TELEGRAM_API}${token}/setMyCommands`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ commands: BOT_COMMANDS }), }); - const commandsData = await commandsRes.json(); - - return c.json({ - webhook: webhookData, - commands: commandsData, - }); + console.log("Commands:", await commandsRes.json()); } + +main().catch(console.error); diff --git a/src/bot-commands.js b/src/bot-commands.js index 8623ecf..183ff9f 100644 --- a/src/bot-commands.js +++ b/src/bot-commands.js @@ -3,9 +3,10 @@ import { addSubscriber, removeSubscriber, updateSubscriberTypes, - getSubscribers, - buildSubscriberKey, + updateSubscriberComponents, + getSubscriber, } from "./kv-store.js"; +import { fetchComponentByName, escapeHtml } from "./status-fetcher.js"; import { registerInfoCommands } from "./bot-info-commands.js"; /** @@ -46,7 +47,50 @@ export async function handleTelegramWebhook(c) { bot.command("subscribe", async (ctx) => { const { chatId, threadId } = getChatTarget(ctx); - const arg = ctx.match?.trim().toLowerCase(); + const args = ctx.match?.trim().toLowerCase().split(/\s+/) || []; + const arg = args[0]; + + // Handle "/subscribe component " or "/subscribe component all" + if (arg === "component" && args.length > 1) { + const componentArg = args.slice(1).join(" "); + const sub = await getSubscriber(kv, chatId, threadId); + if (!sub) { + await ctx.reply("Not subscribed yet. Use /start first.", { parse_mode: "HTML" }); + return; + } + if (componentArg === "all") { + await updateSubscriberComponents(kv, chatId, threadId, []); + await ctx.reply("Component filter cleared — receiving all component updates.", { + parse_mode: "HTML", + }); + return; + } + // Validate component name against live API + const component = await fetchComponentByName(componentArg); + if (!component) { + await ctx.reply(`Component "${escapeHtml(componentArg)}" not found.`, { + parse_mode: "HTML", + }); + return; + } + // Add to component filter (deduplicate) + const components = sub.components || []; + if (!components.some((c) => c.toLowerCase() === component.name.toLowerCase())) { + components.push(component.name); + } + await updateSubscriberComponents(kv, chatId, threadId, components); + // Ensure "component" is in types + if (!sub.types.includes("component")) { + sub.types.push("component"); + await updateSubscriberTypes(kv, chatId, threadId, sub.types); + } + await ctx.reply( + `Subscribed to component: ${escapeHtml(component.name)}\n` + + `Active filters: ${components.join(", ")}`, + { parse_mode: "HTML" } + ); + return; + } const validTypes = { incident: ["incident"], @@ -55,19 +99,26 @@ export async function handleTelegramWebhook(c) { }; if (!arg || !validTypes[arg]) { - const key = buildSubscriberKey(chatId, threadId); - const subs = await getSubscribers(kv); - const current = subs[key]?.types?.join(", ") || "none (use /start first)"; + const sub = await getSubscriber(kv, chatId, threadId); + const current = sub?.types?.join(", ") || "none (use /start first)"; + const compFilter = sub?.components?.length ? sub.components.join(", ") : "all"; await ctx.reply( - "Usage: /subscribe <type>\n\n" + + "Usage: /subscribe <type> [component]\n\n" + "Types: incident, component, all\n" + - `\nCurrent: ${current}`, + "Component filter: /subscribe component api\n" + + "Clear filter: /subscribe component all\n" + + `\nCurrent types: ${current}\n` + + `Components: ${compFilter}`, { parse_mode: "HTML" } ); return; } - await updateSubscriberTypes(kv, chatId, threadId, validTypes[arg]); + const updated = await updateSubscriberTypes(kv, chatId, threadId, validTypes[arg]); + if (!updated) { + await ctx.reply("Not subscribed yet. Use /start first.", { parse_mode: "HTML" }); + return; + } await ctx.reply(`Subscription updated: ${validTypes[arg].join(", ")}`, { parse_mode: "HTML", }); diff --git a/src/bot-info-commands.js b/src/bot-info-commands.js index bcf88de..4be67ba 100644 --- a/src/bot-info-commands.js +++ b/src/bot-info-commands.js @@ -1,4 +1,5 @@ import { + escapeHtml, fetchComponentByName, fetchSummary, fetchIncidents, @@ -26,9 +27,11 @@ export function registerInfoCommands(bot) { `Show current status of all components.\n` + `Add a component name for a specific check.\n` + `Example: /status api\n\n` + - `/subscribe <type>\n` + + `/subscribe <type> [component]\n` + `Set what notifications you receive.\n` + - `Options: incident, component, all\n` + + `Types: incident, component, all\n` + + `Component filter: /subscribe component api\n` + + `Clear filter: /subscribe component all\n` + `Example: /subscribe incident\n\n` + `/history [count]\n` + `Show recent incidents. Default: 5, max: 10.\n` + @@ -46,7 +49,7 @@ export function registerInfoCommands(bot) { if (args) { const component = await fetchComponentByName(args); if (!component) { - await ctx.reply(`Component "${args}" not found.`, { parse_mode: "HTML" }); + await ctx.reply(`Component "${escapeHtml(args)}" not found.`, { parse_mode: "HTML" }); return; } await ctx.reply(formatComponentLine(component), { parse_mode: "HTML" }); @@ -66,7 +69,8 @@ export function registerInfoCommands(bot) { { parse_mode: "HTML", disable_web_page_preview: true } ); } - } catch { + } catch (err) { + console.error("status command error:", err); await ctx.reply("Unable to fetch status. Please try again later."); } }); @@ -87,7 +91,8 @@ export function registerInfoCommands(bot) { `View full history`, { parse_mode: "HTML", disable_web_page_preview: true } ); - } catch { + } catch (err) { + console.error("history command error:", err); await ctx.reply("Unable to fetch incident history. Please try again later."); } }); @@ -111,7 +116,8 @@ export function registerInfoCommands(bot) { `View uptime on status page`, { parse_mode: "HTML", disable_web_page_preview: true } ); - } catch { + } catch (err) { + console.error("uptime command error:", err); await ctx.reply("Unable to fetch uptime data. Please try again later."); } }); diff --git a/src/cron-status-check.js b/src/cron-status-check.js new file mode 100644 index 0000000..21376bb --- /dev/null +++ b/src/cron-status-check.js @@ -0,0 +1,77 @@ +import { fetchSummary, humanizeStatus, escapeHtml } from "./status-fetcher.js"; +import { getSubscribersByType } from "./kv-store.js"; + +const LAST_STATUS_KEY = "last-status"; + +/** + * Build a map of component name -> status from summary + */ +function buildStatusMap(summary) { + const map = {}; + for (const c of summary.components) { + if (!c.group) map[c.name] = c.status; + } + return map; +} + +/** + * Format a component status change detected by cron + */ +function formatChangeMessage(name, oldStatus, newStatus) { + return ( + `Component Update: ${escapeHtml(name)}\n` + + `${humanizeStatus(oldStatus)} → ${humanizeStatus(newStatus)}\n` + + `(detected by status check)` + ); +} + +/** + * CF Scheduled handler — polls status page, notifies on changes + */ +export async function handleScheduled(env) { + const kv = env.claude_status; + const queue = env["claude-status"]; + + let summary; + try { + summary = await fetchSummary(); + } catch (err) { + console.error("Cron: failed to fetch status:", err); + return; + } + + const currentMap = buildStatusMap(summary); + const stored = await kv.get(LAST_STATUS_KEY, "json"); + const previousMap = stored?.components || {}; + + // Find changes (only if previous state exists for that component) + const changes = []; + for (const [name, status] of Object.entries(currentMap)) { + if (previousMap[name] && previousMap[name] !== status) { + changes.push({ name, oldStatus: previousMap[name], newStatus: status }); + } + } + + // Always update stored state (proves cron is running) + await kv.put(LAST_STATUS_KEY, JSON.stringify({ + components: currentMap, + timestamp: new Date().toISOString(), + })); + + if (changes.length === 0) return; + + console.log(`Cron: ${changes.length} component change(s) detected`); + + // Enqueue notifications for each change + for (const { name, oldStatus, newStatus } of changes) { + const html = formatChangeMessage(name, oldStatus, newStatus); + const subscribers = await getSubscribersByType(kv, "component", name); + const messages = subscribers.map(({ chatId, threadId }) => ({ + body: { chatId, threadId, html }, + })); + for (let i = 0; i < messages.length; i += 100) { + await queue.sendBatch(messages.slice(i, i + 100)); + } + console.log(`Cron: enqueued ${messages.length} messages for ${name} change`); + } +} diff --git a/src/index.js b/src/index.js index aecb7ce..288203b 100644 --- a/src/index.js +++ b/src/index.js @@ -2,16 +2,30 @@ import { Hono } from "hono"; import { handleTelegramWebhook } from "./bot-commands.js"; import { handleStatuspageWebhook } from "./statuspage-webhook.js"; import { handleQueue } from "./queue-consumer.js"; -import { setupBot } from "./bot-setup.js"; +import { handleScheduled } from "./cron-status-check.js"; +import { migrateFromSingleKey } from "./kv-store.js"; const app = new Hono(); app.get("/", (c) => c.text("Claude Status Bot is running")); -app.get("/webhook/setup/:secret", (c) => setupBot(c)); app.post("/webhook/telegram", (c) => handleTelegramWebhook(c)); app.post("/webhook/status/:secret", (c) => handleStatuspageWebhook(c)); +// One-time migration route — remove after migration is confirmed +app.get("/migrate/:secret", async (c) => { + const secret = c.req.param("secret"); + const encoder = new TextEncoder(); + const a = encoder.encode(secret); + const b = encoder.encode(c.env.WEBHOOK_SECRET); + if (a.byteLength !== b.byteLength || !crypto.subtle.timingSafeEqual(a, b)) { + return c.text("Unauthorized", 401); + } + const count = await migrateFromSingleKey(c.env.claude_status); + return c.json({ migrated: count }); +}); + export default { fetch: app.fetch, queue: handleQueue, + scheduled: (event, env, ctx) => ctx.waitUntil(handleScheduled(env)), }; diff --git a/src/kv-store.js b/src/kv-store.js index 1a8beee..3c83f7a 100644 --- a/src/kv-store.js +++ b/src/kv-store.js @@ -1,84 +1,164 @@ -const KV_KEY = "subscribers"; +const KEY_PREFIX = "sub:"; /** - * Build composite key: "chatId" or "chatId:threadId" for supergroup topics + * Build KV key: "sub:{chatId}" or "sub:{chatId}:{threadId}" */ -export function buildSubscriberKey(chatId, threadId) { - return threadId != null ? `${chatId}:${threadId}` : `${chatId}`; +function buildKvKey(chatId, threadId) { + const suffix = threadId != null ? `${chatId}:${threadId}` : `${chatId}`; + return `${KEY_PREFIX}${suffix}`; } /** - * Parse composite key back into { chatId, threadId } + * Parse KV key back into { chatId, threadId } + * Key format: "sub:{chatId}" or "sub:{chatId}:{threadId}" */ -export function parseSubscriberKey(key) { - const parts = key.split(":"); - if (parts.length >= 2 && parts[0].startsWith("-")) { - // Supergroup IDs start with "-100", so key is "-100xxx:threadId" - // Find the last ":" — everything before is chatId, after is threadId - const lastColon = key.lastIndexOf(":"); +function parseKvKey(kvKey) { + const raw = kvKey.slice(KEY_PREFIX.length); + const lastColon = raw.lastIndexOf(":"); + // No colon or only negative sign prefix — no threadId + if (lastColon <= 0) { + return { chatId: raw, threadId: null }; + } + // Check if the part after last colon is a valid threadId (numeric) + const possibleThread = raw.slice(lastColon + 1); + if (/^\d+$/.test(possibleThread)) { return { - chatId: key.slice(0, lastColon), - threadId: parseInt(key.slice(lastColon + 1), 10), + chatId: raw.slice(0, lastColon), + threadId: parseInt(possibleThread, 10), }; } - return { chatId: key, threadId: null }; + return { chatId: raw, threadId: null }; } /** - * Get all subscribers from KV + * List all subscriber KV keys with cursor pagination */ -export async function getSubscribers(kv) { - const data = await kv.get(KV_KEY, "json"); - return data || {}; +async function listAllSubscriberKeys(kv) { + const keys = []; + let cursor = undefined; + do { + const result = await kv.list({ prefix: KEY_PREFIX, cursor }); + keys.push(...result.keys); + cursor = result.list_complete ? undefined : result.cursor; + } while (cursor); + return keys; } /** - * Write all subscribers to KV - */ -async function setSubscribers(kv, data) { - await kv.put(KV_KEY, JSON.stringify(data)); -} - -/** - * Add subscriber with default types + * Add or re-subscribe a user with default types */ export async function addSubscriber(kv, chatId, threadId, types = ["incident", "component"]) { - const subs = await getSubscribers(kv); - const key = buildSubscriberKey(chatId, threadId); - subs[key] = { types }; - await setSubscribers(kv, subs); + const key = buildKvKey(chatId, threadId); + const existing = await kv.get(key, "json"); + const value = { types, components: existing?.components || [] }; + await kv.put(key, JSON.stringify(value)); } /** - * Remove subscriber + * Remove subscriber — atomic single delete */ export async function removeSubscriber(kv, chatId, threadId) { - const subs = await getSubscribers(kv); - const key = buildSubscriberKey(chatId, threadId); - delete subs[key]; - await setSubscribers(kv, subs); + const key = buildKvKey(chatId, threadId); + await kv.delete(key); } /** * Update subscriber's notification type preferences */ export async function updateSubscriberTypes(kv, chatId, threadId, types) { - const subs = await getSubscribers(kv); - const key = buildSubscriberKey(chatId, threadId); - if (!subs[key]) { - subs[key] = { types }; - } else { - subs[key].types = types; - } - await setSubscribers(kv, subs); + const key = buildKvKey(chatId, threadId); + const existing = await kv.get(key, "json"); + if (!existing) return false; + existing.types = types; + await kv.put(key, JSON.stringify(existing)); + return true; } /** - * Get subscribers filtered by event type, returns [{ chatId, threadId }, ...] + * Update subscriber's component filter list */ -export async function getSubscribersByType(kv, eventType) { - const subs = await getSubscribers(kv); - return Object.entries(subs) - .filter(([, val]) => val.types.includes(eventType)) - .map(([key]) => parseSubscriberKey(key)); +export async function updateSubscriberComponents(kv, chatId, threadId, components) { + const key = buildKvKey(chatId, threadId); + const existing = await kv.get(key, "json"); + if (!existing) return false; + existing.components = components; + await kv.put(key, JSON.stringify(existing)); + return true; +} + +/** + * Get a single subscriber's data, or null if not subscribed + */ +export async function getSubscriber(kv, chatId, threadId) { + const key = buildKvKey(chatId, threadId); + return kv.get(key, "json"); +} + +/** + * Get subscribers filtered by event type and optional component name. + * Returns [{ chatId, threadId, ...value }, ...] + */ +export async function getSubscribersByType(kv, eventType, componentName = null) { + const keys = await listAllSubscriberKeys(kv); + const results = []; + + for (const { name } of keys) { + const value = await kv.get(name, "json"); + if (!value || !value.types.includes(eventType)) continue; + + // Component-specific filtering + if (eventType === "component" && componentName && value.components?.length > 0) { + const match = value.components.some( + (c) => c.toLowerCase() === componentName.toLowerCase() + ); + if (!match) continue; + } + + const { chatId, threadId } = parseKvKey(name); + results.push({ chatId, threadId }); + } + + return results; +} + +/** + * Get all subscribers with their full data + */ +export async function getAllSubscribers(kv) { + const keys = await listAllSubscriberKeys(kv); + const results = []; + for (const { name } of keys) { + const value = await kv.get(name, "json"); + if (!value) continue; + const { chatId, threadId } = parseKvKey(name); + results.push({ chatId, threadId, ...value }); + } + return results; +} + +/** + * One-time migration from single-key "subscribers" to per-key format. + * Returns count of migrated entries. + */ +export async function migrateFromSingleKey(kv) { + const old = await kv.get("subscribers", "json"); + if (!old) return 0; + + const entries = Object.entries(old); + for (const [compositeKey, value] of entries) { + // Preserve components field if it exists, default to empty + const data = { types: value.types || [], components: value.components || [] }; + await kv.put(`${KEY_PREFIX}${compositeKey}`, JSON.stringify(data)); + } + + // Verify migrated count before deleting old key + const migrated = await listAllSubscriberKeys(kv); + if (migrated.length >= entries.length) { + await kv.delete("subscribers"); + console.log(`Migration complete: ${entries.length} subscribers migrated`); + } else { + console.error(`Migration verification failed: expected ${entries.length}, got ${migrated.length}`); + } + + return entries.length; } diff --git a/src/queue-consumer.js b/src/queue-consumer.js index f967eb8..b6eca0c 100644 --- a/src/queue-consumer.js +++ b/src/queue-consumer.js @@ -1,6 +1,5 @@ import { removeSubscriber } from "./kv-store.js"; - -const TELEGRAM_API = "https://api.telegram.org/bot"; +import { telegramUrl } from "./telegram-api.js"; /** * Process a batch of queued messages, sending each to Telegram. @@ -12,6 +11,7 @@ export async function handleQueue(batch, env) { // Defensive check for malformed messages if (!chatId || !html) { + console.error("Queue: malformed message, skipping", msg.body); msg.ack(); continue; } @@ -24,9 +24,9 @@ export async function handleQueue(batch, env) { disable_web_page_preview: true, }; // Send to specific supergroup topic if threadId present - if (threadId) payload.message_thread_id = threadId; + if (threadId != null) payload.message_thread_id = threadId; - const res = await fetch(`${TELEGRAM_API}${env.BOT_TOKEN}/sendMessage`, { + const res = await fetch(telegramUrl(env.BOT_TOKEN, "sendMessage"), { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(payload), @@ -36,17 +36,21 @@ export async function handleQueue(batch, env) { msg.ack(); } else if (res.status === 403 || res.status === 400) { // Bot blocked or chat not found — auto-remove subscriber + console.log(`Queue: removing subscriber ${chatId}:${threadId} (HTTP ${res.status})`); await removeSubscriber(env.claude_status, chatId, threadId); msg.ack(); } else if (res.status === 429) { // Rate limited — let queue retry later + console.log("Queue: rate limited, retrying"); msg.retry(); } else { // Unknown error — ack to avoid infinite retry + console.error(`Queue: unexpected HTTP ${res.status} for ${chatId}`); msg.ack(); } - } catch { + } catch (err) { // Network error — retry + console.error("Queue: network error, retrying", err); msg.retry(); } } diff --git a/src/statuspage-webhook.js b/src/statuspage-webhook.js index 8c6b828..2999837 100644 --- a/src/statuspage-webhook.js +++ b/src/statuspage-webhook.js @@ -1,6 +1,49 @@ import { getSubscribersByType } from "./kv-store.js"; import { humanizeStatus, escapeHtml } from "./status-fetcher.js"; +/** + * Convert hex string to Uint8Array + */ +function hexToBytes(hex) { + const bytes = new Uint8Array(hex.length / 2); + for (let i = 0; i < hex.length; i += 2) { + bytes[i / 2] = parseInt(hex.substr(i, 2), 16); + } + return bytes; +} + +/** + * Verify Statuspage HMAC-SHA256 signature + */ +async function verifyHmacSignature(request, hmacKey) { + if (!hmacKey) return false; + const signature = request.headers.get("X-Statuspage-Signature"); + if (!signature) return false; + + const body = await request.clone().arrayBuffer(); + const key = await crypto.subtle.importKey( + "raw", + new TextEncoder().encode(hmacKey), + { name: "HMAC", hash: "SHA-256" }, + false, + ["verify"] + ); + + const sigBytes = hexToBytes(signature); + return crypto.subtle.verify("HMAC", key, sigBytes, body); +} + +/** + * Timing-safe string comparison + */ +async function timingSafeEqual(a, b) { + const encoder = new TextEncoder(); + const bufA = encoder.encode(a); + const bufB = encoder.encode(b); + if (bufA.byteLength !== bufB.byteLength) return false; + return crypto.subtle.timingSafeEqual(bufA, bufB); +} + /** * Format incident event as Telegram HTML message */ @@ -36,13 +79,13 @@ function formatComponentMessage(component, update) { * Handle incoming Statuspage webhook */ export async function handleStatuspageWebhook(c) { - // Validate secret (timing-safe comparison) - const secret = c.req.param("secret"); - const encoder = new TextEncoder(); - const a = encoder.encode(secret); - const b = encoder.encode(c.env.WEBHOOK_SECRET); - if (a.byteLength !== b.byteLength || !crypto.subtle.timingSafeEqual(a, b)) { - return c.text("Unauthorized", 401); + // Try HMAC verification first, fall back to URL secret + const hmacValid = await verifyHmacSignature(c.req.raw, c.env.STATUSPAGE_HMAC_KEY); + if (!hmacValid) { + const secret = c.req.param("secret"); + if (!await timingSafeEqual(secret, c.env.WEBHOOK_SECRET)) { + return c.text("Unauthorized", 401); + } } // Parse body @@ -56,20 +99,23 @@ export async function handleStatuspageWebhook(c) { const eventType = body?.meta?.event_type; if (!eventType) return c.text("Bad Request", 400); + console.log(`Statuspage webhook: ${eventType}`); + // Determine category and format message - let category, html; + let category, html, componentName; if (eventType.startsWith("incident.")) { category = "incident"; html = formatIncidentMessage(body.incident); } else if (eventType.startsWith("component.")) { category = "component"; + componentName = body.component?.name || null; html = formatComponentMessage(body.component, body.component_update); } else { return c.text("Unknown event type", 400); } - // Get filtered subscribers - const subscribers = await getSubscribersByType(c.env.claude_status, category); + // Get filtered subscribers (with component name filtering) + const subscribers = await getSubscribersByType(c.env.claude_status, category, componentName); // Enqueue messages for fan-out via CF Queues (batch for performance) const messages = subscribers.map(({ chatId, threadId }) => ({ @@ -79,5 +125,7 @@ export async function handleStatuspageWebhook(c) { await c.env["claude-status"].sendBatch(messages.slice(i, i + 100)); } + console.log(`Enqueued ${messages.length} messages for ${category}${componentName ? `:${componentName}` : ""}`); + return c.text("OK", 200); } diff --git a/src/telegram-api.js b/src/telegram-api.js new file mode 100644 index 0000000..926d326 --- /dev/null +++ b/src/telegram-api.js @@ -0,0 +1,8 @@ +/** + * Shared Telegram Bot API base URL and helper + */ +export const TELEGRAM_API = "https://api.telegram.org/bot"; + +export function telegramUrl(token, method) { + return `${TELEGRAM_API}${token}/${method}`; +} diff --git a/wrangler.jsonc b/wrangler.jsonc index 41b4e9b..cfe66ad 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -23,8 +23,12 @@ "max_retries": 3 } ] + }, + "triggers": { + "crons": ["*/5 * * * *"] } // Secrets (set via `wrangler secret put`): // BOT_TOKEN - Telegram bot token // WEBHOOK_SECRET - Statuspage webhook URL secret + // STATUSPAGE_HMAC_KEY - HMAC key from Statuspage webhook settings (optional) }