fix: harden webhook reliability, fix bugs, add test suite

- Statuspage webhook always returns 200 to prevent subscriber removal
- Fix parseKvKey returning string chatId instead of number
- Queue consumer retries on Telegram 5xx instead of acking (prevents message loss)
- Fix observability top-level enabled flag (false → true)
- Add defensive null checks for webhook payload body
- Cache Bot instance per isolate to avoid middleware rebuild per request
- Add vitest + @cloudflare/vitest-pool-workers with 31 tests
- Document DLQ and KV sharding as declined features
This commit is contained in:
2026-04-09 10:29:30 +07:00
parent bb8f4dcde8
commit 8c993df72b
15 changed files with 1680 additions and 57 deletions

View File

@@ -8,6 +8,13 @@ import {
} from "./kv-store.js";
import { fetchComponentByName, escapeHtml } from "./status-fetcher.js";
import { registerInfoCommands } from "./bot-info-commands.js";
/**
 * Module-level KV reference shared by the bot command handlers.
 * handleTelegramWebhook reassigns it from c.env.claude_status on every request.
 * Safe because CF Workers are single-threaded per isolate.
 * NOTE(review): this relies on every request in an isolate carrying the same
 * binding — confirm before pointing different requests at different namespaces.
 */
let kv = null;
/**
* Extract chatId and threadId from grammY context
*/
@@ -19,11 +26,10 @@ function getChatTarget(ctx) {
}
/**
* Handle incoming Telegram webhook via grammY
* Create Bot with all commands registered. Called once per isolate.
*/
export async function handleTelegramWebhook(c) {
const bot = new Bot(c.env.BOT_TOKEN);
const kv = c.env.claude_status;
function createBot(token) {
const bot = new Bot(token);
bot.command("start", async (ctx) => {
const { chatId, threadId } = getChatTarget(ctx);
@@ -140,6 +146,29 @@ export async function handleTelegramWebhook(c) {
);
});
const handler = webhookCallback(bot, "cloudflare-mod");
return handler(c.req.raw);
return bot;
}
/**
 * Cached Bot instance — avoids rebuilding middleware chain on every request.
 * CF Workers reuse isolates, so module-level state persists across requests.
 * The three values below are always replaced together, never one at a time.
 */
let cachedBot = null;
let cachedToken = null;
let cachedHandler = null;

/**
 * Handle incoming Telegram webhook via grammY.
 *
 * Builds the bot and its webhook handler on first use, or again when
 * BOT_TOKEN differs from the cached token, and reuses them otherwise.
 *
 * @param {object} c - Request context; reads c.env.BOT_TOKEN,
 *   c.env.claude_status and c.req.raw.
 * @returns {Promise<*>} Whatever the webhook handler returns for c.req.raw.
 */
export async function handleTelegramWebhook(c) {
  // Update module-level KV ref (same binding across requests, but kept explicit)
  kv = c.env.claude_status;

  const token = c.env.BOT_TOKEN;
  // Guard on the handler (the value actually invoked below), not on the bot.
  if (!cachedHandler || cachedToken !== token) {
    // Build everything first, then commit: if either call throws, the cache
    // keeps its previous state instead of holding a bot without a handler.
    const bot = createBot(token);
    const handler = webhookCallback(bot, "cloudflare-mod");
    cachedBot = bot;
    cachedToken = token;
    cachedHandler = handler;
  }

  return cachedHandler(c.req.raw);
}

View File

@@ -17,7 +17,7 @@ function parseKvKey(kvKey) {
const lastColon = raw.lastIndexOf(":");
// No colon or only negative sign prefix — no threadId
if (lastColon <= 0) {
return { chatId: raw, threadId: null };
return { chatId: Number(raw), threadId: null };
}
// Check if the part after last colon is a valid threadId (numeric)
const possibleThread = raw.slice(lastColon + 1);

View File

@@ -46,6 +46,10 @@ export async function handleQueue(batch, env) {
console.log(`Queue: rate limited for ${chatId}, Retry-After: ${retryAfter ?? "unknown"}`);
retried++;
msg.retry();
} else if (res.status >= 500) {
console.error(`Queue: Telegram 5xx (${res.status}) for ${chatId}, retrying`);
retried++;
msg.retry();
} else {
console.error(`Queue: unexpected HTTP ${res.status} for ${chatId}`);
failed++;

View File

@@ -34,53 +34,73 @@ function formatComponentMessage(component, update) {
}
/**
* Handle incoming Statuspage webhook
* Handle incoming Statuspage webhook.
* CRITICAL: Always return 200 — Statuspage removes subscriber webhooks on non-2xx responses.
*/
export async function handleStatuspageWebhook(c) {
// Validate URL secret (timing-safe)
const secret = c.req.param("secret");
if (!await timingSafeEqual(secret, c.env.WEBHOOK_SECRET)) {
return c.text("Unauthorized", 401);
}
// Parse body
let body;
try {
body = await c.req.json();
} catch {
return c.text("Bad Request", 400);
// Validate URL secret (timing-safe)
const secret = c.req.param("secret");
if (!await timingSafeEqual(secret, c.env.WEBHOOK_SECRET)) {
console.error("Statuspage webhook: invalid secret");
return c.text("OK", 200);
}
// Parse body
let body;
try {
body = await c.req.json();
} catch {
console.error("Statuspage webhook: invalid JSON body");
return c.text("OK", 200);
}
const eventType = body?.meta?.event_type;
if (!eventType) {
console.error("Statuspage webhook: missing event_type");
return c.text("OK", 200);
}
console.log(`Statuspage webhook: ${eventType}`);
// Determine category and format message
let category, html, componentName;
if (eventType.startsWith("incident.")) {
if (!body.incident) {
console.error("Statuspage webhook: incident event missing incident data");
return c.text("OK", 200);
}
category = "incident";
html = formatIncidentMessage(body.incident);
} else if (eventType.startsWith("component.")) {
if (!body.component) {
console.error("Statuspage webhook: component event missing component data");
return c.text("OK", 200);
}
category = "component";
componentName = body.component.name || null;
html = formatComponentMessage(body.component, body.component_update);
} else {
console.error(`Statuspage webhook: unknown event type ${eventType}`);
return c.text("OK", 200);
}
// Get filtered subscribers (with component name filtering)
const subscribers = await getSubscribersByType(c.env.claude_status, category, componentName);
// Enqueue messages for fan-out via CF Queues (batch for performance)
const messages = subscribers.map(({ chatId, threadId }) => ({
body: { chatId, threadId, html },
}));
for (let i = 0; i < messages.length; i += 100) {
await c.env["claude-status"].sendBatch(messages.slice(i, i + 100));
}
console.log(`Enqueued ${messages.length} messages for ${category}${componentName ? `:${componentName}` : ""}`);
return c.text("OK", 200);
} catch (err) {
// Catch-all: log error but still return 200 to prevent Statuspage from removing us
console.error("Statuspage webhook: unexpected error", err);
return c.text("OK", 200);
}
const eventType = body?.meta?.event_type;
if (!eventType) return c.text("Bad Request", 400);
console.log(`Statuspage webhook: ${eventType}`);
// Determine category and format message
let category, html, componentName;
if (eventType.startsWith("incident.")) {
category = "incident";
html = formatIncidentMessage(body.incident);
} else if (eventType.startsWith("component.")) {
category = "component";
componentName = body.component?.name || null;
html = formatComponentMessage(body.component, body.component_update);
} else {
return c.text("Unknown event type", 400);
}
// Get filtered subscribers (with component name filtering)
const subscribers = await getSubscribersByType(c.env.claude_status, category, componentName);
// Enqueue messages for fan-out via CF Queues (batch for performance)
const messages = subscribers.map(({ chatId, threadId }) => ({
body: { chatId, threadId, html },
}));
for (let i = 0; i < messages.length; i += 100) {
await c.env["claude-status"].sendBatch(messages.slice(i, i + 100));
}
console.log(`Enqueued ${messages.length} messages for ${category}${componentName ? `:${componentName}` : ""}`);
return c.text("OK", 200);
}