feat(migration): cf→aws migration toolchain + DynamoDB value as String

Operator-run tooling that moves durable Cloudflare KV data into the live
AWS DynamoDB table, plus a runtime swap of the `value` attribute from
Binary to String so payloads are human-readable in the AWS console.

- cmd/migrate_cf_data: subcommands inventory, kv-import (idempotent via
  attribute_not_exists), trading-audit-dump, convert-value-to-string
- internal/migration: policy allowlist, CF KV+D1 REST clients, DynamoDB
  writer, report formatter with per-prefix counts + tests
- internal/storage/dynamodb_kv.go: Put writes MemberS, Get reads MemberS;
  dropped empty-bytes sentinel (DynamoDB allows empty strings)
This commit is contained in:
2026-05-16 10:33:35 +07:00
parent 75e936096d
commit d67517e1fb
12 changed files with 1130 additions and 14 deletions
+106
View File
@@ -0,0 +1,106 @@
// One-shot rewrite of the table's `value` attribute from Binary (legacy
// shape) to String (current shape). Operator-elective; needed once after the
// runtime swap from MemberB to MemberS in internal/storage/dynamodb_kv.go.
package main
import (
"context"
"flag"
"fmt"
"os"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
func runConvertValueToString(args []string) error {
fs := flag.NewFlagSet("convert-value-to-string", flag.ExitOnError)
table := fs.String("table", "", "target DynamoDB table (required)")
dryRun := fs.Bool("dry-run", false, "log actions but do not write")
if err := fs.Parse(args); err != nil {
return err
}
if *table == "" {
return fmt.Errorf("--table is required")
}
ctx := context.Background()
cfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
return fmt.Errorf("aws config: %w", err)
}
client := dynamodb.NewFromConfig(cfg)
converted, alreadyString, skipped, failed := 0, 0, 0, 0
pager := dynamodb.NewScanPaginator(client, &dynamodb.ScanInput{TableName: aws.String(*table)})
for pager.HasMorePages() {
page, err := pager.NextPage(ctx)
if err != nil {
return fmt.Errorf("scan: %w", err)
}
for _, item := range page.Items {
pk, sk, ok := itemPKSK(item)
if !ok {
skipped++
continue
}
valAttr, ok := item["value"]
if !ok {
skipped++
continue
}
binAttr, isBinary := valAttr.(*types.AttributeValueMemberB)
if !isBinary {
alreadyString++
continue
}
if *dryRun {
fmt.Printf(" DRY-RUN would convert pk=%s sk=%s len=%d\n", pk, sk, len(binAttr.Value))
converted++
continue
}
if err := putAsString(ctx, client, *table, pk, sk, binAttr.Value); err != nil {
fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", pk, sk, err)
failed++
continue
}
converted++
}
}
fmt.Printf("\nconvert-value-to-string report\n")
fmt.Printf(" converted (B → S): %d\n", converted)
fmt.Printf(" already String: %d\n", alreadyString)
fmt.Printf(" skipped (no pk/sk/value): %d\n", skipped)
fmt.Printf(" failed: %d\n", failed)
return nil
}
func itemPKSK(item map[string]types.AttributeValue) (string, string, bool) {
pkAttr, ok := item["pk"].(*types.AttributeValueMemberS)
if !ok {
return "", "", false
}
skAttr, ok := item["sk"].(*types.AttributeValueMemberS)
if !ok {
return "", "", false
}
return pkAttr.Value, skAttr.Value, true
}
func putAsString(ctx context.Context, client *dynamodb.Client, table, pk, sk string, val []byte) error {
_, err := client.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(table),
Item: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"sk": &types.AttributeValueMemberS{Value: sk},
"value": &types.AttributeValueMemberS{Value: string(val)},
"updatedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)},
},
})
return err
}
+223
View File
@@ -0,0 +1,223 @@
// Command migrate_cf_data moves durable data from the legacy Cloudflare
// KV/D1 stack into the live AWS DynamoDB table. Operator-invoked only.
//
// Subcommands:
//
// inventory Read CF KV keys, apply the Phase 01 policy, print
// a classification report. No writes anywhere.
//
// kv-import Copy migrate-action KV keys into DynamoDB.
// Idempotent by default (attribute_not_exists guard).
// Flags: --table, --dry-run, --overwrite.
//
// trading-audit-dump Stream D1 `trading_trades` rows to a JSONL file.
// Audit-only; not an import input.
// Flags: --out (required).
//
// convert-value-to-string
// One-shot rewrite of the table's `value` attribute
// from Binary (legacy shape) to String (current
// shape). Idempotent — items already stored as
// String are skipped.
// Flags: --table, --dry-run.
//
// Required env:
//
// CLOUDFLARE_API_TOKEN — read-scoped token for KV + D1
// CLOUDFLARE_ACCOUNT_ID — production CF account
// CF_KV_NAMESPACE_ID — production KV namespace
// CF_D1_DATABASE_ID — production D1 database (only needed for
// trading-audit-dump)
// AWS_REGION (or standard AWS SDK env) — only needed for kv-import
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/tiennm99/miti99bot/internal/migration"
)
func main() {
if len(os.Args) < 2 {
usage()
os.Exit(2)
}
sub := os.Args[1]
args := os.Args[2:]
var err error
switch sub {
case "inventory":
err = runInventory(args)
case "kv-import":
err = runKVImport(args)
case "trading-audit-dump":
err = runTradingAuditDump(args)
case "convert-value-to-string":
err = runConvertValueToString(args)
case "-h", "--help", "help":
usage()
return
default:
usage()
os.Exit(2)
}
if err != nil {
fmt.Fprintln(os.Stderr, "error:", err)
os.Exit(1)
}
}
func usage() {
fmt.Fprintln(os.Stderr, "usage: migrate_cf_data <inventory|kv-import|trading-audit-dump|convert-value-to-string> [flags]")
}
func runInventory(args []string) error {
fs := flag.NewFlagSet("inventory", flag.ExitOnError)
if err := fs.Parse(args); err != nil {
return err
}
kv, err := newKVClient()
if err != nil {
return err
}
ctx := context.Background()
keys, err := kv.ListKeys(ctx)
if err != nil {
return fmt.Errorf("list keys: %w", err)
}
migrate := map[string]int{}
skip := map[string]int{}
for _, k := range keys {
d := migration.Classify(k)
if d.Action == migration.ActionMigrate {
migrate[migration.PrefixOf(k)]++
} else {
skip[d.Reason]++
}
}
fmt.Printf("Cloudflare KV namespace contains %d keys.\n\n", len(keys))
fmt.Println("Migrate-action keys by prefix:")
for p, n := range migrate {
fmt.Printf(" %-30s %d\n", p, n)
}
fmt.Println("\nSkip-action keys by reason:")
for r, n := range skip {
fmt.Printf(" %-30s %d\n", r, n)
}
return nil
}
func runKVImport(args []string) error {
fs := flag.NewFlagSet("kv-import", flag.ExitOnError)
table := fs.String("table", "", "target DynamoDB table (required)")
dryRun := fs.Bool("dry-run", false, "log actions but do not write")
overwrite := fs.Bool("overwrite", false, "drop attribute_not_exists guard")
if err := fs.Parse(args); err != nil {
return err
}
if *table == "" {
return fmt.Errorf("--table is required")
}
kv, err := newKVClient()
if err != nil {
return err
}
ctx := context.Background()
var writer *migration.DynamoDBWriter
if !*dryRun {
cfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
return fmt.Errorf("aws config: %w", err)
}
writer = migration.NewDynamoDBWriter(dynamodb.NewFromConfig(cfg), *table, *overwrite)
}
keys, err := kv.ListKeys(ctx)
if err != nil {
return fmt.Errorf("list keys: %w", err)
}
report := migration.NewReport()
for _, k := range keys {
d := migration.Classify(k)
if d.Action != migration.ActionMigrate {
report.AddSkippedPolicy(d.Reason)
continue
}
val, err := kv.GetValue(ctx, k)
if err != nil {
fmt.Fprintf(os.Stderr, " get %s: %v\n", k, err)
report.AddFailed(migration.PrefixOf(k))
continue
}
if *dryRun {
fmt.Printf(" DRY-RUN would write pk=%s sk=%s len=%d\n", d.PK, d.SK, len(val))
report.AddImported(migration.PrefixOf(k))
continue
}
switch err := writer.Put(ctx, d.PK, d.SK, val); err {
case nil:
report.AddImported(migration.PrefixOf(k))
case migration.ErrItemExists:
report.AddSkippedExisting(migration.PrefixOf(k))
default:
fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", d.PK, d.SK, err)
report.AddFailed(migration.PrefixOf(k))
}
}
report.Format(os.Stdout)
return nil
}
func runTradingAuditDump(args []string) error {
fs := flag.NewFlagSet("trading-audit-dump", flag.ExitOnError)
out := fs.String("out", "", "output JSONL file path (required)")
if err := fs.Parse(args); err != nil {
return err
}
if *out == "" {
return fmt.Errorf("--out is required")
}
d1, err := newD1Client()
if err != nil {
return err
}
rows, err := d1.Query(context.Background(),
"SELECT id, user_id, symbol, side, qty, price_vnd, ts FROM trading_trades ORDER BY id", nil)
if err != nil {
return fmt.Errorf("d1 query: %w", err)
}
f, err := os.Create(*out)
if err != nil {
return err
}
defer f.Close()
enc := json.NewEncoder(f)
for _, r := range rows {
if err := enc.Encode(r); err != nil {
return err
}
}
fmt.Printf("Wrote %d rows to %s\n", len(rows), *out)
return nil
}
func newKVClient() (*migration.CloudflareKVClient, error) {
token, account, ns := os.Getenv("CLOUDFLARE_API_TOKEN"), os.Getenv("CLOUDFLARE_ACCOUNT_ID"), os.Getenv("CF_KV_NAMESPACE_ID")
if token == "" || account == "" || ns == "" {
return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_KV_NAMESPACE_ID")
}
return migration.NewCloudflareKVClient(account, ns, token), nil
}
func newD1Client() (*migration.CloudflareD1Client, error) {
token, account, db := os.Getenv("CLOUDFLARE_API_TOKEN"), os.Getenv("CLOUDFLARE_ACCOUNT_ID"), os.Getenv("CF_D1_DATABASE_ID")
if token == "" || account == "" || db == "" {
return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_D1_DATABASE_ID")
}
return migration.NewCloudflareD1Client(account, db, token), nil
}
@@ -0,0 +1,92 @@
package migration
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// CloudflareD1Client is a thin read-only REST client for the legacy
// Cloudflare D1 database. The migration only uses it for the optional
// trading_trades audit dump in Phase 03; runtime data flow never reads D1.
type CloudflareD1Client struct {
httpClient *http.Client
apiBase string
accountID string
databaseID string
apiToken string
}
func NewCloudflareD1Client(accountID, databaseID, apiToken string) *CloudflareD1Client {
if accountID == "" || databaseID == "" || apiToken == "" {
panic("migration: CloudflareD1Client requires accountID, databaseID, apiToken")
}
return &CloudflareD1Client{
httpClient: &http.Client{Timeout: 60 * time.Second},
apiBase: "https://api.cloudflare.com/client/v4",
accountID: accountID,
databaseID: databaseID,
apiToken: apiToken,
}
}
func (c *CloudflareD1Client) SetBaseURL(base string) { c.apiBase = base }
// d1QueryEnvelope matches the D1 query response. Result is an array because
// D1 returns one entry per statement; we only ever send one.
type d1QueryEnvelope struct {
Result []struct {
Results []map[string]any `json:"results"`
Success bool `json:"success"`
} `json:"result"`
Success bool `json:"success"`
Errors []map[string]any `json:"errors"`
}
// Query runs a single SQL statement and returns the row maps in result-set
// order. params is optional; pass nil for parameterless statements.
func (c *CloudflareD1Client) Query(ctx context.Context, sql string, params []any) ([]map[string]any, error) {
payload := map[string]any{"sql": sql}
if len(params) > 0 {
payload["params"] = params
}
body, err := json.Marshal(payload)
if err != nil {
return nil, err
}
endpoint := fmt.Sprintf("%s/accounts/%s/d1/database/%s/query",
c.apiBase, c.accountID, c.databaseID)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.apiToken)
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
raw, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("d1 query: status %d: %s", resp.StatusCode, string(raw))
}
var env d1QueryEnvelope
if err := json.Unmarshal(raw, &env); err != nil {
return nil, fmt.Errorf("d1 query: decode: %w", err)
}
if !env.Success {
return nil, fmt.Errorf("d1 query: api error: %v", env.Errors)
}
if len(env.Result) == 0 {
return nil, nil
}
return env.Result[0].Results, nil
}
@@ -0,0 +1,67 @@
package migration
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestD1QueryHappyPath(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Errorf("method=%s want POST", r.Method)
}
if !strings.HasSuffix(r.URL.Path, "/d1/database/db123/query") {
t.Errorf("path=%s", r.URL.Path)
}
body, _ := io.ReadAll(r.Body)
var got map[string]any
_ = json.Unmarshal(body, &got)
if got["sql"] != "SELECT * FROM trading_trades" {
t.Errorf("sql=%v", got["sql"])
}
_, _ = w.Write([]byte(`{
"success": true,
"result": [{
"results": [
{"id": 1, "user_id": 100, "symbol": "FPT"},
{"id": 2, "user_id": 100, "symbol": "TCB"}
],
"success": true
}]
}`))
}))
defer srv.Close()
c := NewCloudflareD1Client("acct", "db123", "tok")
c.SetBaseURL(srv.URL)
rows, err := c.Query(context.Background(), "SELECT * FROM trading_trades", nil)
if err != nil {
t.Fatalf("query: %v", err)
}
if len(rows) != 2 {
t.Fatalf("got %d rows", len(rows))
}
if rows[0]["symbol"] != "FPT" || rows[1]["symbol"] != "TCB" {
t.Errorf("rows=%v", rows)
}
}
func TestD1QueryApiError(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"success": false, "errors": [{"code": 8000, "message": "bad sql"}]}`))
}))
defer srv.Close()
c := NewCloudflareD1Client("acct", "db", "tok")
c.SetBaseURL(srv.URL)
_, err := c.Query(context.Background(), "SELECT", nil)
if err == nil || !strings.Contains(err.Error(), "api error") {
t.Fatalf("got %v, want api error", err)
}
}
+135
View File
@@ -0,0 +1,135 @@
package migration
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"time"
)
// CloudflareKVClient is a thin read-only REST client for the legacy
// Cloudflare Workers KV namespace. List+Get are the only operations the
// migration needs; mutations stay out of scope.
type CloudflareKVClient struct {
httpClient *http.Client
apiBase string
accountID string
namespaceID string
apiToken string
}
// NewCloudflareKVClient panics on empty required fields so misconfiguration
// surfaces at startup rather than after partial work.
func NewCloudflareKVClient(accountID, namespaceID, apiToken string) *CloudflareKVClient {
if accountID == "" || namespaceID == "" || apiToken == "" {
panic("migration: CloudflareKVClient requires accountID, namespaceID, apiToken")
}
return &CloudflareKVClient{
httpClient: &http.Client{Timeout: 30 * time.Second},
apiBase: "https://api.cloudflare.com/client/v4",
accountID: accountID,
namespaceID: namespaceID,
apiToken: apiToken,
}
}
// SetBaseURL overrides the API base; used by tests with httptest.Server.
func (c *CloudflareKVClient) SetBaseURL(base string) { c.apiBase = base }
// kvListEnvelope matches the Cloudflare REST list-keys response shape.
// See: https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace-s-keys
type kvListEnvelope struct {
Result []struct {
Name string `json:"name"`
} `json:"result"`
Success bool `json:"success"`
Errors []map[string]any `json:"errors"`
ResultInfo struct {
Cursor string `json:"cursor"`
Count int `json:"count"`
} `json:"result_info"`
}
// ListKeys returns every key in the namespace. The Phase 01 inventory
// proved the namespace fits in well under one REST page (21 keys vs 1000
// page limit), but pagination is still honored for safety.
func (c *CloudflareKVClient) ListKeys(ctx context.Context) ([]string, error) {
var out []string
cursor := ""
for {
page, next, err := c.listPage(ctx, cursor)
if err != nil {
return nil, err
}
out = append(out, page...)
if next == "" {
return out, nil
}
cursor = next
}
}
func (c *CloudflareKVClient) listPage(ctx context.Context, cursor string) ([]string, string, error) {
endpoint := fmt.Sprintf("%s/accounts/%s/storage/kv/namespaces/%s/keys",
c.apiBase, c.accountID, c.namespaceID)
if cursor != "" {
endpoint += "?" + url.Values{"cursor": []string{cursor}}.Encode()
}
body, err := c.do(ctx, http.MethodGet, endpoint)
if err != nil {
return nil, "", err
}
var env kvListEnvelope
if err := json.Unmarshal(body, &env); err != nil {
return nil, "", fmt.Errorf("kv list: decode: %w", err)
}
if !env.Success {
return nil, "", fmt.Errorf("kv list: api error: %v", env.Errors)
}
names := make([]string, 0, len(env.Result))
for _, r := range env.Result {
names = append(names, r.Name)
}
return names, env.ResultInfo.Cursor, nil
}
// GetValue returns the raw value bytes for one KV key. CF returns 404 as
// errKeyNotFound so callers can decide whether a missing key is fatal.
func (c *CloudflareKVClient) GetValue(ctx context.Context, key string) ([]byte, error) {
endpoint := fmt.Sprintf("%s/accounts/%s/storage/kv/namespaces/%s/values/%s",
c.apiBase, c.accountID, c.namespaceID, url.PathEscape(key))
return c.do(ctx, http.MethodGet, endpoint)
}
// ErrKeyNotFound signals a 404 on a value GET. Returned wrapped so callers
// can use errors.Is.
var ErrKeyNotFound = errors.New("cloudflare kv: key not found")
func (c *CloudflareKVClient) do(ctx context.Context, method, endpoint string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, method, endpoint, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.apiToken)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("%w: %s", ErrKeyNotFound, endpoint)
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("cloudflare api %s %s: status %d: %s",
method, endpoint, resp.StatusCode, string(body))
}
return body, nil
}
@@ -0,0 +1,107 @@
package migration
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestListKeysSinglePage(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got := r.Header.Get("Authorization"); got != "Bearer test-token" {
t.Errorf("auth header = %q", got)
}
if !strings.HasSuffix(r.URL.Path, "/storage/kv/namespaces/ns123/keys") {
t.Errorf("unexpected path %q", r.URL.Path)
}
_, _ = w.Write([]byte(`{
"result": [{"name":"wordle:stats:1"},{"name":"trading:sym:FPT"}],
"success": true,
"result_info": {"count": 2, "cursor": ""}
}`))
}))
defer srv.Close()
c := NewCloudflareKVClient("acct", "ns123", "test-token")
c.SetBaseURL(srv.URL)
keys, err := c.ListKeys(context.Background())
if err != nil {
t.Fatalf("list: %v", err)
}
if len(keys) != 2 || keys[0] != "wordle:stats:1" || keys[1] != "trading:sym:FPT" {
t.Fatalf("got %v", keys)
}
}
func TestListKeysPaginates(t *testing.T) {
page := 0
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
page++
switch page {
case 1:
_, _ = w.Write([]byte(`{
"result": [{"name":"a"}],
"success": true,
"result_info": {"cursor": "p2"}
}`))
case 2:
if got := r.URL.Query().Get("cursor"); got != "p2" {
t.Errorf("cursor=%q want p2", got)
}
_, _ = w.Write([]byte(`{
"result": [{"name":"b"}],
"success": true,
"result_info": {"cursor": ""}
}`))
default:
t.Fatalf("too many pages")
}
}))
defer srv.Close()
c := NewCloudflareKVClient("acct", "ns", "tok")
c.SetBaseURL(srv.URL)
keys, err := c.ListKeys(context.Background())
if err != nil {
t.Fatalf("list: %v", err)
}
if len(keys) != 2 || keys[0] != "a" || keys[1] != "b" {
t.Fatalf("got %v", keys)
}
}
func TestGetValueReturns404AsErrKeyNotFound(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer srv.Close()
c := NewCloudflareKVClient("acct", "ns", "tok")
c.SetBaseURL(srv.URL)
_, err := c.GetValue(context.Background(), "misc:last_ping")
if !errors.Is(err, ErrKeyNotFound) {
t.Fatalf("got %v, want ErrKeyNotFound", err)
}
}
func TestGetValueRawBytes(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"currency":{"VND":100},"meta":{"invested":0}}`))
}))
defer srv.Close()
c := NewCloudflareKVClient("acct", "ns", "tok")
c.SetBaseURL(srv.URL)
val, err := c.GetValue(context.Background(), "trading:user:42")
if err != nil {
t.Fatalf("get: %v", err)
}
want := `{"currency":{"VND":100},"meta":{"invested":0}}`
if string(val) != want {
t.Errorf("got %q want %q", string(val), want)
}
}
+61
View File
@@ -0,0 +1,61 @@
package migration
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// DynamoDBWriter writes migrated KV records into the runtime DynamoDB table
// using the exact attribute shape internal/storage/dynamodb_kv.go expects:
// pk (S), sk (S), value (S), updatedAt (N).
//
// Idempotency: by default the writer attaches a ConditionExpression that
// rejects writes where the (pk, sk) pair already exists. The CLI exposes an
// --overwrite flag that drops the condition for explicit re-imports.
type DynamoDBWriter struct {
client *dynamodb.Client
table string
overwrite bool
}
func NewDynamoDBWriter(client *dynamodb.Client, table string, overwrite bool) *DynamoDBWriter {
return &DynamoDBWriter{client: client, table: table, overwrite: overwrite}
}
// ErrItemExists signals a guarded write skipped because (pk, sk) was already
// present. The caller increments the "skipped (already imported)" counter.
var ErrItemExists = errors.New("dynamodb: item exists")
// Put writes one record. value bytes are stored as a DynamoDB String so the
// payload is human-readable in the AWS console; all current sources are JSON
// and therefore UTF-8 safe.
func (w *DynamoDBWriter) Put(ctx context.Context, pk, sk string, value []byte) error {
in := &dynamodb.PutItemInput{
TableName: aws.String(w.table),
Item: map[string]types.AttributeValue{
"pk": &types.AttributeValueMemberS{Value: pk},
"sk": &types.AttributeValueMemberS{Value: sk},
"value": &types.AttributeValueMemberS{Value: string(value)},
"updatedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)},
},
}
if !w.overwrite {
in.ConditionExpression = aws.String("attribute_not_exists(pk)")
}
_, err := w.client.PutItem(ctx, in)
if err != nil {
var cf *types.ConditionalCheckFailedException
if errors.As(err, &cf) {
return ErrItemExists
}
return fmt.Errorf("dynamodb put %s/%s: %w", pk, sk, err)
}
return nil
}
+99
View File
@@ -0,0 +1,99 @@
// Package migration provides operator-run tooling that moves data from the
// legacy Cloudflare KV/D1 stack into the live AWS DynamoDB store.
//
// The package is intentionally small. It exposes:
// - Policy: classify a CF KV key as migrate/skip/archive and resolve its
// target DynamoDB (pk, sk).
// - CloudflareKVClient / CloudflareD1Client: thin REST readers.
// - DynamoDBWriter: idempotent writes against the runtime KV table shape.
// - Report: per-prefix counts for an import run.
//
// The runtime production code (cmd/server) never imports this package.
package migration
import "strings"
// Action is the per-source-key migration decision locked in Phase 01.
type Action string
const (
ActionMigrate Action = "migrate"
ActionSkip Action = "skip"
)
// Decision is the resolved migration decision for one CF KV key.
type Decision struct {
Action Action
// PK and SK are populated only when Action == ActionMigrate.
PK string
SK string
// Reason explains a Skip (cache, retired, missing, etc.).
Reason string
}
// kvRule is one entry in the static allowlist. Order matters: rules are
// matched top-down with HasPrefix so the most specific prefix wins when
// shorter prefixes would otherwise swallow a longer one.
type kvRule struct {
prefix string
module string // DynamoDB pk; empty when action != migrate
skip string // non-empty marks the rule as skip; value is the reason
}
// kvRules is the locked Phase 01 inventory. Adding a new live CF prefix
// requires re-running the Phase 01 inventory and updating this list.
var kvRules = []kvRule{
// Durable user data — migrate.
{prefix: "wordle:stats:", module: "wordle"},
{prefix: "loldle:stats:", module: "loldle"},
{prefix: "loldle:config:", module: "loldle"},
{prefix: "twentyq:stats:", module: "twentyq"},
{prefix: "lolschedule:subscribers", module: "lolschedule"},
{prefix: "trading:user:", module: "trading"},
// Caches — skip.
{prefix: "trading:sym:", skip: "cache"},
{prefix: "wordle:game:", skip: "ephemeral"},
{prefix: "loldle:game:", skip: "ephemeral"},
{prefix: "twentyq:game:", skip: "ephemeral"},
{prefix: "lolschedule:matches:", skip: "cache"},
// Retired modules — operator chose not to archive (Phase 01, 2026-05-16).
{prefix: "doantu:", skip: "retired"},
{prefix: "loldle-ability:", skip: "retired"},
{prefix: "loldle-emoji:", skip: "retired"},
{prefix: "loldle-quote:", skip: "retired"},
{prefix: "loldle-splash:", skip: "retired"},
{prefix: "semantle:", skip: "retired"},
}
// Classify returns the migration decision for a CF KV key.
// Unknown keys default to skip with reason "unknown" so new upstream prefixes
// are visible in the inventory report instead of being silently imported.
func Classify(cfKey string) Decision {
for _, r := range kvRules {
if !strings.HasPrefix(cfKey, r.prefix) {
continue
}
if r.skip != "" {
return Decision{Action: ActionSkip, Reason: r.skip}
}
// Migrate. Target sk is the CF key with the leading "<module>:" stripped.
// Live runtime stores e.g. wordle/stats:<subject>, not wordle/wordle:stats:<subject>.
sk := strings.TrimPrefix(cfKey, r.module+":")
return Decision{Action: ActionMigrate, PK: r.module, SK: sk}
}
return Decision{Action: ActionSkip, Reason: "unknown"}
}
// DurablePrefixes returns the migrate-action prefixes for use by inventory
// and reporting. Order matches kvRules.
func DurablePrefixes() []string {
out := make([]string, 0, len(kvRules))
for _, r := range kvRules {
if r.skip == "" {
out = append(out, r.prefix)
}
}
return out
}
+86
View File
@@ -0,0 +1,86 @@
package migration
import "testing"
func TestClassify(t *testing.T) {
cases := []struct {
key string
wantAct Action
wantPK string
wantSK string
wantSkip string
}{
// Durable migrate paths — these are the 6 prefixes Phase 01 locked.
{"wordle:stats:-1001760292100", ActionMigrate, "wordle", "stats:-1001760292100", ""},
{"loldle:stats:1064111334", ActionMigrate, "loldle", "stats:1064111334", ""},
{"loldle:config:-1001760292100", ActionMigrate, "loldle", "config:-1001760292100", ""},
{"twentyq:stats:-1001760292100", ActionMigrate, "twentyq", "stats:-1001760292100", ""},
{"lolschedule:subscribers", ActionMigrate, "lolschedule", "subscribers", ""},
{"trading:user:1064111334", ActionMigrate, "trading", "user:1064111334", ""},
// Cache + ephemeral — skip.
{"trading:sym:FPT", ActionSkip, "", "", "cache"},
{"wordle:game:abc", ActionSkip, "", "", "ephemeral"},
{"lolschedule:matches:2026", ActionSkip, "", "", "cache"},
// Retired modules — skip.
{"doantu:stats:1064111334", ActionSkip, "", "", "retired"},
{"semantle:stats:-1001760292100", ActionSkip, "", "", "retired"},
{"loldle-emoji:stats:-1001760292100", ActionSkip, "", "", "retired"},
// Unknown prefix — skip with reason "unknown" so it surfaces in reports.
{"newmodule:foo", ActionSkip, "", "", "unknown"},
}
for _, c := range cases {
t.Run(c.key, func(t *testing.T) {
got := Classify(c.key)
if got.Action != c.wantAct {
t.Fatalf("action=%v want %v", got.Action, c.wantAct)
}
if got.PK != c.wantPK {
t.Errorf("pk=%q want %q", got.PK, c.wantPK)
}
if got.SK != c.wantSK {
t.Errorf("sk=%q want %q", got.SK, c.wantSK)
}
if got.Reason != c.wantSkip {
t.Errorf("reason=%q want %q", got.Reason, c.wantSkip)
}
})
}
}
func TestDurablePrefixes(t *testing.T) {
got := DurablePrefixes()
want := []string{
"wordle:stats:",
"loldle:stats:",
"loldle:config:",
"twentyq:stats:",
"lolschedule:subscribers",
"trading:user:",
}
if len(got) != len(want) {
t.Fatalf("got %v, want %v", got, want)
}
for i := range want {
if got[i] != want[i] {
t.Errorf("idx %d: got %q want %q", i, got[i], want[i])
}
}
}
func TestPrefixOfLongestMatch(t *testing.T) {
// loldle:stats: and loldle:config: both share the loldle: super-prefix.
// Longest match wins so report buckets stay precise.
if got := PrefixOf("loldle:stats:1234"); got != "loldle:stats:" {
t.Errorf("loldle:stats:1234 → %q, want loldle:stats:", got)
}
if got := PrefixOf("loldle:config:5"); got != "loldle:config:" {
t.Errorf("loldle:config:5 → %q, want loldle:config:", got)
}
if got := PrefixOf("zzz:unknown"); got != "unknown" {
t.Errorf("unknown bucket: got %q", got)
}
}
+95
View File
@@ -0,0 +1,95 @@
package migration
import (
"fmt"
"io"
"sort"
)
// Report aggregates counts for one migration run. It is intentionally
// per-prefix rather than per-key so the runbook can compare totals to the
// Phase 01 inventory at a glance.
type Report struct {
// Imported is keys actually written to DynamoDB.
Imported map[string]int
// SkippedExisting is keys already present (idempotent rerun).
SkippedExisting map[string]int
// SkippedPolicy is keys rejected by the Phase 01 allowlist; keyed by
// reason (cache, retired, unknown, ephemeral).
SkippedPolicy map[string]int
// Failed is keys that errored mid-import.
Failed map[string]int
}
func NewReport() *Report {
return &Report{
Imported: map[string]int{},
SkippedExisting: map[string]int{},
SkippedPolicy: map[string]int{},
Failed: map[string]int{},
}
}
func (r *Report) AddImported(prefix string) { r.Imported[prefix]++ }
func (r *Report) AddSkippedExisting(prefix string) { r.SkippedExisting[prefix]++ }
func (r *Report) AddSkippedPolicy(reason string) { r.SkippedPolicy[reason]++ }
func (r *Report) AddFailed(prefix string) { r.Failed[prefix]++ }
// Format writes a human-readable summary. Stable ordering (alphabetical) so
// rerun diffs stay clean.
func (r *Report) Format(w io.Writer) {
fmt.Fprintln(w, "Migration report")
fmt.Fprintln(w, "================")
writeSection(w, "Imported", r.Imported)
writeSection(w, "Skipped (already present)", r.SkippedExisting)
writeSection(w, "Skipped (policy)", r.SkippedPolicy)
writeSection(w, "Failed", r.Failed)
fmt.Fprintf(w, "TOTAL imported=%d skipped_existing=%d skipped_policy=%d failed=%d\n",
sum(r.Imported), sum(r.SkippedExisting), sum(r.SkippedPolicy), sum(r.Failed))
}
func writeSection(w io.Writer, label string, m map[string]int) {
fmt.Fprintf(w, "\n%s:\n", label)
if len(m) == 0 {
fmt.Fprintln(w, " (none)")
return
}
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
fmt.Fprintf(w, " %-30s %d\n", k, m[k])
}
}
func sum(m map[string]int) int {
t := 0
for _, v := range m {
t += v
}
return t
}
// PrefixOf returns the longest known prefix from kvRules that matches key,
// or "unknown". Used by Report consumers to bucket counts.
func PrefixOf(key string) string {
best := ""
for _, r := range kvRules {
if len(r.prefix) > len(best) && hasPrefix(key, r.prefix) {
best = r.prefix
}
}
if best == "" {
return "unknown"
}
return best
}
func hasPrefix(s, p string) bool {
if len(s) < len(p) {
return false
}
return s[:len(p)] == p
}
+52
View File
@@ -0,0 +1,52 @@
package migration
import (
"bytes"
"strings"
"testing"
)
func TestReportFormat(t *testing.T) {
r := NewReport()
r.AddImported("wordle:stats:")
r.AddImported("wordle:stats:")
r.AddImported("trading:user:")
r.AddSkippedExisting("loldle:stats:")
r.AddSkippedPolicy("cache")
r.AddSkippedPolicy("cache")
r.AddSkippedPolicy("retired")
r.AddFailed("twentyq:stats:")
var buf bytes.Buffer
r.Format(&buf)
got := buf.String()
// Spot-check structure and totals.
for _, want := range []string{
"Imported:",
"wordle:stats: 2",
"trading:user: 1",
"Skipped (already present):",
"loldle:stats: 1",
"Skipped (policy):",
"cache 2",
"retired 1",
"Failed:",
"twentyq:stats: 1",
"TOTAL imported=3 skipped_existing=1 skipped_policy=3 failed=1",
} {
if !strings.Contains(got, want) {
t.Errorf("missing %q in output:\n%s", want, got)
}
}
}
func TestReportEmptySection(t *testing.T) {
r := NewReport()
r.AddImported("wordle:stats:")
var buf bytes.Buffer
r.Format(&buf)
if !strings.Contains(buf.String(), "Failed:\n (none)") {
t.Errorf("empty section missing (none) marker:\n%s", buf.String())
}
}
+7 -14
View File
@@ -64,11 +64,11 @@ func (s *DynamoDBKVStore) Get(ctx context.Context, key string) ([]byte, error) {
if !ok {
return nil, fmt.Errorf("dynamodb get %s/%s: missing %q attribute", s.moduleName, key, dynamoValueAttr)
}
bin, ok := rawAttr.(*types.AttributeValueMemberB)
str, ok := rawAttr.(*types.AttributeValueMemberS)
if !ok {
return nil, fmt.Errorf("dynamodb get %s/%s: unexpected attribute type %T", s.moduleName, key, rawAttr)
}
return bin.Value, nil
return []byte(str.Value), nil
}
// GetJSON decodes the value at key into dst.
@@ -83,26 +83,20 @@ func (s *DynamoDBKVStore) GetJSON(ctx context.Context, key string, dst any) erro
return nil
}
// Put writes raw bytes at key, creating or overwriting.
// Put writes raw bytes at key, creating or overwriting. The value attribute
// is stored as a DynamoDB String so it is human-readable in the AWS console.
// Every current caller writes JSON, which is UTF-8 safe; non-UTF-8 callers
// must encode upstream (e.g. base64).
func (s *DynamoDBKVStore) Put(ctx context.Context, key string, val []byte) error {
if err := validateKey(key); err != nil {
return err
}
// DynamoDB rejects empty Binary values. Store a single zero byte sentinel
// transparently — Firestore allows zero-length []byte and callers may
// rely on that. The Get path treats both as []byte; downstream JSON
// callers will see []byte{0} where they put []byte{}, but no current
// caller relies on storing literal empty bytes (they use PutJSON, which
// always emits at least "null" = 4 bytes).
if len(val) == 0 {
val = []byte{0}
}
_, err := s.client.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(s.table),
Item: map[string]types.AttributeValue{
dynamoPKAttr: &types.AttributeValueMemberS{Value: s.moduleName},
dynamoSKAttr: &types.AttributeValueMemberS{Value: key},
dynamoValueAttr: &types.AttributeValueMemberB{Value: val},
dynamoValueAttr: &types.AttributeValueMemberS{Value: string(val)},
dynamoUpdatedAtAttr: &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)},
},
})
@@ -181,4 +175,3 @@ func (s *DynamoDBKVStore) List(ctx context.Context, prefix string) ([]string, er
}
return keys, nil
}