refactor: per-subscriber KV keys, HMAC verification, cron trigger

Major refactor addressing scalability, security, and reliability:

- KV schema: single-key → per-subscriber keys (sub:{chatId}:{threadId})
  eliminates read-modify-write race conditions
- Component-specific subscriptions: /subscribe component <name>
- HMAC-SHA256 webhook verification with URL secret fallback
- Cron trigger (every 5 min) polls status.claude.com as safety net
- Shared telegram-api.js module (DRY fix)
- Error logging in all catch blocks
- Migration endpoint for existing subscribers
- Setup moved to standalone script (scripts/setup-bot.js)
- Removed setup HTTP route to reduce attack surface
This commit is contained in:
2026-04-09 00:43:07 +07:00
parent 30ffaae612
commit b728ae7d38
12 changed files with 443 additions and 117 deletions

View File

@@ -10,7 +10,8 @@ Telegram bot that forwards [status.claude.com](https://status.claude.com/) (Atla
- `npm run dev` — Start local dev server (wrangler dev, emulates KV + Queues locally)
- `npm run deploy` — Deploy to Cloudflare Workers
- `npx wrangler deploy --dry-run --outdir=dist` — Verify build without deploying
- `npx wrangler deploy --dry-run` — Verify build without deploying
- `node scripts/setup-bot.js` — One-time: register bot commands + set Telegram webhook (requires BOT_TOKEN and WORKER_URL env vars)
No test framework configured yet. No linter configured.
@@ -18,35 +19,45 @@ No test framework configured yet. No linter configured.
- `BOT_TOKEN` — Telegram bot token
- `WEBHOOK_SECRET` — Secret token in Statuspage webhook URL path
- `STATUSPAGE_HMAC_KEY` — HMAC key from Statuspage webhook settings (optional, for signature verification)
## Architecture
Cloudflare Workers with two entry points exported from `src/index.js`:
Cloudflare Workers with three entry points exported from `src/index.js`:
- **`fetch`** — Hono.js HTTP handler (routes below)
- **`queue`** — CF Queues consumer for fan-out message delivery
- **`scheduled`** — CF Cron Trigger (every 5 min) for status polling safety net
### Routes
| Method | Path | Handler | Purpose |
|--------|------|---------|---------|
| GET | `/` | inline | Health check |
| GET | `/webhook/setup/:secret` | `bot-setup.js` | One-time: register bot commands + set Telegram webhook |
| POST | `/webhook/telegram` | `bot-commands.js` | grammY `webhookCallback("cloudflare-mod")` |
| POST | `/webhook/status/:secret` | `statuspage-webhook.js` | Receives Statuspage webhooks |
| POST | `/webhook/status/:secret` | `statuspage-webhook.js` | Receives Statuspage webhooks (HMAC + URL secret) |
| GET | `/migrate/:secret` | inline | One-time KV migration (remove after use) |
### Data Flow
1. **Statuspage → Worker**: Webhook POST → validate secret (timing-safe) → parse incident/component event → filter subscribers by preference`sendBatch` to CF Queue
2. **Queue → Telegram**: Consumer processes batches of 30 → `sendMessage` via raw fetch → auto-removes blocked subscribers (403/400), retries on 429
3. **User → Bot**: Telegram webhook → grammY handles `/help`, `/start`, `/stop`, `/status`, `/subscribe`, `/history`, `/uptime` commands → reads/writes KV
1. **Statuspage → Worker**: Webhook POST → verify HMAC signature (fallback: URL secret) → parse incident/component event → filter subscribers by type + component`sendBatch` to CF Queue
2. **Cron → Worker**: Every 5 min → fetch summary → compare with `last-status` KV → notify on changes → update stored state
3. **Queue → Telegram**: Consumer processes batches of 30 → `sendMessage` via `telegram-api.js` helper → auto-removes blocked subscribers (403/400), retries on 429
4. **User → Bot**: Telegram webhook → grammY handles `/help`, `/start`, `/stop`, `/status`, `/subscribe`, `/history`, `/uptime` commands → reads/writes KV
### KV Storage
Single key `subscribers` stores a JSON object keyed by composite subscriber ID:
- DM/group: `"chatId"``{ types: ["incident", "component"] }`
- Supergroup topic: `"chatId:threadId"``{ types: ["incident"] }`
Per-subscriber keys (no read-modify-write races):
- `sub:{chatId}``{ types: ["incident", "component"], components: [] }`
- `sub:{chatId}:{threadId}``{ types: ["incident"], components: ["API"] }`
`kv-store.js` handles key building/parsing — `threadId` can be `0` (General topic), so null checks use `!= null` not truthiness.
Special keys:
- `last-status` — JSON snapshot of component statuses for cron comparison
`kv-store.js` handles key building/parsing with `kv.list({ prefix: "sub:" })` pagination. `threadId` can be `0` (General topic), so null checks use `!= null`.
### Component-Specific Subscriptions
Subscribers can filter to specific components via `/subscribe component <name>`. Empty `components` array = all components (default). Filtering applies to both webhook and cron notifications.
### Supergroup Topic Support
@@ -62,3 +73,4 @@ Bot stores `message_thread_id` from the topic where `/start` was sent. Notificat
- `claude_status` — KV namespace
- `claude-status` — Queue producer/consumer (batch size 30, max retries 3)
- Cron: `*/5 * * * *` — triggers `scheduled` export every 5 minutes

View File

@@ -9,8 +9,11 @@ Hosted on [Cloudflare Workers](https://workers.cloudflare.com/) with KV for stor
- **Incident notifications** — new incidents, updates, and resolutions with impact severity
- **Component status changes** — e.g., API goes from Operational → Degraded Performance
- **Per-user subscription preferences** — subscribe to incidents only, components only, or both
- **Component-specific filtering** — subscribe to specific components (e.g., API only)
- **Supergroup topic support** — send `/start` in a specific topic and notifications go to that topic
- **On-demand status check** — `/status` fetches live data from status.claude.com
- **Automatic status monitoring** — cron checks every 5 minutes as a safety net
- **HMAC webhook verification** — validates Statuspage webhook signatures
- **Self-healing** — automatically removes subscribers who block the bot
## Bot Commands
@@ -24,7 +27,9 @@ Hosted on [Cloudflare Workers](https://workers.cloudflare.com/) with KV for stor
| `/status <name>` | Show status of a specific component (fuzzy match) |
| `/subscribe incident` | Receive incident notifications only |
| `/subscribe component` | Receive component update notifications only |
| `/subscribe all` | Receive both (default) |
| `/subscribe component <name>` | Filter to a specific component (e.g., `/subscribe component api`) |
| `/subscribe component all` | Clear component filter (receive all) |
| `/subscribe all` | Receive both incidents and components (default) |
| `/history` | Show 5 most recent incidents with impact and links |
| `/history <count>` | Show up to 10 recent incidents |
| `/uptime` | Component health overview with last status change time |
@@ -74,6 +79,10 @@ npx wrangler secret put BOT_TOKEN
npx wrangler secret put WEBHOOK_SECRET
# Choose a random secret string for the Statuspage webhook URL
# Optional: HMAC verification for Statuspage webhooks
npx wrangler secret put STATUSPAGE_HMAC_KEY
# Paste the HMAC key from Statuspage webhook settings
```
### 5. Deploy
@@ -86,13 +95,13 @@ Note the worker URL from the output (e.g., `https://claude-status-webhook.<your-
### 6. Set up Telegram bot
Visit the setup URL in your browser to register bot commands and set the Telegram webhook in one step:
Run the setup script to register bot commands and set the Telegram webhook:
```
https://<WORKER_URL>/webhook/setup/<WEBHOOK_SECRET>
```bash
BOT_TOKEN=your-token WORKER_URL=https://your-worker.workers.dev node scripts/setup-bot.js
```
You should see a JSON response with `{"ok":true}` for both `webhook` and `commands`.
You should see `{"ok":true}` for both webhook and commands.
### 7. Configure Statuspage webhook
@@ -102,6 +111,22 @@ You should see a JSON response with `{"ok":true}` for both `webhook` and `comman
Replace `<WEBHOOK_SECRET>` with the secret you set in step 4.
### 8. Run migration (if upgrading)
If you have existing subscribers from an older version, run the migration endpoint once:
```
https://<WORKER_URL>/migrate/<WEBHOOK_SECRET>
```
This converts the old single-key format to per-subscriber KV keys. Remove the `/migrate` route from `src/index.js` after confirming success.
## Automatic Status Monitoring
The bot checks status.claude.com every 5 minutes via Cloudflare Cron Triggers (free tier). If a component status changes between checks, subscribers are notified automatically. This acts as a safety net in case Statuspage webhooks are delayed or missed.
Cron-detected changes are tagged with "(detected by status check)" to distinguish from webhook notifications.
## Local Development
```bash
@@ -140,6 +165,7 @@ curl -X POST http://localhost:8787/webhook/status/your-test-secret \
- **Runtime**: [Cloudflare Workers](https://workers.cloudflare.com/)
- **Storage**: [Cloudflare KV](https://developers.cloudflare.com/kv/)
- **Queue**: [Cloudflare Queues](https://developers.cloudflare.com/queues/)
- **Cron**: [Cloudflare Cron Triggers](https://developers.cloudflare.com/workers/configuration/cron-triggers/)
- **HTTP framework**: [Hono](https://hono.dev/)
- **Telegram framework**: [grammY](https://grammy.dev/)

View File

@@ -1,3 +1,5 @@
#!/usr/bin/env node
const TELEGRAM_API = "https://api.telegram.org/bot";
const BOT_COMMANDS = [
@@ -10,37 +12,31 @@ const BOT_COMMANDS = [
{ command: "uptime", description: "Component health overview" },
];
/**
* One-time setup: register bot commands and set Telegram webhook.
* GET /webhook/setup/:secret
*/
export async function setupBot(c) {
const secret = c.req.param("secret");
if (secret !== c.env.WEBHOOK_SECRET) {
return c.text("Unauthorized", 401);
async function main() {
const token = process.env.BOT_TOKEN;
const workerUrl = process.env.WORKER_URL;
if (!token || !workerUrl) {
console.error("Required env vars: BOT_TOKEN, WORKER_URL");
console.error("Usage: BOT_TOKEN=xxx WORKER_URL=https://your-worker.workers.dev node scripts/setup-bot.js");
process.exit(1);
}
const token = c.env.BOT_TOKEN;
const workerUrl = new URL(c.req.url).origin;
// Set webhook URL
// Set webhook
const webhookRes = await fetch(`${TELEGRAM_API}${token}/setWebhook`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ url: `${workerUrl}/webhook/telegram` }),
});
const webhookData = await webhookRes.json();
console.log("Webhook:", await webhookRes.json());
// Register bot commands
// Register commands
const commandsRes = await fetch(`${TELEGRAM_API}${token}/setMyCommands`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ commands: BOT_COMMANDS }),
});
const commandsData = await commandsRes.json();
return c.json({
webhook: webhookData,
commands: commandsData,
});
console.log("Commands:", await commandsRes.json());
}
main().catch(console.error);

View File

@@ -3,9 +3,10 @@ import {
addSubscriber,
removeSubscriber,
updateSubscriberTypes,
getSubscribers,
buildSubscriberKey,
updateSubscriberComponents,
getSubscriber,
} from "./kv-store.js";
import { fetchComponentByName, escapeHtml } from "./status-fetcher.js";
import { registerInfoCommands } from "./bot-info-commands.js";
/**
@@ -46,7 +47,50 @@ export async function handleTelegramWebhook(c) {
bot.command("subscribe", async (ctx) => {
const { chatId, threadId } = getChatTarget(ctx);
const arg = ctx.match?.trim().toLowerCase();
const args = ctx.match?.trim().toLowerCase().split(/\s+/) || [];
const arg = args[0];
// Handle "/subscribe component <name>" or "/subscribe component all"
if (arg === "component" && args.length > 1) {
const componentArg = args.slice(1).join(" ");
const sub = await getSubscriber(kv, chatId, threadId);
if (!sub) {
await ctx.reply("Not subscribed yet. Use /start first.", { parse_mode: "HTML" });
return;
}
if (componentArg === "all") {
await updateSubscriberComponents(kv, chatId, threadId, []);
await ctx.reply("Component filter cleared — receiving all component updates.", {
parse_mode: "HTML",
});
return;
}
// Validate component name against live API
const component = await fetchComponentByName(componentArg);
if (!component) {
await ctx.reply(`Component "<code>${escapeHtml(componentArg)}</code>" not found.`, {
parse_mode: "HTML",
});
return;
}
// Add to component filter (deduplicate)
const components = sub.components || [];
if (!components.some((c) => c.toLowerCase() === component.name.toLowerCase())) {
components.push(component.name);
}
await updateSubscriberComponents(kv, chatId, threadId, components);
// Ensure "component" is in types
if (!sub.types.includes("component")) {
sub.types.push("component");
await updateSubscriberTypes(kv, chatId, threadId, sub.types);
}
await ctx.reply(
`Subscribed to component: <code>${escapeHtml(component.name)}</code>\n` +
`Active filters: <code>${components.join(", ")}</code>`,
{ parse_mode: "HTML" }
);
return;
}
const validTypes = {
incident: ["incident"],
@@ -55,19 +99,26 @@ export async function handleTelegramWebhook(c) {
};
if (!arg || !validTypes[arg]) {
const key = buildSubscriberKey(chatId, threadId);
const subs = await getSubscribers(kv);
const current = subs[key]?.types?.join(", ") || "none (use /start first)";
const sub = await getSubscriber(kv, chatId, threadId);
const current = sub?.types?.join(", ") || "none (use /start first)";
const compFilter = sub?.components?.length ? sub.components.join(", ") : "all";
await ctx.reply(
"<b>Usage:</b> /subscribe &lt;type&gt;\n\n" +
"<b>Usage:</b> /subscribe &lt;type&gt; [component]\n\n" +
"Types: <code>incident</code>, <code>component</code>, <code>all</code>\n" +
`\nCurrent: <code>${current}</code>`,
"Component filter: <code>/subscribe component api</code>\n" +
"Clear filter: <code>/subscribe component all</code>\n" +
`\nCurrent types: <code>${current}</code>\n` +
`Components: <code>${compFilter}</code>`,
{ parse_mode: "HTML" }
);
return;
}
await updateSubscriberTypes(kv, chatId, threadId, validTypes[arg]);
const updated = await updateSubscriberTypes(kv, chatId, threadId, validTypes[arg]);
if (!updated) {
await ctx.reply("Not subscribed yet. Use /start first.", { parse_mode: "HTML" });
return;
}
await ctx.reply(`Subscription updated: <code>${validTypes[arg].join(", ")}</code>`, {
parse_mode: "HTML",
});

View File

@@ -1,4 +1,5 @@
import {
escapeHtml,
fetchComponentByName,
fetchSummary,
fetchIncidents,
@@ -26,9 +27,11 @@ export function registerInfoCommands(bot) {
`Show current status of all components.\n` +
`Add a component name for a specific check.\n` +
`Example: <code>/status api</code>\n\n` +
`<b>/subscribe</b> &lt;type&gt;\n` +
`<b>/subscribe</b> &lt;type&gt; [component]\n` +
`Set what notifications you receive.\n` +
`Options: <code>incident</code>, <code>component</code>, <code>all</code>\n` +
`Types: <code>incident</code>, <code>component</code>, <code>all</code>\n` +
`Component filter: <code>/subscribe component api</code>\n` +
`Clear filter: <code>/subscribe component all</code>\n` +
`Example: <code>/subscribe incident</code>\n\n` +
`<b>/history</b> [count]\n` +
`Show recent incidents. Default: 5, max: 10.\n` +
@@ -46,7 +49,7 @@ export function registerInfoCommands(bot) {
if (args) {
const component = await fetchComponentByName(args);
if (!component) {
await ctx.reply(`Component "<code>${args}</code>" not found.`, { parse_mode: "HTML" });
await ctx.reply(`Component "<code>${escapeHtml(args)}</code>" not found.`, { parse_mode: "HTML" });
return;
}
await ctx.reply(formatComponentLine(component), { parse_mode: "HTML" });
@@ -66,7 +69,8 @@ export function registerInfoCommands(bot) {
{ parse_mode: "HTML", disable_web_page_preview: true }
);
}
} catch {
} catch (err) {
console.error("status command error:", err);
await ctx.reply("Unable to fetch status. Please try again later.");
}
});
@@ -87,7 +91,8 @@ export function registerInfoCommands(bot) {
`<a href="${STATUS_URL}/history">View full history</a>`,
{ parse_mode: "HTML", disable_web_page_preview: true }
);
} catch {
} catch (err) {
console.error("history command error:", err);
await ctx.reply("Unable to fetch incident history. Please try again later.");
}
});
@@ -111,7 +116,8 @@ export function registerInfoCommands(bot) {
`<a href="${STATUS_URL}">View uptime on status page</a>`,
{ parse_mode: "HTML", disable_web_page_preview: true }
);
} catch {
} catch (err) {
console.error("uptime command error:", err);
await ctx.reply("Unable to fetch uptime data. Please try again later.");
}
});

77
src/cron-status-check.js Normal file
View File

@@ -0,0 +1,77 @@
import { fetchSummary, humanizeStatus, escapeHtml } from "./status-fetcher.js";
import { getSubscribersByType } from "./kv-store.js";
const LAST_STATUS_KEY = "last-status";
/**
* Build a map of component name -> status from summary
*/
function buildStatusMap(summary) {
const map = {};
for (const c of summary.components) {
if (!c.group) map[c.name] = c.status;
}
return map;
}
/**
* Format a component status change detected by cron
*/
function formatChangeMessage(name, oldStatus, newStatus) {
return (
`<b>Component Update: ${escapeHtml(name)}</b>\n` +
`${humanizeStatus(oldStatus)} → <b>${humanizeStatus(newStatus)}</b>\n` +
`<i>(detected by status check)</i>`
);
}
/**
* CF Scheduled handler — polls status page, notifies on changes
*/
export async function handleScheduled(env) {
const kv = env.claude_status;
const queue = env["claude-status"];
let summary;
try {
summary = await fetchSummary();
} catch (err) {
console.error("Cron: failed to fetch status:", err);
return;
}
const currentMap = buildStatusMap(summary);
const stored = await kv.get(LAST_STATUS_KEY, "json");
const previousMap = stored?.components || {};
// Find changes (only if previous state exists for that component)
const changes = [];
for (const [name, status] of Object.entries(currentMap)) {
if (previousMap[name] && previousMap[name] !== status) {
changes.push({ name, oldStatus: previousMap[name], newStatus: status });
}
}
// Always update stored state (proves cron is running)
await kv.put(LAST_STATUS_KEY, JSON.stringify({
components: currentMap,
timestamp: new Date().toISOString(),
}));
if (changes.length === 0) return;
console.log(`Cron: ${changes.length} component change(s) detected`);
// Enqueue notifications for each change
for (const { name, oldStatus, newStatus } of changes) {
const html = formatChangeMessage(name, oldStatus, newStatus);
const subscribers = await getSubscribersByType(kv, "component", name);
const messages = subscribers.map(({ chatId, threadId }) => ({
body: { chatId, threadId, html },
}));
for (let i = 0; i < messages.length; i += 100) {
await queue.sendBatch(messages.slice(i, i + 100));
}
console.log(`Cron: enqueued ${messages.length} messages for ${name} change`);
}
}

View File

@@ -2,16 +2,30 @@ import { Hono } from "hono";
import { handleTelegramWebhook } from "./bot-commands.js";
import { handleStatuspageWebhook } from "./statuspage-webhook.js";
import { handleQueue } from "./queue-consumer.js";
import { setupBot } from "./bot-setup.js";
import { handleScheduled } from "./cron-status-check.js";
import { migrateFromSingleKey } from "./kv-store.js";
const app = new Hono();
app.get("/", (c) => c.text("Claude Status Bot is running"));
app.get("/webhook/setup/:secret", (c) => setupBot(c));
app.post("/webhook/telegram", (c) => handleTelegramWebhook(c));
app.post("/webhook/status/:secret", (c) => handleStatuspageWebhook(c));
// One-time migration route — remove after migration is confirmed
app.get("/migrate/:secret", async (c) => {
const secret = c.req.param("secret");
const encoder = new TextEncoder();
const a = encoder.encode(secret);
const b = encoder.encode(c.env.WEBHOOK_SECRET);
if (a.byteLength !== b.byteLength || !crypto.subtle.timingSafeEqual(a, b)) {
return c.text("Unauthorized", 401);
}
const count = await migrateFromSingleKey(c.env.claude_status);
return c.json({ migrated: count });
});
export default {
fetch: app.fetch,
queue: handleQueue,
scheduled: (event, env, ctx) => ctx.waitUntil(handleScheduled(env)),
};

View File

@@ -1,84 +1,164 @@
const KV_KEY = "subscribers";
const KEY_PREFIX = "sub:";
/**
* Build composite key: "chatId" or "chatId:threadId" for supergroup topics
* Build KV key: "sub:{chatId}" or "sub:{chatId}:{threadId}"
*/
export function buildSubscriberKey(chatId, threadId) {
return threadId != null ? `${chatId}:${threadId}` : `${chatId}`;
function buildKvKey(chatId, threadId) {
const suffix = threadId != null ? `${chatId}:${threadId}` : `${chatId}`;
return `${KEY_PREFIX}${suffix}`;
}
/**
* Parse composite key back into { chatId, threadId }
* Parse KV key back into { chatId, threadId }
* Key format: "sub:{chatId}" or "sub:{chatId}:{threadId}"
*/
export function parseSubscriberKey(key) {
const parts = key.split(":");
if (parts.length >= 2 && parts[0].startsWith("-")) {
// Supergroup IDs start with "-100", so key is "-100xxx:threadId"
// Find the last ":" — everything before is chatId, after is threadId
const lastColon = key.lastIndexOf(":");
function parseKvKey(kvKey) {
const raw = kvKey.slice(KEY_PREFIX.length);
const lastColon = raw.lastIndexOf(":");
// No colon or only negative sign prefix — no threadId
if (lastColon <= 0) {
return { chatId: raw, threadId: null };
}
// Check if the part after last colon is a valid threadId (numeric)
const possibleThread = raw.slice(lastColon + 1);
if (/^\d+$/.test(possibleThread)) {
return {
chatId: key.slice(0, lastColon),
threadId: parseInt(key.slice(lastColon + 1), 10),
chatId: raw.slice(0, lastColon),
threadId: parseInt(possibleThread, 10),
};
}
return { chatId: key, threadId: null };
return { chatId: raw, threadId: null };
}
/**
* Get all subscribers from KV
* List all subscriber KV keys with cursor pagination
*/
export async function getSubscribers(kv) {
const data = await kv.get(KV_KEY, "json");
return data || {};
async function listAllSubscriberKeys(kv) {
const keys = [];
let cursor = undefined;
do {
const result = await kv.list({ prefix: KEY_PREFIX, cursor });
keys.push(...result.keys);
cursor = result.list_complete ? undefined : result.cursor;
} while (cursor);
return keys;
}
/**
* Write all subscribers to KV
*/
async function setSubscribers(kv, data) {
await kv.put(KV_KEY, JSON.stringify(data));
}
/**
* Add subscriber with default types
* Add or re-subscribe a user with default types
*/
export async function addSubscriber(kv, chatId, threadId, types = ["incident", "component"]) {
const subs = await getSubscribers(kv);
const key = buildSubscriberKey(chatId, threadId);
subs[key] = { types };
await setSubscribers(kv, subs);
const key = buildKvKey(chatId, threadId);
const existing = await kv.get(key, "json");
const value = { types, components: existing?.components || [] };
await kv.put(key, JSON.stringify(value));
}
/**
* Remove subscriber
* Remove subscriber — atomic single delete
*/
export async function removeSubscriber(kv, chatId, threadId) {
const subs = await getSubscribers(kv);
const key = buildSubscriberKey(chatId, threadId);
delete subs[key];
await setSubscribers(kv, subs);
const key = buildKvKey(chatId, threadId);
await kv.delete(key);
}
/**
* Update subscriber's notification type preferences
*/
export async function updateSubscriberTypes(kv, chatId, threadId, types) {
const subs = await getSubscribers(kv);
const key = buildSubscriberKey(chatId, threadId);
if (!subs[key]) {
subs[key] = { types };
} else {
subs[key].types = types;
}
await setSubscribers(kv, subs);
const key = buildKvKey(chatId, threadId);
const existing = await kv.get(key, "json");
if (!existing) return false;
existing.types = types;
await kv.put(key, JSON.stringify(existing));
return true;
}
/**
* Get subscribers filtered by event type, returns [{ chatId, threadId }, ...]
* Update subscriber's component filter list
*/
export async function getSubscribersByType(kv, eventType) {
const subs = await getSubscribers(kv);
return Object.entries(subs)
.filter(([, val]) => val.types.includes(eventType))
.map(([key]) => parseSubscriberKey(key));
export async function updateSubscriberComponents(kv, chatId, threadId, components) {
const key = buildKvKey(chatId, threadId);
const existing = await kv.get(key, "json");
if (!existing) return false;
existing.components = components;
await kv.put(key, JSON.stringify(existing));
return true;
}
/**
* Get a single subscriber's data, or null if not subscribed
*/
export async function getSubscriber(kv, chatId, threadId) {
const key = buildKvKey(chatId, threadId);
return kv.get(key, "json");
}
/**
* Get subscribers filtered by event type and optional component name.
* Returns [{ chatId, threadId, ...value }, ...]
*/
export async function getSubscribersByType(kv, eventType, componentName = null) {
const keys = await listAllSubscriberKeys(kv);
const results = [];
for (const { name } of keys) {
const value = await kv.get(name, "json");
if (!value || !value.types.includes(eventType)) continue;
// Component-specific filtering
if (eventType === "component" && componentName && value.components?.length > 0) {
const match = value.components.some(
(c) => c.toLowerCase() === componentName.toLowerCase()
);
if (!match) continue;
}
const { chatId, threadId } = parseKvKey(name);
results.push({ chatId, threadId });
}
return results;
}
/**
* Get all subscribers with their full data
*/
export async function getAllSubscribers(kv) {
const keys = await listAllSubscriberKeys(kv);
const results = [];
for (const { name } of keys) {
const value = await kv.get(name, "json");
if (!value) continue;
const { chatId, threadId } = parseKvKey(name);
results.push({ chatId, threadId, ...value });
}
return results;
}
/**
* One-time migration from single-key "subscribers" to per-key format.
* Returns count of migrated entries.
*/
export async function migrateFromSingleKey(kv) {
const old = await kv.get("subscribers", "json");
if (!old) return 0;
const entries = Object.entries(old);
for (const [compositeKey, value] of entries) {
// Preserve components field if it exists, default to empty
const data = { types: value.types || [], components: value.components || [] };
await kv.put(`${KEY_PREFIX}${compositeKey}`, JSON.stringify(data));
}
// Verify migrated count before deleting old key
const migrated = await listAllSubscriberKeys(kv);
if (migrated.length >= entries.length) {
await kv.delete("subscribers");
console.log(`Migration complete: ${entries.length} subscribers migrated`);
} else {
console.error(`Migration verification failed: expected ${entries.length}, got ${migrated.length}`);
}
return entries.length;
}

View File

@@ -1,6 +1,5 @@
import { removeSubscriber } from "./kv-store.js";
const TELEGRAM_API = "https://api.telegram.org/bot";
import { telegramUrl } from "./telegram-api.js";
/**
* Process a batch of queued messages, sending each to Telegram.
@@ -12,6 +11,7 @@ export async function handleQueue(batch, env) {
// Defensive check for malformed messages
if (!chatId || !html) {
console.error("Queue: malformed message, skipping", msg.body);
msg.ack();
continue;
}
@@ -24,9 +24,9 @@ export async function handleQueue(batch, env) {
disable_web_page_preview: true,
};
// Send to specific supergroup topic if threadId present
if (threadId) payload.message_thread_id = threadId;
if (threadId != null) payload.message_thread_id = threadId;
const res = await fetch(`${TELEGRAM_API}${env.BOT_TOKEN}/sendMessage`, {
const res = await fetch(telegramUrl(env.BOT_TOKEN, "sendMessage"), {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(payload),
@@ -36,17 +36,21 @@ export async function handleQueue(batch, env) {
msg.ack();
} else if (res.status === 403 || res.status === 400) {
// Bot blocked or chat not found — auto-remove subscriber
console.log(`Queue: removing subscriber ${chatId}:${threadId} (HTTP ${res.status})`);
await removeSubscriber(env.claude_status, chatId, threadId);
msg.ack();
} else if (res.status === 429) {
// Rate limited — let queue retry later
console.log("Queue: rate limited, retrying");
msg.retry();
} else {
// Unknown error — ack to avoid infinite retry
console.error(`Queue: unexpected HTTP ${res.status} for ${chatId}`);
msg.ack();
}
} catch {
} catch (err) {
// Network error — retry
console.error("Queue: network error, retrying", err);
msg.retry();
}
}

View File

@@ -1,6 +1,49 @@
import { getSubscribersByType } from "./kv-store.js";
import { humanizeStatus, escapeHtml } from "./status-fetcher.js";
/**
* Convert hex string to Uint8Array
*/
function hexToBytes(hex) {
const bytes = new Uint8Array(hex.length / 2);
for (let i = 0; i < hex.length; i += 2) {
bytes[i / 2] = parseInt(hex.substr(i, 2), 16);
}
return bytes;
}
/**
* Verify Statuspage HMAC-SHA256 signature
*/
async function verifyHmacSignature(request, hmacKey) {
if (!hmacKey) return false;
const signature = request.headers.get("X-Statuspage-Signature");
if (!signature) return false;
const body = await request.clone().arrayBuffer();
const key = await crypto.subtle.importKey(
"raw",
new TextEncoder().encode(hmacKey),
{ name: "HMAC", hash: "SHA-256" },
false,
["verify"]
);
const sigBytes = hexToBytes(signature);
return crypto.subtle.verify("HMAC", key, sigBytes, body);
}
/**
* Timing-safe string comparison
*/
async function timingSafeEqual(a, b) {
const encoder = new TextEncoder();
const bufA = encoder.encode(a);
const bufB = encoder.encode(b);
if (bufA.byteLength !== bufB.byteLength) return false;
return crypto.subtle.timingSafeEqual(bufA, bufB);
}
/**
* Format incident event as Telegram HTML message
*/
@@ -36,13 +79,13 @@ function formatComponentMessage(component, update) {
* Handle incoming Statuspage webhook
*/
export async function handleStatuspageWebhook(c) {
// Validate secret (timing-safe comparison)
const secret = c.req.param("secret");
const encoder = new TextEncoder();
const a = encoder.encode(secret);
const b = encoder.encode(c.env.WEBHOOK_SECRET);
if (a.byteLength !== b.byteLength || !crypto.subtle.timingSafeEqual(a, b)) {
return c.text("Unauthorized", 401);
// Try HMAC verification first, fall back to URL secret
const hmacValid = await verifyHmacSignature(c.req.raw, c.env.STATUSPAGE_HMAC_KEY);
if (!hmacValid) {
const secret = c.req.param("secret");
if (!await timingSafeEqual(secret, c.env.WEBHOOK_SECRET)) {
return c.text("Unauthorized", 401);
}
}
// Parse body
@@ -56,20 +99,23 @@ export async function handleStatuspageWebhook(c) {
const eventType = body?.meta?.event_type;
if (!eventType) return c.text("Bad Request", 400);
console.log(`Statuspage webhook: ${eventType}`);
// Determine category and format message
let category, html;
let category, html, componentName;
if (eventType.startsWith("incident.")) {
category = "incident";
html = formatIncidentMessage(body.incident);
} else if (eventType.startsWith("component.")) {
category = "component";
componentName = body.component?.name || null;
html = formatComponentMessage(body.component, body.component_update);
} else {
return c.text("Unknown event type", 400);
}
// Get filtered subscribers
const subscribers = await getSubscribersByType(c.env.claude_status, category);
// Get filtered subscribers (with component name filtering)
const subscribers = await getSubscribersByType(c.env.claude_status, category, componentName);
// Enqueue messages for fan-out via CF Queues (batch for performance)
const messages = subscribers.map(({ chatId, threadId }) => ({
@@ -79,5 +125,7 @@ export async function handleStatuspageWebhook(c) {
await c.env["claude-status"].sendBatch(messages.slice(i, i + 100));
}
console.log(`Enqueued ${messages.length} messages for ${category}${componentName ? `:${componentName}` : ""}`);
return c.text("OK", 200);
}

8
src/telegram-api.js Normal file
View File

@@ -0,0 +1,8 @@
/**
* Shared Telegram Bot API base URL and helper
*/
export const TELEGRAM_API = "https://api.telegram.org/bot";
export function telegramUrl(token, method) {
return `${TELEGRAM_API}${token}/${method}`;
}

View File

@@ -23,8 +23,12 @@
"max_retries": 3
}
]
},
"triggers": {
"crons": ["*/5 * * * *"]
}
// Secrets (set via `wrangler secret put`):
// BOT_TOKEN - Telegram bot token
// WEBHOOK_SECRET - Statuspage webhook URL secret
// STATUSPAGE_HMAC_KEY - HMAC key from Statuspage webhook settings (optional)
}