Files
miti99bot/cmd/migrate_cf_data/main.go
T
tiennm99 a8ed67a0a3 refactor: audit-driven hygiene pass across modules and infra
Concurrency
- lolschedule: serialize subscriber Get→mutate→Put via state.subscribersMu;
  the single-slot list was previously losing writes under concurrent
  /lolschedule_subscribe.
- trading: PriceClient memoises its default *http.Client so /trade_stats
  reuses TLS connections across held tickers.

Observability
- server/log_middleware: defer the req log line and recover panics so a
  panicking cron handler still emits the structured req entry CloudWatch
  filters on for 5xx alerting.
- server/router (cron): inner recover with cron-name context captures the
  panicking job before the middleware's safety net does.
- telegram/webhook: rune-safe truncation in dispatch logs — Vietnamese,
  Korean, and emoji previews no longer ship as garbled bytes.
- lolschedule/api_client: same rune-safe fix for error-body log truncation.
- telegram/webhook: gate the post-recover WriteHeader(200) so a panicking
  handler that already touched w doesn't trigger superfluous-WriteHeader.

Correctness
- twentyq: clearGame error during solved-relaunch is logged instead of
  silently swallowed (was a permanent deadlock vector on KV failure).
- misc /mstats: KV read failure replies "Could not load stats. Try again
  later." to the user instead of returning into the dispatcher; matches the
  pattern other modules use.
- migrate_cf_data trading-audit-dump: surface f.Close error so a truncated
  JSONL never passes silently as a complete audit dump.

Operator ergonomics
- migrate_cf_data (all 4 subcommands): signal.NotifyContext for SIGINT /
  SIGTERM. Ctrl-C mid-Scan now propagates cleanly instead of leaving a
  half-converted DynamoDB table.
- ai/ratelimit: doc the Lambda-recycle memory bound to match keylock.Map
  so a future reviewer doesn't re-flag the unbounded map.

I/O-changing (user-approved)
- lolschedule daily push auto-prunes subscribers whose Telegram error
  matches a terminal marker (blocked / deactivated / chat gone). Transient
  errors keep the chat on the list. Subscribe message updated to mention
  the auto-cleanup.
- twentyq seed pool grown 50 → 178; repeat-collision threshold moves from
  ~9 plays to ~17 (birthday paradox).
- util /info flipped Public → Protected — chat/thread/sender IDs are no
  longer enumerable by every group member.
- cmd/server WriteTimeout 6min → 75s (cron 60s + 15s slack). No-op on
  Lambda; matters only for local non-Lambda runs.
- webhook + cron rejection paths drop response bodies (no fingerprintable
  text for internet scanners hitting the public Function URL). Status
  codes preserved for CloudWatch metrics; structured log lines carry the
  rejection reason for operator triage.

Tests added: TestTruncateRunes, TestRunDailyPush_PrunesDeadSubscribers,
TestIsTerminalSendError, TestInfo_DeniedToNonOwner,
TestInfo_DeniedToChannelMessageNoFrom, plus owner-allowed counterparts.
2026-05-16 13:35:00 +07:00

244 lines
7.2 KiB
Go

