mirror of
https://github.com/tiennm99/claude-status-webhook.git
synced 2026-04-18 05:20:41 +00:00
Merge remote-tracking branch 'origin/dev'
This commit is contained in:
@@ -3,11 +3,11 @@ import {
|
||||
addSubscriber,
|
||||
removeSubscriber,
|
||||
updateSubscriberTypes,
|
||||
getSubscribers,
|
||||
buildSubscriberKey,
|
||||
updateSubscriberComponents,
|
||||
getSubscriber,
|
||||
} from "./kv-store.js";
|
||||
import { fetchComponentByName, escapeHtml } from "./status-fetcher.js";
|
||||
import { registerInfoCommands } from "./bot-info-commands.js";
|
||||
|
||||
/**
|
||||
* Extract chatId and threadId from grammY context
|
||||
*/
|
||||
@@ -46,7 +46,50 @@ export async function handleTelegramWebhook(c) {
|
||||
|
||||
bot.command("subscribe", async (ctx) => {
|
||||
const { chatId, threadId } = getChatTarget(ctx);
|
||||
const arg = ctx.match?.trim().toLowerCase();
|
||||
const args = ctx.match?.trim().toLowerCase().split(/\s+/) || [];
|
||||
const arg = args[0];
|
||||
|
||||
// Handle "/subscribe component <name>" or "/subscribe component all"
|
||||
if (arg === "component" && args.length > 1) {
|
||||
const componentArg = args.slice(1).join(" ");
|
||||
const sub = await getSubscriber(kv, chatId, threadId);
|
||||
if (!sub) {
|
||||
await ctx.reply("Not subscribed yet. Use /start first.", { parse_mode: "HTML" });
|
||||
return;
|
||||
}
|
||||
if (componentArg === "all") {
|
||||
await updateSubscriberComponents(kv, chatId, threadId, []);
|
||||
await ctx.reply("Component filter cleared — receiving all component updates.", {
|
||||
parse_mode: "HTML",
|
||||
});
|
||||
return;
|
||||
}
|
||||
// Validate component name against live API
|
||||
const component = await fetchComponentByName(componentArg);
|
||||
if (!component) {
|
||||
await ctx.reply(`Component "<code>${escapeHtml(componentArg)}</code>" not found.`, {
|
||||
parse_mode: "HTML",
|
||||
});
|
||||
return;
|
||||
}
|
||||
// Add to component filter (deduplicate)
|
||||
const components = sub.components || [];
|
||||
if (!components.some((c) => c.toLowerCase() === component.name.toLowerCase())) {
|
||||
components.push(component.name);
|
||||
}
|
||||
await updateSubscriberComponents(kv, chatId, threadId, components);
|
||||
// Ensure "component" is in types
|
||||
if (!sub.types.includes("component")) {
|
||||
sub.types.push("component");
|
||||
await updateSubscriberTypes(kv, chatId, threadId, sub.types);
|
||||
}
|
||||
await ctx.reply(
|
||||
`Subscribed to component: <code>${escapeHtml(component.name)}</code>\n` +
|
||||
`Active filters: <code>${components.join(", ")}</code>`,
|
||||
{ parse_mode: "HTML" }
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const validTypes = {
|
||||
incident: ["incident"],
|
||||
@@ -55,19 +98,26 @@ export async function handleTelegramWebhook(c) {
|
||||
};
|
||||
|
||||
if (!arg || !validTypes[arg]) {
|
||||
const key = buildSubscriberKey(chatId, threadId);
|
||||
const subs = await getSubscribers(kv);
|
||||
const current = subs[key]?.types?.join(", ") || "none (use /start first)";
|
||||
const sub = await getSubscriber(kv, chatId, threadId);
|
||||
const current = sub?.types?.join(", ") || "none (use /start first)";
|
||||
const compFilter = sub?.components?.length ? sub.components.join(", ") : "all";
|
||||
await ctx.reply(
|
||||
"<b>Usage:</b> /subscribe <type>\n\n" +
|
||||
"<b>Usage:</b> /subscribe <type> [component]\n\n" +
|
||||
"Types: <code>incident</code>, <code>component</code>, <code>all</code>\n" +
|
||||
`\nCurrent: <code>${current}</code>`,
|
||||
"Component filter: <code>/subscribe component api</code>\n" +
|
||||
"Clear filter: <code>/subscribe component all</code>\n" +
|
||||
`\nCurrent types: <code>${current}</code>\n` +
|
||||
`Components: <code>${compFilter}</code>`,
|
||||
{ parse_mode: "HTML" }
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await updateSubscriberTypes(kv, chatId, threadId, validTypes[arg]);
|
||||
const updated = await updateSubscriberTypes(kv, chatId, threadId, validTypes[arg]);
|
||||
if (!updated) {
|
||||
await ctx.reply("Not subscribed yet. Use /start first.", { parse_mode: "HTML" });
|
||||
return;
|
||||
}
|
||||
await ctx.reply(`Subscription updated: <code>${validTypes[arg].join(", ")}</code>`, {
|
||||
parse_mode: "HTML",
|
||||
});
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import {
|
||||
escapeHtml,
|
||||
fetchComponentByName,
|
||||
fetchSummary,
|
||||
fetchIncidents,
|
||||
@@ -26,9 +27,11 @@ export function registerInfoCommands(bot) {
|
||||
`Show current status of all components.\n` +
|
||||
`Add a component name for a specific check.\n` +
|
||||
`Example: <code>/status api</code>\n\n` +
|
||||
`<b>/subscribe</b> <type>\n` +
|
||||
`<b>/subscribe</b> <type> [component]\n` +
|
||||
`Set what notifications you receive.\n` +
|
||||
`Options: <code>incident</code>, <code>component</code>, <code>all</code>\n` +
|
||||
`Types: <code>incident</code>, <code>component</code>, <code>all</code>\n` +
|
||||
`Component filter: <code>/subscribe component api</code>\n` +
|
||||
`Clear filter: <code>/subscribe component all</code>\n` +
|
||||
`Example: <code>/subscribe incident</code>\n\n` +
|
||||
`<b>/history</b> [count]\n` +
|
||||
`Show recent incidents. Default: 5, max: 10.\n` +
|
||||
@@ -46,7 +49,7 @@ export function registerInfoCommands(bot) {
|
||||
if (args) {
|
||||
const component = await fetchComponentByName(args);
|
||||
if (!component) {
|
||||
await ctx.reply(`Component "<code>${args}</code>" not found.`, { parse_mode: "HTML" });
|
||||
await ctx.reply(`Component "<code>${escapeHtml(args)}</code>" not found.`, { parse_mode: "HTML" });
|
||||
return;
|
||||
}
|
||||
await ctx.reply(formatComponentLine(component), { parse_mode: "HTML" });
|
||||
@@ -66,7 +69,8 @@ export function registerInfoCommands(bot) {
|
||||
{ parse_mode: "HTML", disable_web_page_preview: true }
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
} catch (err) {
|
||||
console.error("status command error:", err);
|
||||
await ctx.reply("Unable to fetch status. Please try again later.");
|
||||
}
|
||||
});
|
||||
@@ -87,7 +91,8 @@ export function registerInfoCommands(bot) {
|
||||
`<a href="${STATUS_URL}/history">View full history</a>`,
|
||||
{ parse_mode: "HTML", disable_web_page_preview: true }
|
||||
);
|
||||
} catch {
|
||||
} catch (err) {
|
||||
console.error("history command error:", err);
|
||||
await ctx.reply("Unable to fetch incident history. Please try again later.");
|
||||
}
|
||||
});
|
||||
@@ -111,7 +116,8 @@ export function registerInfoCommands(bot) {
|
||||
`<a href="${STATUS_URL}">View uptime on status page</a>`,
|
||||
{ parse_mode: "HTML", disable_web_page_preview: true }
|
||||
);
|
||||
} catch {
|
||||
} catch (err) {
|
||||
console.error("uptime command error:", err);
|
||||
await ctx.reply("Unable to fetch uptime data. Please try again later.");
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
const TELEGRAM_API = "https://api.telegram.org/bot";
|
||||
|
||||
const BOT_COMMANDS = [
|
||||
{ command: "help", description: "Detailed command guide" },
|
||||
{ command: "start", description: "Subscribe to status notifications" },
|
||||
{ command: "stop", description: "Unsubscribe from notifications" },
|
||||
{ command: "status", description: "Current system status" },
|
||||
{ command: "subscribe", description: "Set notification preferences" },
|
||||
{ command: "history", description: "Recent incidents" },
|
||||
{ command: "uptime", description: "Component health overview" },
|
||||
];
|
||||
|
||||
/**
|
||||
* One-time setup: register bot commands and set Telegram webhook.
|
||||
* GET /webhook/setup/:secret
|
||||
*/
|
||||
export async function setupBot(c) {
|
||||
const secret = c.req.param("secret");
|
||||
if (secret !== c.env.WEBHOOK_SECRET) {
|
||||
return c.text("Unauthorized", 401);
|
||||
}
|
||||
|
||||
const token = c.env.BOT_TOKEN;
|
||||
const workerUrl = new URL(c.req.url).origin;
|
||||
|
||||
// Set webhook URL
|
||||
const webhookRes = await fetch(`${TELEGRAM_API}${token}/setWebhook`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ url: `${workerUrl}/webhook/telegram` }),
|
||||
});
|
||||
const webhookData = await webhookRes.json();
|
||||
|
||||
// Register bot commands
|
||||
const commandsRes = await fetch(`${TELEGRAM_API}${token}/setMyCommands`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ commands: BOT_COMMANDS }),
|
||||
});
|
||||
const commandsData = await commandsRes.json();
|
||||
|
||||
return c.json({
|
||||
webhook: webhookData,
|
||||
commands: commandsData,
|
||||
});
|
||||
}
|
||||
76
src/cron-status-check.js
Normal file
76
src/cron-status-check.js
Normal file
@@ -0,0 +1,76 @@
|
||||
import { fetchSummary, humanizeStatus, escapeHtml } from "./status-fetcher.js";
|
||||
import { getSubscribersByType } from "./kv-store.js";
|
||||
const LAST_STATUS_KEY = "last-status";
|
||||
|
||||
/**
|
||||
* Build a map of component name -> status from summary
|
||||
*/
|
||||
function buildStatusMap(summary) {
|
||||
const map = {};
|
||||
for (const c of summary.components) {
|
||||
if (!c.group) map[c.name] = c.status;
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format a component status change detected by cron
|
||||
*/
|
||||
function formatChangeMessage(name, oldStatus, newStatus) {
|
||||
return (
|
||||
`<b>Component Update: ${escapeHtml(name)}</b>\n` +
|
||||
`${humanizeStatus(oldStatus)} → <b>${humanizeStatus(newStatus)}</b>\n` +
|
||||
`<i>(detected by status check)</i>`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* CF Scheduled handler — polls status page, notifies on changes
|
||||
*/
|
||||
export async function handleScheduled(env) {
|
||||
const kv = env.claude_status;
|
||||
const queue = env["claude-status"];
|
||||
|
||||
let summary;
|
||||
try {
|
||||
summary = await fetchSummary();
|
||||
} catch (err) {
|
||||
console.error("Cron: failed to fetch status:", err);
|
||||
return;
|
||||
}
|
||||
|
||||
const currentMap = buildStatusMap(summary);
|
||||
const stored = await kv.get(LAST_STATUS_KEY, "json");
|
||||
const previousMap = stored?.components || {};
|
||||
|
||||
// Find changes (only if previous state exists for that component)
|
||||
const changes = [];
|
||||
for (const [name, status] of Object.entries(currentMap)) {
|
||||
if (previousMap[name] && previousMap[name] !== status) {
|
||||
changes.push({ name, oldStatus: previousMap[name], newStatus: status });
|
||||
}
|
||||
}
|
||||
|
||||
// Always update stored state (proves cron is running)
|
||||
await kv.put(LAST_STATUS_KEY, JSON.stringify({
|
||||
components: currentMap,
|
||||
timestamp: new Date().toISOString(),
|
||||
}));
|
||||
|
||||
if (changes.length === 0) return;
|
||||
|
||||
console.log(`Cron: ${changes.length} component change(s) detected`);
|
||||
|
||||
// Enqueue notifications for each change
|
||||
for (const { name, oldStatus, newStatus } of changes) {
|
||||
const html = formatChangeMessage(name, oldStatus, newStatus);
|
||||
const subscribers = await getSubscribersByType(kv, "component", name);
|
||||
const messages = subscribers.map(({ chatId, threadId }) => ({
|
||||
body: { chatId, threadId, html },
|
||||
}));
|
||||
for (let i = 0; i < messages.length; i += 100) {
|
||||
await queue.sendBatch(messages.slice(i, i + 100));
|
||||
}
|
||||
console.log(`Cron: enqueued ${messages.length} messages for ${name} change`);
|
||||
}
|
||||
}
|
||||
14
src/index.js
14
src/index.js
@@ -2,7 +2,8 @@ import { Hono } from "hono";
|
||||
import { handleTelegramWebhook } from "./bot-commands.js";
|
||||
import { handleStatuspageWebhook } from "./statuspage-webhook.js";
|
||||
import { handleQueue } from "./queue-consumer.js";
|
||||
import { setupBot } from "./bot-setup.js";
|
||||
import { migrateFromSingleKey } from "./kv-store.js";
|
||||
import { timingSafeEqual } from "./crypto-utils.js";
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
@@ -20,10 +21,19 @@ app.use("*", async (c, next) => {
|
||||
});
|
||||
|
||||
app.get("/", (c) => c.text("Claude Status Bot is running"));
|
||||
app.get("/webhook/setup/:secret", (c) => setupBot(c));
|
||||
app.post("/webhook/telegram", (c) => handleTelegramWebhook(c));
|
||||
app.post("/webhook/status/:secret", (c) => handleStatuspageWebhook(c));
|
||||
|
||||
// One-time migration route — remove after migration is confirmed
|
||||
app.post("/migrate/:secret", async (c) => {
|
||||
const secret = c.req.param("secret");
|
||||
if (!await timingSafeEqual(secret, c.env.WEBHOOK_SECRET)) {
|
||||
return c.text("Unauthorized", 401);
|
||||
}
|
||||
const count = await migrateFromSingleKey(c.env.claude_status);
|
||||
return c.json({ migrated: count });
|
||||
});
|
||||
|
||||
export default {
|
||||
fetch: app.fetch,
|
||||
queue: handleQueue,
|
||||
|
||||
178
src/kv-store.js
178
src/kv-store.js
@@ -1,84 +1,164 @@
|
||||
const KV_KEY = "subscribers";
|
||||
const KEY_PREFIX = "sub:";
|
||||
|
||||
/**
|
||||
* Build composite key: "chatId" or "chatId:threadId" for supergroup topics
|
||||
* Build KV key: "sub:{chatId}" or "sub:{chatId}:{threadId}"
|
||||
*/
|
||||
export function buildSubscriberKey(chatId, threadId) {
|
||||
return threadId != null ? `${chatId}:${threadId}` : `${chatId}`;
|
||||
function buildKvKey(chatId, threadId) {
|
||||
const suffix = threadId != null ? `${chatId}:${threadId}` : `${chatId}`;
|
||||
return `${KEY_PREFIX}${suffix}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse composite key back into { chatId, threadId }
|
||||
* Parse KV key back into { chatId, threadId }
|
||||
* Key format: "sub:{chatId}" or "sub:{chatId}:{threadId}"
|
||||
*/
|
||||
export function parseSubscriberKey(key) {
|
||||
const parts = key.split(":");
|
||||
if (parts.length >= 2 && parts[0].startsWith("-")) {
|
||||
// Supergroup IDs start with "-100", so key is "-100xxx:threadId"
|
||||
// Find the last ":" — everything before is chatId, after is threadId
|
||||
const lastColon = key.lastIndexOf(":");
|
||||
function parseKvKey(kvKey) {
|
||||
const raw = kvKey.slice(KEY_PREFIX.length);
|
||||
const lastColon = raw.lastIndexOf(":");
|
||||
// No colon or only negative sign prefix — no threadId
|
||||
if (lastColon <= 0) {
|
||||
return { chatId: raw, threadId: null };
|
||||
}
|
||||
// Check if the part after last colon is a valid threadId (numeric)
|
||||
const possibleThread = raw.slice(lastColon + 1);
|
||||
if (/^\d+$/.test(possibleThread)) {
|
||||
return {
|
||||
chatId: key.slice(0, lastColon),
|
||||
threadId: parseInt(key.slice(lastColon + 1), 10),
|
||||
chatId: raw.slice(0, lastColon),
|
||||
threadId: parseInt(possibleThread, 10),
|
||||
};
|
||||
}
|
||||
return { chatId: key, threadId: null };
|
||||
return { chatId: raw, threadId: null };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all subscribers from KV
|
||||
* List all subscriber KV keys with cursor pagination
|
||||
*/
|
||||
export async function getSubscribers(kv) {
|
||||
const data = await kv.get(KV_KEY, "json");
|
||||
return data || {};
|
||||
async function listAllSubscriberKeys(kv) {
|
||||
const keys = [];
|
||||
let cursor = undefined;
|
||||
do {
|
||||
const result = await kv.list({ prefix: KEY_PREFIX, cursor });
|
||||
keys.push(...result.keys);
|
||||
cursor = result.list_complete ? undefined : result.cursor;
|
||||
} while (cursor);
|
||||
return keys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write all subscribers to KV
|
||||
*/
|
||||
async function setSubscribers(kv, data) {
|
||||
await kv.put(KV_KEY, JSON.stringify(data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add subscriber with default types
|
||||
* Add or re-subscribe a user with default types
|
||||
*/
|
||||
export async function addSubscriber(kv, chatId, threadId, types = ["incident", "component"]) {
|
||||
const subs = await getSubscribers(kv);
|
||||
const key = buildSubscriberKey(chatId, threadId);
|
||||
subs[key] = { types };
|
||||
await setSubscribers(kv, subs);
|
||||
const key = buildKvKey(chatId, threadId);
|
||||
const existing = await kv.get(key, "json");
|
||||
const value = { types, components: existing?.components || [] };
|
||||
await kv.put(key, JSON.stringify(value));
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove subscriber
|
||||
* Remove subscriber — atomic single delete
|
||||
*/
|
||||
export async function removeSubscriber(kv, chatId, threadId) {
|
||||
const subs = await getSubscribers(kv);
|
||||
const key = buildSubscriberKey(chatId, threadId);
|
||||
delete subs[key];
|
||||
await setSubscribers(kv, subs);
|
||||
const key = buildKvKey(chatId, threadId);
|
||||
await kv.delete(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update subscriber's notification type preferences
|
||||
*/
|
||||
export async function updateSubscriberTypes(kv, chatId, threadId, types) {
|
||||
const subs = await getSubscribers(kv);
|
||||
const key = buildSubscriberKey(chatId, threadId);
|
||||
if (!subs[key]) {
|
||||
subs[key] = { types };
|
||||
} else {
|
||||
subs[key].types = types;
|
||||
}
|
||||
await setSubscribers(kv, subs);
|
||||
const key = buildKvKey(chatId, threadId);
|
||||
const existing = await kv.get(key, "json");
|
||||
if (!existing) return false;
|
||||
existing.types = types;
|
||||
await kv.put(key, JSON.stringify(existing));
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get subscribers filtered by event type, returns [{ chatId, threadId }, ...]
|
||||
* Update subscriber's component filter list
|
||||
*/
|
||||
export async function getSubscribersByType(kv, eventType) {
|
||||
const subs = await getSubscribers(kv);
|
||||
return Object.entries(subs)
|
||||
.filter(([, val]) => val.types.includes(eventType))
|
||||
.map(([key]) => parseSubscriberKey(key));
|
||||
export async function updateSubscriberComponents(kv, chatId, threadId, components) {
|
||||
const key = buildKvKey(chatId, threadId);
|
||||
const existing = await kv.get(key, "json");
|
||||
if (!existing) return false;
|
||||
existing.components = components;
|
||||
await kv.put(key, JSON.stringify(existing));
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a single subscriber's data, or null if not subscribed
|
||||
*/
|
||||
export async function getSubscriber(kv, chatId, threadId) {
|
||||
const key = buildKvKey(chatId, threadId);
|
||||
return kv.get(key, "json");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get subscribers filtered by event type and optional component name.
|
||||
* Returns [{ chatId, threadId, ...value }, ...]
|
||||
*/
|
||||
export async function getSubscribersByType(kv, eventType, componentName = null) {
|
||||
const keys = await listAllSubscriberKeys(kv);
|
||||
const results = [];
|
||||
|
||||
for (const { name } of keys) {
|
||||
const value = await kv.get(name, "json");
|
||||
if (!value || !value.types.includes(eventType)) continue;
|
||||
|
||||
// Component-specific filtering
|
||||
if (eventType === "component" && componentName && value.components?.length > 0) {
|
||||
const match = value.components.some(
|
||||
(c) => c.toLowerCase() === componentName.toLowerCase()
|
||||
);
|
||||
if (!match) continue;
|
||||
}
|
||||
|
||||
const { chatId, threadId } = parseKvKey(name);
|
||||
results.push({ chatId, threadId });
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all subscribers with their full data
|
||||
*/
|
||||
export async function getAllSubscribers(kv) {
|
||||
const keys = await listAllSubscriberKeys(kv);
|
||||
const results = [];
|
||||
for (const { name } of keys) {
|
||||
const value = await kv.get(name, "json");
|
||||
if (!value) continue;
|
||||
const { chatId, threadId } = parseKvKey(name);
|
||||
results.push({ chatId, threadId, ...value });
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* One-time migration from single-key "subscribers" to per-key format.
|
||||
* Returns count of migrated entries.
|
||||
*/
|
||||
export async function migrateFromSingleKey(kv) {
|
||||
const old = await kv.get("subscribers", "json");
|
||||
if (!old) return 0;
|
||||
|
||||
const entries = Object.entries(old);
|
||||
for (const [compositeKey, value] of entries) {
|
||||
// Preserve components field if it exists, default to empty
|
||||
const data = { types: value.types || [], components: value.components || [] };
|
||||
await kv.put(`${KEY_PREFIX}${compositeKey}`, JSON.stringify(data));
|
||||
}
|
||||
|
||||
// Verify migrated count before deleting old key
|
||||
const migrated = await listAllSubscriberKeys(kv);
|
||||
if (migrated.length >= entries.length) {
|
||||
await kv.delete("subscribers");
|
||||
console.log(`Migration complete: ${entries.length} subscribers migrated`);
|
||||
} else {
|
||||
console.error(`Migration verification failed: expected ${entries.length}, got ${migrated.length}`);
|
||||
}
|
||||
|
||||
return entries.length;
|
||||
}
|
||||
|
||||
@@ -1,17 +1,18 @@
|
||||
import { removeSubscriber } from "./kv-store.js";
|
||||
|
||||
const TELEGRAM_API = "https://api.telegram.org/bot";
|
||||
|
||||
import { telegramUrl } from "./telegram-api.js";
|
||||
/**
|
||||
* Process a batch of queued messages, sending each to Telegram.
|
||||
* Handles rate limits (429 → retry), blocked bots (403/400 → remove subscriber).
|
||||
*/
|
||||
export async function handleQueue(batch, env) {
|
||||
let sent = 0, failed = 0, retried = 0, removed = 0;
|
||||
|
||||
for (const msg of batch.messages) {
|
||||
const { chatId, threadId, html } = msg.body;
|
||||
|
||||
// Defensive check for malformed messages
|
||||
if (!chatId || !html) {
|
||||
console.error("Queue: malformed message, skipping", msg.body);
|
||||
msg.ack();
|
||||
continue;
|
||||
}
|
||||
@@ -24,30 +25,39 @@ export async function handleQueue(batch, env) {
|
||||
disable_web_page_preview: true,
|
||||
};
|
||||
// Send to specific supergroup topic if threadId present
|
||||
if (threadId) payload.message_thread_id = threadId;
|
||||
if (threadId != null) payload.message_thread_id = threadId;
|
||||
|
||||
const res = await fetch(`${TELEGRAM_API}${env.BOT_TOKEN}/sendMessage`, {
|
||||
const res = await fetch(telegramUrl(env.BOT_TOKEN, "sendMessage"), {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(payload),
|
||||
});
|
||||
|
||||
if (res.ok) {
|
||||
sent++;
|
||||
msg.ack();
|
||||
} else if (res.status === 403 || res.status === 400) {
|
||||
// Bot blocked or chat not found — auto-remove subscriber
|
||||
console.log(`Queue: removing subscriber ${chatId}:${threadId} (HTTP ${res.status})`);
|
||||
await removeSubscriber(env.claude_status, chatId, threadId);
|
||||
removed++;
|
||||
msg.ack();
|
||||
} else if (res.status === 429) {
|
||||
// Rate limited — let queue retry later
|
||||
console.log("Queue: rate limited, retrying");
|
||||
retried++;
|
||||
msg.retry();
|
||||
} else {
|
||||
// Unknown error — ack to avoid infinite retry
|
||||
console.error(`Queue: unexpected HTTP ${res.status} for ${chatId}`);
|
||||
failed++;
|
||||
msg.ack();
|
||||
}
|
||||
} catch {
|
||||
// Network error — retry
|
||||
} catch (err) {
|
||||
console.error("Queue: network error, retrying", err);
|
||||
retried++;
|
||||
msg.retry();
|
||||
}
|
||||
}
|
||||
|
||||
if (sent || failed || retried || removed) {
|
||||
console.log(`Queue batch: sent=${sent} failed=${failed} retried=${retried} removed=${removed}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,15 @@
|
||||
import { getSubscribersByType } from "./kv-store.js";
|
||||
import { humanizeStatus, escapeHtml } from "./status-fetcher.js";
|
||||
/**
|
||||
* Timing-safe string comparison
|
||||
*/
|
||||
async function timingSafeEqual(a, b) {
|
||||
const encoder = new TextEncoder();
|
||||
const bufA = encoder.encode(a);
|
||||
const bufB = encoder.encode(b);
|
||||
if (bufA.byteLength !== bufB.byteLength) return false;
|
||||
return crypto.subtle.timingSafeEqual(bufA, bufB);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format incident event as Telegram HTML message
|
||||
@@ -36,12 +46,9 @@ function formatComponentMessage(component, update) {
|
||||
* Handle incoming Statuspage webhook
|
||||
*/
|
||||
export async function handleStatuspageWebhook(c) {
|
||||
// Validate secret (timing-safe comparison)
|
||||
// Validate URL secret (timing-safe)
|
||||
const secret = c.req.param("secret");
|
||||
const encoder = new TextEncoder();
|
||||
const a = encoder.encode(secret);
|
||||
const b = encoder.encode(c.env.WEBHOOK_SECRET);
|
||||
if (a.byteLength !== b.byteLength || !crypto.subtle.timingSafeEqual(a, b)) {
|
||||
if (!await timingSafeEqual(secret, c.env.WEBHOOK_SECRET)) {
|
||||
return c.text("Unauthorized", 401);
|
||||
}
|
||||
|
||||
@@ -56,20 +63,23 @@ export async function handleStatuspageWebhook(c) {
|
||||
const eventType = body?.meta?.event_type;
|
||||
if (!eventType) return c.text("Bad Request", 400);
|
||||
|
||||
console.log(`Statuspage webhook: ${eventType}`);
|
||||
|
||||
// Determine category and format message
|
||||
let category, html;
|
||||
let category, html, componentName;
|
||||
if (eventType.startsWith("incident.")) {
|
||||
category = "incident";
|
||||
html = formatIncidentMessage(body.incident);
|
||||
} else if (eventType.startsWith("component.")) {
|
||||
category = "component";
|
||||
componentName = body.component?.name || null;
|
||||
html = formatComponentMessage(body.component, body.component_update);
|
||||
} else {
|
||||
return c.text("Unknown event type", 400);
|
||||
}
|
||||
|
||||
// Get filtered subscribers
|
||||
const subscribers = await getSubscribersByType(c.env.claude_status, category);
|
||||
// Get filtered subscribers (with component name filtering)
|
||||
const subscribers = await getSubscribersByType(c.env.claude_status, category, componentName);
|
||||
|
||||
// Enqueue messages for fan-out via CF Queues (batch for performance)
|
||||
const messages = subscribers.map(({ chatId, threadId }) => ({
|
||||
@@ -79,5 +89,7 @@ export async function handleStatuspageWebhook(c) {
|
||||
await c.env["claude-status"].sendBatch(messages.slice(i, i + 100));
|
||||
}
|
||||
|
||||
console.log(`Enqueued ${messages.length} messages for ${category}${componentName ? `:${componentName}` : ""}`);
|
||||
|
||||
return c.text("OK", 200);
|
||||
}
|
||||
|
||||
8
src/telegram-api.js
Normal file
8
src/telegram-api.js
Normal file
@@ -0,0 +1,8 @@
|
||||
/**
|
||||
* Shared Telegram Bot API base URL and helper
|
||||
*/
|
||||
export const TELEGRAM_API = "https://api.telegram.org/bot";
|
||||
|
||||
export function telegramUrl(token, method) {
|
||||
return `${TELEGRAM_API}${token}/${method}`;
|
||||
}
|
||||
Reference in New Issue
Block a user