diff --git a/cmd/migrate_cf_data/convert_value.go b/cmd/migrate_cf_data/convert_value.go new file mode 100644 index 0000000..ccba2dd --- /dev/null +++ b/cmd/migrate_cf_data/convert_value.go @@ -0,0 +1,106 @@ +// One-shot rewrite of the table's `value` attribute from Binary (legacy +// shape) to String (current shape). Operator-elective; needed once after the +// runtime swap from MemberB to MemberS in internal/storage/dynamodb_kv.go. +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +func runConvertValueToString(args []string) error { + fs := flag.NewFlagSet("convert-value-to-string", flag.ExitOnError) + table := fs.String("table", "", "target DynamoDB table (required)") + dryRun := fs.Bool("dry-run", false, "log actions but do not write") + if err := fs.Parse(args); err != nil { + return err + } + if *table == "" { + return fmt.Errorf("--table is required") + } + + ctx := context.Background() + cfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + return fmt.Errorf("aws config: %w", err) + } + client := dynamodb.NewFromConfig(cfg) + + converted, alreadyString, skipped, failed := 0, 0, 0, 0 + pager := dynamodb.NewScanPaginator(client, &dynamodb.ScanInput{TableName: aws.String(*table)}) + for pager.HasMorePages() { + page, err := pager.NextPage(ctx) + if err != nil { + return fmt.Errorf("scan: %w", err) + } + for _, item := range page.Items { + pk, sk, ok := itemPKSK(item) + if !ok { + skipped++ + continue + } + valAttr, ok := item["value"] + if !ok { + skipped++ + continue + } + binAttr, isBinary := valAttr.(*types.AttributeValueMemberB) + if !isBinary { + alreadyString++ + continue + } + if *dryRun { + fmt.Printf(" DRY-RUN would convert pk=%s sk=%s len=%d\n", pk, sk, len(binAttr.Value)) + converted++ + continue + } + if err := putAsString(ctx, client, *table, pk, sk, binAttr.Value); err != nil { + fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", pk, sk, err) + failed++ + continue + } + converted++ + } + } + + fmt.Printf("\nconvert-value-to-string report\n") + fmt.Printf(" converted (B → S): %d\n", converted) + fmt.Printf(" already String: %d\n", alreadyString) + fmt.Printf(" skipped (no pk/sk/value): %d\n", skipped) + fmt.Printf(" failed: %d\n", failed) + return nil +} + +func itemPKSK(item map[string]types.AttributeValue) (string, string, bool) { + pkAttr, ok := item["pk"].(*types.AttributeValueMemberS) + if !ok { + return "", "", false + } + skAttr, ok := item["sk"].(*types.AttributeValueMemberS) + if !ok { + return "", "", false + } + return pkAttr.Value, skAttr.Value, true +} + +func putAsString(ctx context.Context, client *dynamodb.Client, table, pk, sk string, val []byte) error { + _, err := client.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(table), + Item: map[string]types.AttributeValue{ + "pk": &types.AttributeValueMemberS{Value: pk}, + "sk": &types.AttributeValueMemberS{Value: sk}, + "value": &types.AttributeValueMemberS{Value: string(val)}, + "updatedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)}, + }, + }) + return err +} diff --git a/cmd/migrate_cf_data/main.go b/cmd/migrate_cf_data/main.go new file mode 100644 index 0000000..1bb7421 --- /dev/null +++ b/cmd/migrate_cf_data/main.go @@ -0,0 +1,223 @@ +// Command migrate_cf_data moves durable data from the legacy Cloudflare +// KV/D1 stack into the live AWS DynamoDB table. Operator-invoked only. +// +// Subcommands: +// +// inventory Read CF KV keys, apply the Phase 01 policy, print +// a classification report. No writes anywhere. +// +// kv-import Copy migrate-action KV keys into DynamoDB. +// Idempotent by default (attribute_not_exists guard). +// Flags: --table, --dry-run, --overwrite. +// +// trading-audit-dump Stream D1 `trading_trades` rows to a JSONL file. +// Audit-only; not an import input. +// Flags: --out (required). +// +// convert-value-to-string +// One-shot rewrite of the table's `value` attribute +// from Binary (legacy shape) to String (current +// shape). Idempotent — items already stored as +// String are skipped. +// Flags: --table, --dry-run. +// +// Required env: +// +// CLOUDFLARE_API_TOKEN — read-scoped token for KV + D1 +// CLOUDFLARE_ACCOUNT_ID — production CF account +// CF_KV_NAMESPACE_ID — production KV namespace +// CF_D1_DATABASE_ID — production D1 database (only needed for +// trading-audit-dump) +// AWS_REGION (or standard AWS SDK env) — only needed for kv-import +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "os" + + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/tiennm99/miti99bot/internal/migration" +) + +func main() { + if len(os.Args) < 2 { + usage() + os.Exit(2) + } + sub := os.Args[1] + args := os.Args[2:] + var err error + switch sub { + case "inventory": + err = runInventory(args) + case "kv-import": + err = runKVImport(args) + case "trading-audit-dump": + err = runTradingAuditDump(args) + case "convert-value-to-string": + err = runConvertValueToString(args) + case "-h", "--help", "help": + usage() + return + default: + usage() + os.Exit(2) + } + if err != nil { + fmt.Fprintln(os.Stderr, "error:", err) + os.Exit(1) + } +} + +func usage() { + fmt.Fprintln(os.Stderr, "usage: migrate_cf_data [flags]") +} + +func runInventory(args []string) error { + fs := flag.NewFlagSet("inventory", flag.ExitOnError) + if err := fs.Parse(args); err != nil { + return err + } + kv, err := newKVClient() + if err != nil { + return err + } + ctx := context.Background() + keys, err := kv.ListKeys(ctx) + if err != nil { + return fmt.Errorf("list keys: %w", err) + } + migrate := map[string]int{} + skip := map[string]int{} + for _, k := range keys { + d := migration.Classify(k) + if d.Action == migration.ActionMigrate { + migrate[migration.PrefixOf(k)]++ + } else { + skip[d.Reason]++ + } + } + fmt.Printf("Cloudflare KV namespace contains %d keys.\n\n", len(keys)) + fmt.Println("Migrate-action keys by prefix:") + for p, n := range migrate { + fmt.Printf(" %-30s %d\n", p, n) + } + fmt.Println("\nSkip-action keys by reason:") + for r, n := range skip { + fmt.Printf(" %-30s %d\n", r, n) + } + return nil +} + +func runKVImport(args []string) error { + fs := flag.NewFlagSet("kv-import", flag.ExitOnError) + table := fs.String("table", "", "target DynamoDB table (required)") + dryRun := fs.Bool("dry-run", false, "log actions but do not write") + overwrite := fs.Bool("overwrite", false, "drop attribute_not_exists guard") + if err := fs.Parse(args); err != nil { + return err + } + if *table == "" { + return fmt.Errorf("--table is required") + } + kv, err := newKVClient() + if err != nil { + return err + } + ctx := context.Background() + var writer *migration.DynamoDBWriter + if !*dryRun { + cfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + return fmt.Errorf("aws config: %w", err) + } + writer = migration.NewDynamoDBWriter(dynamodb.NewFromConfig(cfg), *table, *overwrite) + } + keys, err := kv.ListKeys(ctx) + if err != nil { + return fmt.Errorf("list keys: %w", err) + } + report := migration.NewReport() + for _, k := range keys { + d := migration.Classify(k) + if d.Action != migration.ActionMigrate { + report.AddSkippedPolicy(d.Reason) + continue + } + val, err := kv.GetValue(ctx, k) + if err != nil { + fmt.Fprintf(os.Stderr, " get %s: %v\n", k, err) + report.AddFailed(migration.PrefixOf(k)) + continue + } + if *dryRun { + fmt.Printf(" DRY-RUN would write pk=%s sk=%s len=%d\n", d.PK, d.SK, len(val)) + report.AddImported(migration.PrefixOf(k)) + continue + } + switch err := writer.Put(ctx, d.PK, d.SK, val); err { + case nil: + report.AddImported(migration.PrefixOf(k)) + case migration.ErrItemExists: + report.AddSkippedExisting(migration.PrefixOf(k)) + default: + fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", d.PK, d.SK, err) + report.AddFailed(migration.PrefixOf(k)) + } + } + report.Format(os.Stdout) + return nil +} + +func runTradingAuditDump(args []string) error { + fs := flag.NewFlagSet("trading-audit-dump", flag.ExitOnError) + out := fs.String("out", "", "output JSONL file path (required)") + if err := fs.Parse(args); err != nil { + return err + } + if *out == "" { + return fmt.Errorf("--out is required") + } + d1, err := newD1Client() + if err != nil { + return err + } + rows, err := d1.Query(context.Background(), + "SELECT id, user_id, symbol, side, qty, price_vnd, ts FROM trading_trades ORDER BY id", nil) + if err != nil { + return fmt.Errorf("d1 query: %w", err) + } + f, err := os.Create(*out) + if err != nil { + return err + } + defer f.Close() + enc := json.NewEncoder(f) + for _, r := range rows { + if err := enc.Encode(r); err != nil { + return err + } + } + fmt.Printf("Wrote %d rows to %s\n", len(rows), *out) + return nil +} + +func newKVClient() (*migration.CloudflareKVClient, error) { + token, account, ns := os.Getenv("CLOUDFLARE_API_TOKEN"), os.Getenv("CLOUDFLARE_ACCOUNT_ID"), os.Getenv("CF_KV_NAMESPACE_ID") + if token == "" || account == "" || ns == "" { + return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_KV_NAMESPACE_ID") + } + return migration.NewCloudflareKVClient(account, ns, token), nil +} + +func newD1Client() (*migration.CloudflareD1Client, error) { + token, account, db := os.Getenv("CLOUDFLARE_API_TOKEN"), os.Getenv("CLOUDFLARE_ACCOUNT_ID"), os.Getenv("CF_D1_DATABASE_ID") + if token == "" || account == "" || db == "" { + return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_D1_DATABASE_ID") + } + return migration.NewCloudflareD1Client(account, db, token), nil +} diff --git a/internal/migration/cloudflare_d1_client.go b/internal/migration/cloudflare_d1_client.go new file mode 100644 index 0000000..296a5b8 --- /dev/null +++ b/internal/migration/cloudflare_d1_client.go @@ -0,0 +1,92 @@ +package migration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// CloudflareD1Client is a thin read-only REST client for the legacy +// Cloudflare D1 database. The migration only uses it for the optional +// trading_trades audit dump in Phase 03; runtime data flow never reads D1. +type CloudflareD1Client struct { + httpClient *http.Client + apiBase string + accountID string + databaseID string + apiToken string +} + +func NewCloudflareD1Client(accountID, databaseID, apiToken string) *CloudflareD1Client { + if accountID == "" || databaseID == "" || apiToken == "" { + panic("migration: CloudflareD1Client requires accountID, databaseID, apiToken") + } + return &CloudflareD1Client{ + httpClient: &http.Client{Timeout: 60 * time.Second}, + apiBase: "https://api.cloudflare.com/client/v4", + accountID: accountID, + databaseID: databaseID, + apiToken: apiToken, + } +} + +func (c *CloudflareD1Client) SetBaseURL(base string) { c.apiBase = base } + +// d1QueryEnvelope matches the D1 query response. Result is an array because +// D1 returns one entry per statement; we only ever send one. +type d1QueryEnvelope struct { + Result []struct { + Results []map[string]any `json:"results"` + Success bool `json:"success"` + } `json:"result"` + Success bool `json:"success"` + Errors []map[string]any `json:"errors"` +} + +// Query runs a single SQL statement and returns the row maps in result-set +// order. params is optional; pass nil for parameterless statements. +func (c *CloudflareD1Client) Query(ctx context.Context, sql string, params []any) ([]map[string]any, error) { + payload := map[string]any{"sql": sql} + if len(params) > 0 { + payload["params"] = params + } + body, err := json.Marshal(payload) + if err != nil { + return nil, err + } + endpoint := fmt.Sprintf("%s/accounts/%s/d1/database/%s/query", + c.apiBase, c.accountID, c.databaseID) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+c.apiToken) + req.Header.Set("Content-Type", "application/json") + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + raw, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("d1 query: status %d: %s", resp.StatusCode, string(raw)) + } + var env d1QueryEnvelope + if err := json.Unmarshal(raw, &env); err != nil { + return nil, fmt.Errorf("d1 query: decode: %w", err) + } + if !env.Success { + return nil, fmt.Errorf("d1 query: api error: %v", env.Errors) + } + if len(env.Result) == 0 { + return nil, nil + } + return env.Result[0].Results, nil +} diff --git a/internal/migration/cloudflare_d1_client_test.go b/internal/migration/cloudflare_d1_client_test.go new file mode 100644 index 0000000..96e90f7 --- /dev/null +++ b/internal/migration/cloudflare_d1_client_test.go @@ -0,0 +1,67 @@ +package migration + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestD1QueryHappyPath(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("method=%s want POST", r.Method) + } + if !strings.HasSuffix(r.URL.Path, "/d1/database/db123/query") { + t.Errorf("path=%s", r.URL.Path) + } + body, _ := io.ReadAll(r.Body) + var got map[string]any + _ = json.Unmarshal(body, &got) + if got["sql"] != "SELECT * FROM trading_trades" { + t.Errorf("sql=%v", got["sql"]) + } + _, _ = w.Write([]byte(`{ + "success": true, + "result": [{ + "results": [ + {"id": 1, "user_id": 100, "symbol": "FPT"}, + {"id": 2, "user_id": 100, "symbol": "TCB"} + ], + "success": true + }] + }`)) + })) + defer srv.Close() + + c := NewCloudflareD1Client("acct", "db123", "tok") + c.SetBaseURL(srv.URL) + + rows, err := c.Query(context.Background(), "SELECT * FROM trading_trades", nil) + if err != nil { + t.Fatalf("query: %v", err) + } + if len(rows) != 2 { + t.Fatalf("got %d rows", len(rows)) + } + if rows[0]["symbol"] != "FPT" || rows[1]["symbol"] != "TCB" { + t.Errorf("rows=%v", rows) + } +} + +func TestD1QueryApiError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"success": false, "errors": [{"code": 8000, "message": "bad sql"}]}`)) + })) + defer srv.Close() + + c := NewCloudflareD1Client("acct", "db", "tok") + c.SetBaseURL(srv.URL) + _, err := c.Query(context.Background(), "SELECT", nil) + if err == nil || !strings.Contains(err.Error(), "api error") { + t.Fatalf("got %v, want api error", err) + } +} diff --git a/internal/migration/cloudflare_kv_client.go b/internal/migration/cloudflare_kv_client.go new file mode 100644 index 0000000..e3df20b --- /dev/null +++ b/internal/migration/cloudflare_kv_client.go @@ -0,0 +1,135 @@ +package migration + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "time" +) + +// CloudflareKVClient is a thin read-only REST client for the legacy +// Cloudflare Workers KV namespace. List+Get are the only operations the +// migration needs; mutations stay out of scope. +type CloudflareKVClient struct { + httpClient *http.Client + apiBase string + accountID string + namespaceID string + apiToken string +} + +// NewCloudflareKVClient panics on empty required fields so misconfiguration +// surfaces at startup rather than after partial work. +func NewCloudflareKVClient(accountID, namespaceID, apiToken string) *CloudflareKVClient { + if accountID == "" || namespaceID == "" || apiToken == "" { + panic("migration: CloudflareKVClient requires accountID, namespaceID, apiToken") + } + return &CloudflareKVClient{ + httpClient: &http.Client{Timeout: 30 * time.Second}, + apiBase: "https://api.cloudflare.com/client/v4", + accountID: accountID, + namespaceID: namespaceID, + apiToken: apiToken, + } +} + +// SetBaseURL overrides the API base; used by tests with httptest.Server. +func (c *CloudflareKVClient) SetBaseURL(base string) { c.apiBase = base } + +// kvListEnvelope matches the Cloudflare REST list-keys response shape. +// See: https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace-s-keys +type kvListEnvelope struct { + Result []struct { + Name string `json:"name"` + } `json:"result"` + Success bool `json:"success"` + Errors []map[string]any `json:"errors"` + ResultInfo struct { + Cursor string `json:"cursor"` + Count int `json:"count"` + } `json:"result_info"` +} + +// ListKeys returns every key in the namespace. The Phase 01 inventory +// proved the namespace fits in well under one REST page (21 keys vs 1000 +// page limit), but pagination is still honored for safety. +func (c *CloudflareKVClient) ListKeys(ctx context.Context) ([]string, error) { + var out []string + cursor := "" + for { + page, next, err := c.listPage(ctx, cursor) + if err != nil { + return nil, err + } + out = append(out, page...) + if next == "" { + return out, nil + } + cursor = next + } +} + +func (c *CloudflareKVClient) listPage(ctx context.Context, cursor string) ([]string, string, error) { + endpoint := fmt.Sprintf("%s/accounts/%s/storage/kv/namespaces/%s/keys", + c.apiBase, c.accountID, c.namespaceID) + if cursor != "" { + endpoint += "?" + url.Values{"cursor": []string{cursor}}.Encode() + } + body, err := c.do(ctx, http.MethodGet, endpoint) + if err != nil { + return nil, "", err + } + var env kvListEnvelope + if err := json.Unmarshal(body, &env); err != nil { + return nil, "", fmt.Errorf("kv list: decode: %w", err) + } + if !env.Success { + return nil, "", fmt.Errorf("kv list: api error: %v", env.Errors) + } + names := make([]string, 0, len(env.Result)) + for _, r := range env.Result { + names = append(names, r.Name) + } + return names, env.ResultInfo.Cursor, nil +} + +// GetValue returns the raw value bytes for one KV key. CF returns 404 as +// errKeyNotFound so callers can decide whether a missing key is fatal. +func (c *CloudflareKVClient) GetValue(ctx context.Context, key string) ([]byte, error) { + endpoint := fmt.Sprintf("%s/accounts/%s/storage/kv/namespaces/%s/values/%s", + c.apiBase, c.accountID, c.namespaceID, url.PathEscape(key)) + return c.do(ctx, http.MethodGet, endpoint) +} + +// ErrKeyNotFound signals a 404 on a value GET. Returned wrapped so callers +// can use errors.Is. +var ErrKeyNotFound = errors.New("cloudflare kv: key not found") + +func (c *CloudflareKVClient) do(ctx context.Context, method, endpoint string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, method, endpoint, nil) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+c.apiToken) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode == http.StatusNotFound { + return nil, fmt.Errorf("%w: %s", ErrKeyNotFound, endpoint) + } + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("cloudflare api %s %s: status %d: %s", + method, endpoint, resp.StatusCode, string(body)) + } + return body, nil +} diff --git a/internal/migration/cloudflare_kv_client_test.go b/internal/migration/cloudflare_kv_client_test.go new file mode 100644 index 0000000..a39100c --- /dev/null +++ b/internal/migration/cloudflare_kv_client_test.go @@ -0,0 +1,107 @@ +package migration + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestListKeysSinglePage(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if got := r.Header.Get("Authorization"); got != "Bearer test-token" { + t.Errorf("auth header = %q", got) + } + if !strings.HasSuffix(r.URL.Path, "/storage/kv/namespaces/ns123/keys") { + t.Errorf("unexpected path %q", r.URL.Path) + } + _, _ = w.Write([]byte(`{ + "result": [{"name":"wordle:stats:1"},{"name":"trading:sym:FPT"}], + "success": true, + "result_info": {"count": 2, "cursor": ""} + }`)) + })) + defer srv.Close() + + c := NewCloudflareKVClient("acct", "ns123", "test-token") + c.SetBaseURL(srv.URL) + + keys, err := c.ListKeys(context.Background()) + if err != nil { + t.Fatalf("list: %v", err) + } + if len(keys) != 2 || keys[0] != "wordle:stats:1" || keys[1] != "trading:sym:FPT" { + t.Fatalf("got %v", keys) + } +} + +func TestListKeysPaginates(t *testing.T) { + page := 0 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + page++ + switch page { + case 1: + _, _ = w.Write([]byte(`{ + "result": [{"name":"a"}], + "success": true, + "result_info": {"cursor": "p2"} + }`)) + case 2: + if got := r.URL.Query().Get("cursor"); got != "p2" { + t.Errorf("cursor=%q want p2", got) + } + _, _ = w.Write([]byte(`{ + "result": [{"name":"b"}], + "success": true, + "result_info": {"cursor": ""} + }`)) + default: + t.Fatalf("too many pages") + } + })) + defer srv.Close() + + c := NewCloudflareKVClient("acct", "ns", "tok") + c.SetBaseURL(srv.URL) + keys, err := c.ListKeys(context.Background()) + if err != nil { + t.Fatalf("list: %v", err) + } + if len(keys) != 2 || keys[0] != "a" || keys[1] != "b" { + t.Fatalf("got %v", keys) + } +} + +func TestGetValueReturns404AsErrKeyNotFound(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer srv.Close() + + c := NewCloudflareKVClient("acct", "ns", "tok") + c.SetBaseURL(srv.URL) + _, err := c.GetValue(context.Background(), "misc:last_ping") + if !errors.Is(err, ErrKeyNotFound) { + t.Fatalf("got %v, want ErrKeyNotFound", err) + } +} + +func TestGetValueRawBytes(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"currency":{"VND":100},"meta":{"invested":0}}`)) + })) + defer srv.Close() + + c := NewCloudflareKVClient("acct", "ns", "tok") + c.SetBaseURL(srv.URL) + val, err := c.GetValue(context.Background(), "trading:user:42") + if err != nil { + t.Fatalf("get: %v", err) + } + want := `{"currency":{"VND":100},"meta":{"invested":0}}` + if string(val) != want { + t.Errorf("got %q want %q", string(val), want) + } +} diff --git a/internal/migration/dynamodb_writer.go b/internal/migration/dynamodb_writer.go new file mode 100644 index 0000000..751f1b7 --- /dev/null +++ b/internal/migration/dynamodb_writer.go @@ -0,0 +1,61 @@ +package migration + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +// DynamoDBWriter writes migrated KV records into the runtime DynamoDB table +// using the exact attribute shape internal/storage/dynamodb_kv.go expects: +// pk (S), sk (S), value (S), updatedAt (N). +// +// Idempotency: by default the writer attaches a ConditionExpression that +// rejects writes where the (pk, sk) pair already exists. The CLI exposes an +// --overwrite flag that drops the condition for explicit re-imports. +type DynamoDBWriter struct { + client *dynamodb.Client + table string + overwrite bool +} + +func NewDynamoDBWriter(client *dynamodb.Client, table string, overwrite bool) *DynamoDBWriter { + return &DynamoDBWriter{client: client, table: table, overwrite: overwrite} +} + +// ErrItemExists signals a guarded write skipped because (pk, sk) was already +// present. The caller increments the "skipped (already imported)" counter. +var ErrItemExists = errors.New("dynamodb: item exists") + +// Put writes one record. value bytes are stored as a DynamoDB String so the +// payload is human-readable in the AWS console; all current sources are JSON +// and therefore UTF-8 safe. +func (w *DynamoDBWriter) Put(ctx context.Context, pk, sk string, value []byte) error { + in := &dynamodb.PutItemInput{ + TableName: aws.String(w.table), + Item: map[string]types.AttributeValue{ + "pk": &types.AttributeValueMemberS{Value: pk}, + "sk": &types.AttributeValueMemberS{Value: sk}, + "value": &types.AttributeValueMemberS{Value: string(value)}, + "updatedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)}, + }, + } + if !w.overwrite { + in.ConditionExpression = aws.String("attribute_not_exists(pk)") + } + _, err := w.client.PutItem(ctx, in) + if err != nil { + var cf *types.ConditionalCheckFailedException + if errors.As(err, &cf) { + return ErrItemExists + } + return fmt.Errorf("dynamodb put %s/%s: %w", pk, sk, err) + } + return nil +} diff --git a/internal/migration/policy.go b/internal/migration/policy.go new file mode 100644 index 0000000..a0d3951 --- /dev/null +++ b/internal/migration/policy.go @@ -0,0 +1,99 @@ +// Package migration provides operator-run tooling that moves data from the +// legacy Cloudflare KV/D1 stack into the live AWS DynamoDB store. +// +// The package is intentionally small. It exposes: +// - Policy: classify a CF KV key as migrate/skip/archive and resolve its +// target DynamoDB (pk, sk). +// - CloudflareKVClient / CloudflareD1Client: thin REST readers. +// - DynamoDBWriter: idempotent writes against the runtime KV table shape. +// - Report: per-prefix counts for an import run. +// +// The runtime production code (cmd/server) never imports this package. +package migration + +import "strings" + +// Action is the per-source-key migration decision locked in Phase 01. +type Action string + +const ( + ActionMigrate Action = "migrate" + ActionSkip Action = "skip" +) + +// Decision is the resolved migration decision for one CF KV key. +type Decision struct { + Action Action + // PK and SK are populated only when Action == ActionMigrate. + PK string + SK string + // Reason explains a Skip (cache, retired, missing, etc.). + Reason string +} + +// kvRule is one entry in the static allowlist. Order matters: rules are +// matched top-down with HasPrefix so the most specific prefix wins when +// shorter prefixes would otherwise swallow a longer one. +type kvRule struct { + prefix string + module string // DynamoDB pk; empty when action != migrate + skip string // non-empty marks the rule as skip; value is the reason +} + +// kvRules is the locked Phase 01 inventory. Adding a new live CF prefix +// requires re-running the Phase 01 inventory and updating this list. +var kvRules = []kvRule{ + // Durable user data — migrate. + {prefix: "wordle:stats:", module: "wordle"}, + {prefix: "loldle:stats:", module: "loldle"}, + {prefix: "loldle:config:", module: "loldle"}, + {prefix: "twentyq:stats:", module: "twentyq"}, + {prefix: "lolschedule:subscribers", module: "lolschedule"}, + {prefix: "trading:user:", module: "trading"}, + + // Caches — skip. + {prefix: "trading:sym:", skip: "cache"}, + {prefix: "wordle:game:", skip: "ephemeral"}, + {prefix: "loldle:game:", skip: "ephemeral"}, + {prefix: "twentyq:game:", skip: "ephemeral"}, + {prefix: "lolschedule:matches:", skip: "cache"}, + + // Retired modules — operator chose not to archive (Phase 01, 2026-05-16). + {prefix: "doantu:", skip: "retired"}, + {prefix: "loldle-ability:", skip: "retired"}, + {prefix: "loldle-emoji:", skip: "retired"}, + {prefix: "loldle-quote:", skip: "retired"}, + {prefix: "loldle-splash:", skip: "retired"}, + {prefix: "semantle:", skip: "retired"}, +} + +// Classify returns the migration decision for a CF KV key. +// Unknown keys default to skip with reason "unknown" so new upstream prefixes +// are visible in the inventory report instead of being silently imported. +func Classify(cfKey string) Decision { + for _, r := range kvRules { + if !strings.HasPrefix(cfKey, r.prefix) { + continue + } + if r.skip != "" { + return Decision{Action: ActionSkip, Reason: r.skip} + } + // Migrate. Target sk is the CF key with the leading ":" stripped. + // Live runtime stores e.g. wordle/stats:, not wordle/wordle:stats:. + sk := strings.TrimPrefix(cfKey, r.module+":") + return Decision{Action: ActionMigrate, PK: r.module, SK: sk} + } + return Decision{Action: ActionSkip, Reason: "unknown"} +} + +// DurablePrefixes returns the migrate-action prefixes for use by inventory +// and reporting. Order matches kvRules. +func DurablePrefixes() []string { + out := make([]string, 0, len(kvRules)) + for _, r := range kvRules { + if r.skip == "" { + out = append(out, r.prefix) + } + } + return out +} diff --git a/internal/migration/policy_test.go b/internal/migration/policy_test.go new file mode 100644 index 0000000..46961dc --- /dev/null +++ b/internal/migration/policy_test.go @@ -0,0 +1,86 @@ +package migration + +import "testing" + +func TestClassify(t *testing.T) { + cases := []struct { + key string + wantAct Action + wantPK string + wantSK string + wantSkip string + }{ + // Durable migrate paths — these are the 6 prefixes Phase 01 locked. + {"wordle:stats:-1001760292100", ActionMigrate, "wordle", "stats:-1001760292100", ""}, + {"loldle:stats:1064111334", ActionMigrate, "loldle", "stats:1064111334", ""}, + {"loldle:config:-1001760292100", ActionMigrate, "loldle", "config:-1001760292100", ""}, + {"twentyq:stats:-1001760292100", ActionMigrate, "twentyq", "stats:-1001760292100", ""}, + {"lolschedule:subscribers", ActionMigrate, "lolschedule", "subscribers", ""}, + {"trading:user:1064111334", ActionMigrate, "trading", "user:1064111334", ""}, + + // Cache + ephemeral — skip. + {"trading:sym:FPT", ActionSkip, "", "", "cache"}, + {"wordle:game:abc", ActionSkip, "", "", "ephemeral"}, + {"lolschedule:matches:2026", ActionSkip, "", "", "cache"}, + + // Retired modules — skip. + {"doantu:stats:1064111334", ActionSkip, "", "", "retired"}, + {"semantle:stats:-1001760292100", ActionSkip, "", "", "retired"}, + {"loldle-emoji:stats:-1001760292100", ActionSkip, "", "", "retired"}, + + // Unknown prefix — skip with reason "unknown" so it surfaces in reports. + {"newmodule:foo", ActionSkip, "", "", "unknown"}, + } + + for _, c := range cases { + t.Run(c.key, func(t *testing.T) { + got := Classify(c.key) + if got.Action != c.wantAct { + t.Fatalf("action=%v want %v", got.Action, c.wantAct) + } + if got.PK != c.wantPK { + t.Errorf("pk=%q want %q", got.PK, c.wantPK) + } + if got.SK != c.wantSK { + t.Errorf("sk=%q want %q", got.SK, c.wantSK) + } + if got.Reason != c.wantSkip { + t.Errorf("reason=%q want %q", got.Reason, c.wantSkip) + } + }) + } +} + +func TestDurablePrefixes(t *testing.T) { + got := DurablePrefixes() + want := []string{ + "wordle:stats:", + "loldle:stats:", + "loldle:config:", + "twentyq:stats:", + "lolschedule:subscribers", + "trading:user:", + } + if len(got) != len(want) { + t.Fatalf("got %v, want %v", got, want) + } + for i := range want { + if got[i] != want[i] { + t.Errorf("idx %d: got %q want %q", i, got[i], want[i]) + } + } +} + +func TestPrefixOfLongestMatch(t *testing.T) { + // loldle:stats: and loldle:config: both share the loldle: super-prefix. + // Longest match wins so report buckets stay precise. + if got := PrefixOf("loldle:stats:1234"); got != "loldle:stats:" { + t.Errorf("loldle:stats:1234 → %q, want loldle:stats:", got) + } + if got := PrefixOf("loldle:config:5"); got != "loldle:config:" { + t.Errorf("loldle:config:5 → %q, want loldle:config:", got) + } + if got := PrefixOf("zzz:unknown"); got != "unknown" { + t.Errorf("unknown bucket: got %q", got) + } +} diff --git a/internal/migration/report.go b/internal/migration/report.go new file mode 100644 index 0000000..eb0e578 --- /dev/null +++ b/internal/migration/report.go @@ -0,0 +1,95 @@ +package migration + +import ( + "fmt" + "io" + "sort" +) + +// Report aggregates counts for one migration run. It is intentionally +// per-prefix rather than per-key so the runbook can compare totals to the +// Phase 01 inventory at a glance. +type Report struct { + // Imported is keys actually written to DynamoDB. + Imported map[string]int + // SkippedExisting is keys already present (idempotent rerun). + SkippedExisting map[string]int + // SkippedPolicy is keys rejected by the Phase 01 allowlist; keyed by + // reason (cache, retired, unknown, ephemeral). + SkippedPolicy map[string]int + // Failed is keys that errored mid-import. + Failed map[string]int +} + +func NewReport() *Report { + return &Report{ + Imported: map[string]int{}, + SkippedExisting: map[string]int{}, + SkippedPolicy: map[string]int{}, + Failed: map[string]int{}, + } +} + +func (r *Report) AddImported(prefix string) { r.Imported[prefix]++ } +func (r *Report) AddSkippedExisting(prefix string) { r.SkippedExisting[prefix]++ } +func (r *Report) AddSkippedPolicy(reason string) { r.SkippedPolicy[reason]++ } +func (r *Report) AddFailed(prefix string) { r.Failed[prefix]++ } + +// Format writes a human-readable summary. Stable ordering (alphabetical) so +// rerun diffs stay clean. +func (r *Report) Format(w io.Writer) { + fmt.Fprintln(w, "Migration report") + fmt.Fprintln(w, "================") + writeSection(w, "Imported", r.Imported) + writeSection(w, "Skipped (already present)", r.SkippedExisting) + writeSection(w, "Skipped (policy)", r.SkippedPolicy) + writeSection(w, "Failed", r.Failed) + fmt.Fprintf(w, "TOTAL imported=%d skipped_existing=%d skipped_policy=%d failed=%d\n", + sum(r.Imported), sum(r.SkippedExisting), sum(r.SkippedPolicy), sum(r.Failed)) +} + +func writeSection(w io.Writer, label string, m map[string]int) { + fmt.Fprintf(w, "\n%s:\n", label) + if len(m) == 0 { + fmt.Fprintln(w, " (none)") + return + } + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + fmt.Fprintf(w, " %-30s %d\n", k, m[k]) + } +} + +func sum(m map[string]int) int { + t := 0 + for _, v := range m { + t += v + } + return t +} + +// PrefixOf returns the longest known prefix from kvRules that matches key, +// or "unknown". Used by Report consumers to bucket counts. +func PrefixOf(key string) string { + best := "" + for _, r := range kvRules { + if len(r.prefix) > len(best) && hasPrefix(key, r.prefix) { + best = r.prefix + } + } + if best == "" { + return "unknown" + } + return best +} + +func hasPrefix(s, p string) bool { + if len(s) < len(p) { + return false + } + return s[:len(p)] == p +} diff --git a/internal/migration/report_test.go b/internal/migration/report_test.go new file mode 100644 index 0000000..c1981b4 --- /dev/null +++ b/internal/migration/report_test.go @@ -0,0 +1,52 @@ +package migration + +import ( + "bytes" + "strings" + "testing" +) + +func TestReportFormat(t *testing.T) { + r := NewReport() + r.AddImported("wordle:stats:") + r.AddImported("wordle:stats:") + r.AddImported("trading:user:") + r.AddSkippedExisting("loldle:stats:") + r.AddSkippedPolicy("cache") + r.AddSkippedPolicy("cache") + r.AddSkippedPolicy("retired") + r.AddFailed("twentyq:stats:") + + var buf bytes.Buffer + r.Format(&buf) + got := buf.String() + + // Spot-check structure and totals. + for _, want := range []string{ + "Imported:", + "wordle:stats: 2", + "trading:user: 1", + "Skipped (already present):", + "loldle:stats: 1", + "Skipped (policy):", + "cache 2", + "retired 1", + "Failed:", + "twentyq:stats: 1", + "TOTAL imported=3 skipped_existing=1 skipped_policy=3 failed=1", + } { + if !strings.Contains(got, want) { + t.Errorf("missing %q in output:\n%s", want, got) + } + } +} + +func TestReportEmptySection(t *testing.T) { + r := NewReport() + r.AddImported("wordle:stats:") + var buf bytes.Buffer + r.Format(&buf) + if !strings.Contains(buf.String(), "Failed:\n (none)") { + t.Errorf("empty section missing (none) marker:\n%s", buf.String()) + } +} diff --git a/internal/storage/dynamodb_kv.go b/internal/storage/dynamodb_kv.go index 5bdc418..8eacb91 100644 --- a/internal/storage/dynamodb_kv.go +++ b/internal/storage/dynamodb_kv.go @@ -64,11 +64,11 @@ func (s *DynamoDBKVStore) Get(ctx context.Context, key string) ([]byte, error) { if !ok { return nil, fmt.Errorf("dynamodb get %s/%s: missing %q attribute", s.moduleName, key, dynamoValueAttr) } - bin, ok := rawAttr.(*types.AttributeValueMemberB) + str, ok := rawAttr.(*types.AttributeValueMemberS) if !ok { return nil, fmt.Errorf("dynamodb get %s/%s: unexpected attribute type %T", s.moduleName, key, rawAttr) } - return bin.Value, nil + return []byte(str.Value), nil } // GetJSON decodes the value at key into dst. @@ -83,26 +83,20 @@ func (s *DynamoDBKVStore) GetJSON(ctx context.Context, key string, dst any) erro return nil } -// Put writes raw bytes at key, creating or overwriting. +// Put writes raw bytes at key, creating or overwriting. The value attribute +// is stored as a DynamoDB String so it is human-readable in the AWS console. +// Every current caller writes JSON, which is UTF-8 safe; non-UTF-8 callers +// must encode upstream (e.g. base64). func (s *DynamoDBKVStore) Put(ctx context.Context, key string, val []byte) error { if err := validateKey(key); err != nil { return err } - // DynamoDB rejects empty Binary values. Store a single zero byte sentinel - // transparently — Firestore allows zero-length []byte and callers may - // rely on that. The Get path treats both as []byte; downstream JSON - // callers will see []byte{0} where they put []byte{}, but no current - // caller relies on storing literal empty bytes (they use PutJSON, which - // always emits at least "null" = 4 bytes). - if len(val) == 0 { - val = []byte{0} - } _, err := s.client.PutItem(ctx, &dynamodb.PutItemInput{ TableName: aws.String(s.table), Item: map[string]types.AttributeValue{ dynamoPKAttr: &types.AttributeValueMemberS{Value: s.moduleName}, dynamoSKAttr: &types.AttributeValueMemberS{Value: key}, - dynamoValueAttr: &types.AttributeValueMemberB{Value: val}, + dynamoValueAttr: &types.AttributeValueMemberS{Value: string(val)}, dynamoUpdatedAtAttr: &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)}, }, }) @@ -181,4 +175,3 @@ func (s *DynamoDBKVStore) List(ctx context.Context, prefix string) ([]string, er } return keys, nil } -