// Command migrate_cf_data moves durable data from the legacy Cloudflare
// KV/D1 stack into the live AWS DynamoDB table. Operator-invoked only.
//
// Subcommands:
//
// inventory Read CF KV keys, apply the Phase 01 policy, print
// a classification report. No writes anywhere.
//
// kv-import Copy migrate-action KV keys into DynamoDB.
// Idempotent by default (attribute_not_exists guard).
// Flags: --table, --dry-run, --overwrite.
//
// trading-audit-dump Stream D1 `trading_trades` rows to a JSONL file.
// Audit-only; not an import input.
// Flags: --out (required).
//
// convert-value-to-string
// One-shot rewrite of the table's `value` attribute
// from Binary (legacy shape) to String (current
// shape). Idempotent — items already stored as
// String are skipped.
// Flags: --table, --dry-run.
//
// Required env:
//
// CLOUDFLARE_API_TOKEN — read-scoped token for KV + D1
// CLOUDFLARE_ACCOUNT_ID — production CF account
// CF_KV_NAMESPACE_ID — production KV namespace
// CF_D1_DATABASE_ID — production D1 database (only needed for
// trading-audit-dump)
// AWS_REGION (or standard AWS SDK env) — only needed for the subcommands
// that touch DynamoDB (kv-import, convert-value-to-string)
package main
import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"sort"
	"syscall"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"

	"github.com/tiennm99/miti99bot/internal/migration"
)
// signalContext derives a context from context.Background that is cancelled
// when the process receives os.Interrupt (Ctrl-C) or SIGTERM. Subcommands
// thread it through their remote calls so an operator abort surfaces as a
// context error rather than an unexplained partial run. The returned
// CancelFunc releases the signal registration and must be called by the
// caller (typically via defer).
func signalContext() (context.Context, context.CancelFunc) {
	ctx, stop := signal.NotifyContext(
		context.Background(),
		os.Interrupt,
		syscall.SIGTERM,
	)
	return ctx, stop
}
// main dispatches to the subcommand named by the first CLI argument and
// hands it the remaining arguments.
//
// Exit status: 2 when no subcommand or an unknown one is given (usage is
// printed), 1 when the subcommand returns an error (printed to stderr with
// an "error:" prefix), 0 otherwise — including the explicit help forms.
func main() {
	if len(os.Args) < 2 {
		usage()
		os.Exit(2)
	}
	name, rest := os.Args[1], os.Args[2:]

	// Help is a successful invocation, so it is handled before the lookup
	// that treats every unrecognised name as a usage error.
	switch name {
	case "-h", "--help", "help":
		usage()
		return
	}

	handlers := map[string]func([]string) error{
		"inventory":               runInventory,
		"kv-import":               runKVImport,
		"trading-audit-dump":      runTradingAuditDump,
		"convert-value-to-string": runConvertValueToString,
	}
	run, known := handlers[name]
	if !known {
		usage()
		os.Exit(2)
	}

	if runErr := run(rest); runErr != nil {
		fmt.Fprintln(os.Stderr, "error:", runErr)
		os.Exit(1)
	}
}
// usage prints the one-line command synopsis, listing every subcommand, to
// stderr.
func usage() {
	const synopsis = "usage: migrate_cf_data <inventory|kv-import|trading-audit-dump|convert-value-to-string> [flags]"
	fmt.Fprintln(os.Stderr, synopsis)
}
// runInventory implements the "inventory" subcommand: it lists every key in
// the Cloudflare KV namespace, classifies each one with migration.Classify,
// and prints a summary to stdout. Migrate-action keys are tallied by
// migration.PrefixOf(key); all other keys are tallied by the classifier's
// Reason. This function performs no writes itself.
//
// It returns an error when flag parsing fails, the Cloudflare environment
// variables are missing, or the key listing fails.
func runInventory(args []string) error {
	fs := flag.NewFlagSet("inventory", flag.ExitOnError)
	if err := fs.Parse(args); err != nil {
		return err
	}
	kv, err := newKVClient()
	if err != nil {
		return err
	}
	ctx, cancel := signalContext()
	defer cancel()

	keys, err := kv.ListKeys(ctx)
	if err != nil {
		return fmt.Errorf("list keys: %w", err)
	}

	migrate := map[string]int{}
	skip := map[string]int{}
	for _, k := range keys {
		d := migration.Classify(k)
		if d.Action == migration.ActionMigrate {
			migrate[migration.PrefixOf(k)]++
			continue
		}
		skip[d.Reason]++
	}

	fmt.Printf("Cloudflare KV namespace contains %d keys.\n\n", len(keys))
	fmt.Println("Migrate-action keys by prefix:")
	printSortedCounts(migrate)
	fmt.Println("\nSkip-action keys by reason:")
	printSortedCounts(skip)
	return nil
}

// printSortedCounts prints one "label count" line per map entry to stdout,
// ordered by label. Go randomises map iteration order, so sorting is what
// makes two inventory runs over the same namespace produce identical,
// diff-able reports.
func printSortedCounts(counts map[string]int) {
	labels := make([]string, 0, len(counts))
	for label := range counts {
		labels = append(labels, label)
	}
	sort.Strings(labels)
	for _, label := range labels {
		fmt.Printf(" %-30s %d\n", label, counts[label])
	}
}
// runKVImport implements the "kv-import" subcommand: it lists the Cloudflare
// KV keys, and for every key that migration.Classify marks as
// ActionMigrate it fetches the value and writes it to the DynamoDB table
// named by --table.
//
// Flags:
//
//	--table      target DynamoDB table (required)
//	--dry-run    print what would be written; the AWS config is never loaded
//	--overwrite  forwarded to migration.NewDynamoDBWriter
//
// Per-key fetch/put failures are logged to stderr and counted in the report
// but do not abort the run. The report is always printed to stdout once the
// loop ends.
//
// It returns an error for bad flags, missing Cloudflare environment
// variables, AWS config or key-listing failures, and — so the process exits
// non-zero — when the run was interrupted by SIGINT/SIGTERM.
func runKVImport(args []string) error {
	fs := flag.NewFlagSet("kv-import", flag.ExitOnError)
	table := fs.String("table", "", "target DynamoDB table (required)")
	dryRun := fs.Bool("dry-run", false, "log actions but do not write")
	overwrite := fs.Bool("overwrite", false, "drop attribute_not_exists guard")
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *table == "" {
		return fmt.Errorf("--table is required")
	}
	kv, err := newKVClient()
	if err != nil {
		return err
	}
	ctx, cancel := signalContext()
	defer cancel()

	// writer stays nil on a dry run; the loop below never dereferences it
	// in that mode.
	var writer *migration.DynamoDBWriter
	if !*dryRun {
		cfg, err := awsconfig.LoadDefaultConfig(ctx)
		if err != nil {
			return fmt.Errorf("aws config: %w", err)
		}
		writer = migration.NewDynamoDBWriter(dynamodb.NewFromConfig(cfg), *table, *overwrite)
	}

	keys, err := kv.ListKeys(ctx)
	if err != nil {
		return fmt.Errorf("list keys: %w", err)
	}

	report := migration.NewReport()
	for _, k := range keys {
		// Once the context is cancelled every remaining call would fail
		// with the same context error; stop instead of logging one
		// failure per remaining key.
		if ctx.Err() != nil {
			break
		}

		d := migration.Classify(k)
		if d.Action != migration.ActionMigrate {
			report.AddSkippedPolicy(d.Reason)
			continue
		}
		prefix := migration.PrefixOf(k)

		val, err := kv.GetValue(ctx, k)
		if err != nil {
			fmt.Fprintf(os.Stderr, " get %s: %v\n", k, err)
			report.AddFailed(prefix)
			continue
		}
		if *dryRun {
			fmt.Printf(" DRY-RUN would write pk=%s sk=%s len=%d\n", d.PK, d.SK, len(val))
			report.AddImported(prefix)
			continue
		}

		putErr := writer.Put(ctx, d.PK, d.SK, val)
		switch {
		case putErr == nil:
			report.AddImported(prefix)
		case errors.Is(putErr, migration.ErrItemExists):
			// errors.Is rather than ==, so a wrapped sentinel is still
			// counted as "already present" instead of a failure.
			report.AddSkippedExisting(prefix)
		default:
			fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", d.PK, d.SK, putErr)
			report.AddFailed(prefix)
		}
	}

	// Print whatever was accomplished, even after an interrupt, so the
	// operator can see how far the import got.
	report.Format(os.Stdout)
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("kv-import interrupted: %w", err)
	}
	return nil
}
// runTradingAuditDump implements the "trading-audit-dump" subcommand: it
// queries every row of the D1 `trading_trades` table (ordered by id) and
// writes the rows to the file named by --out, one JSON value per line.
//
// It returns an error when --out is missing, the Cloudflare environment
// variables are not set, the query fails, the output file cannot be
// created, a row fails to encode, or the file fails to close. On success it
// prints the row count and output path to stdout.
func runTradingAuditDump(args []string) error {
	fs := flag.NewFlagSet("trading-audit-dump", flag.ExitOnError)
	out := fs.String("out", "", "output JSONL file path (required)")
	if parseErr := fs.Parse(args); parseErr != nil {
		return parseErr
	}
	path := *out
	if path == "" {
		return fmt.Errorf("--out is required")
	}

	d1, err := newD1Client()
	if err != nil {
		return err
	}

	ctx, cancel := signalContext()
	defer cancel()

	rows, err := d1.Query(ctx,
		"SELECT id, user_id, symbol, side, qty, price_vnd, ts FROM trading_trades ORDER BY id", nil)
	if err != nil {
		return fmt.Errorf("d1 query: %w", err)
	}

	file, err := os.Create(path)
	if err != nil {
		return err
	}

	// Encode until the first failure; the file is closed exactly once
	// afterwards regardless of how the loop ended.
	encoder := json.NewEncoder(file)
	var encodeErr error
	for _, row := range rows {
		if encodeErr = encoder.Encode(row); encodeErr != nil {
			break
		}
	}
	closeErr := file.Close()

	switch {
	case encodeErr != nil:
		// The encode failure is the root cause; a secondary close error
		// is deliberately dropped in its favour.
		return encodeErr
	case closeErr != nil:
		// Successful Encode calls do not prove the data reached disk.
		// If the final Close fails (e.g. ENOSPC) the operator must see
		// it: this file is evidence, and a silently truncated dump is
		// worse than no dump.
		return fmt.Errorf("close audit dump: %w", closeErr)
	}

	fmt.Printf("Wrote %d rows to %s\n", len(rows), path)
	return nil
}
// newKVClient builds a Cloudflare KV client from the CLOUDFLARE_API_TOKEN,
// CLOUDFLARE_ACCOUNT_ID and CF_KV_NAMESPACE_ID environment variables. It
// returns an error naming all three variables if any of them is empty.
func newKVClient() (*migration.CloudflareKVClient, error) {
	apiToken := os.Getenv("CLOUDFLARE_API_TOKEN")
	accountID := os.Getenv("CLOUDFLARE_ACCOUNT_ID")
	namespaceID := os.Getenv("CF_KV_NAMESPACE_ID")

	configured := apiToken != "" && accountID != "" && namespaceID != ""
	if !configured {
		return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_KV_NAMESPACE_ID")
	}
	return migration.NewCloudflareKVClient(accountID, namespaceID, apiToken), nil
}
// newD1Client builds a Cloudflare D1 client from the CLOUDFLARE_API_TOKEN,
// CLOUDFLARE_ACCOUNT_ID and CF_D1_DATABASE_ID environment variables. It
// returns an error naming all three variables if any of them is empty.
func newD1Client() (*migration.CloudflareD1Client, error) {
	apiToken := os.Getenv("CLOUDFLARE_API_TOKEN")
	accountID := os.Getenv("CLOUDFLARE_ACCOUNT_ID")
	databaseID := os.Getenv("CF_D1_DATABASE_ID")

	configured := apiToken != "" && accountID != "" && databaseID != ""
	if !configured {
		return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_D1_DATABASE_ID")
	}
	return migration.NewCloudflareD1Client(accountID, databaseID, apiToken), nil
}