mirror of
https://github.com/tiennm99/miti99bot.git
synced 2026-06-09 16:14:55 +00:00
feat(migration): cf→aws migration toolchain + DynamoDB value as String
Operator-run tooling that moves durable Cloudflare KV data into the live AWS DynamoDB table, plus a runtime swap of the `value` attribute from Binary to String so payloads are human-readable in the AWS console. - cmd/migrate_cf_data: subcommands inventory, kv-import (idempotent via attribute_not_exists), trading-audit-dump, convert-value-to-string - internal/migration: policy allowlist, CF KV+D1 REST clients, DynamoDB writer, report formatter with per-prefix counts + tests - internal/storage/dynamodb_kv.go: Put writes MemberS, Get reads MemberS; dropped empty-bytes sentinel (DynamoDB allows empty strings)
This commit is contained in:
@@ -0,0 +1,106 @@
|
||||
// One-shot rewrite of the table's `value` attribute from Binary (legacy
|
||||
// shape) to String (current shape). Operator-elective; needed once after the
|
||||
// runtime swap from MemberB to MemberS in internal/storage/dynamodb_kv.go.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
)
|
||||
|
||||
func runConvertValueToString(args []string) error {
|
||||
fs := flag.NewFlagSet("convert-value-to-string", flag.ExitOnError)
|
||||
table := fs.String("table", "", "target DynamoDB table (required)")
|
||||
dryRun := fs.Bool("dry-run", false, "log actions but do not write")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if *table == "" {
|
||||
return fmt.Errorf("--table is required")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
cfg, err := awsconfig.LoadDefaultConfig(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("aws config: %w", err)
|
||||
}
|
||||
client := dynamodb.NewFromConfig(cfg)
|
||||
|
||||
converted, alreadyString, skipped, failed := 0, 0, 0, 0
|
||||
pager := dynamodb.NewScanPaginator(client, &dynamodb.ScanInput{TableName: aws.String(*table)})
|
||||
for pager.HasMorePages() {
|
||||
page, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("scan: %w", err)
|
||||
}
|
||||
for _, item := range page.Items {
|
||||
pk, sk, ok := itemPKSK(item)
|
||||
if !ok {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
valAttr, ok := item["value"]
|
||||
if !ok {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
binAttr, isBinary := valAttr.(*types.AttributeValueMemberB)
|
||||
if !isBinary {
|
||||
alreadyString++
|
||||
continue
|
||||
}
|
||||
if *dryRun {
|
||||
fmt.Printf(" DRY-RUN would convert pk=%s sk=%s len=%d\n", pk, sk, len(binAttr.Value))
|
||||
converted++
|
||||
continue
|
||||
}
|
||||
if err := putAsString(ctx, client, *table, pk, sk, binAttr.Value); err != nil {
|
||||
fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", pk, sk, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
converted++
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("\nconvert-value-to-string report\n")
|
||||
fmt.Printf(" converted (B → S): %d\n", converted)
|
||||
fmt.Printf(" already String: %d\n", alreadyString)
|
||||
fmt.Printf(" skipped (no pk/sk/value): %d\n", skipped)
|
||||
fmt.Printf(" failed: %d\n", failed)
|
||||
return nil
|
||||
}
|
||||
|
||||
func itemPKSK(item map[string]types.AttributeValue) (string, string, bool) {
|
||||
pkAttr, ok := item["pk"].(*types.AttributeValueMemberS)
|
||||
if !ok {
|
||||
return "", "", false
|
||||
}
|
||||
skAttr, ok := item["sk"].(*types.AttributeValueMemberS)
|
||||
if !ok {
|
||||
return "", "", false
|
||||
}
|
||||
return pkAttr.Value, skAttr.Value, true
|
||||
}
|
||||
|
||||
func putAsString(ctx context.Context, client *dynamodb.Client, table, pk, sk string, val []byte) error {
|
||||
_, err := client.PutItem(ctx, &dynamodb.PutItemInput{
|
||||
TableName: aws.String(table),
|
||||
Item: map[string]types.AttributeValue{
|
||||
"pk": &types.AttributeValueMemberS{Value: pk},
|
||||
"sk": &types.AttributeValueMemberS{Value: sk},
|
||||
"value": &types.AttributeValueMemberS{Value: string(val)},
|
||||
"updatedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)},
|
||||
},
|
||||
})
|
||||
return err
|
||||
}
|
||||
@@ -0,0 +1,223 @@
|
||||
// Command migrate_cf_data moves durable data from the legacy Cloudflare
|
||||
// KV/D1 stack into the live AWS DynamoDB table. Operator-invoked only.
|
||||
//
|
||||
// Subcommands:
|
||||
//
|
||||
// inventory Read CF KV keys, apply the Phase 01 policy, print
|
||||
// a classification report. No writes anywhere.
|
||||
//
|
||||
// kv-import Copy migrate-action KV keys into DynamoDB.
|
||||
// Idempotent by default (attribute_not_exists guard).
|
||||
// Flags: --table, --dry-run, --overwrite.
|
||||
//
|
||||
// trading-audit-dump Stream D1 `trading_trades` rows to a JSONL file.
|
||||
// Audit-only; not an import input.
|
||||
// Flags: --out (required).
|
||||
//
|
||||
// convert-value-to-string
|
||||
// One-shot rewrite of the table's `value` attribute
|
||||
// from Binary (legacy shape) to String (current
|
||||
// shape). Idempotent — items already stored as
|
||||
// String are skipped.
|
||||
// Flags: --table, --dry-run.
|
||||
//
|
||||
// Required env:
|
||||
//
|
||||
// CLOUDFLARE_API_TOKEN — read-scoped token for KV + D1
|
||||
// CLOUDFLARE_ACCOUNT_ID — production CF account
|
||||
// CF_KV_NAMESPACE_ID — production KV namespace
|
||||
// CF_D1_DATABASE_ID — production D1 database (only needed for
|
||||
// trading-audit-dump)
|
||||
// AWS_REGION (or standard AWS SDK env) — only needed for kv-import
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||
"github.com/tiennm99/miti99bot/internal/migration"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if len(os.Args) < 2 {
|
||||
usage()
|
||||
os.Exit(2)
|
||||
}
|
||||
sub := os.Args[1]
|
||||
args := os.Args[2:]
|
||||
var err error
|
||||
switch sub {
|
||||
case "inventory":
|
||||
err = runInventory(args)
|
||||
case "kv-import":
|
||||
err = runKVImport(args)
|
||||
case "trading-audit-dump":
|
||||
err = runTradingAuditDump(args)
|
||||
case "convert-value-to-string":
|
||||
err = runConvertValueToString(args)
|
||||
case "-h", "--help", "help":
|
||||
usage()
|
||||
return
|
||||
default:
|
||||
usage()
|
||||
os.Exit(2)
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func usage() {
|
||||
fmt.Fprintln(os.Stderr, "usage: migrate_cf_data <inventory|kv-import|trading-audit-dump|convert-value-to-string> [flags]")
|
||||
}
|
||||
|
||||
func runInventory(args []string) error {
|
||||
fs := flag.NewFlagSet("inventory", flag.ExitOnError)
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
kv, err := newKVClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx := context.Background()
|
||||
keys, err := kv.ListKeys(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list keys: %w", err)
|
||||
}
|
||||
migrate := map[string]int{}
|
||||
skip := map[string]int{}
|
||||
for _, k := range keys {
|
||||
d := migration.Classify(k)
|
||||
if d.Action == migration.ActionMigrate {
|
||||
migrate[migration.PrefixOf(k)]++
|
||||
} else {
|
||||
skip[d.Reason]++
|
||||
}
|
||||
}
|
||||
fmt.Printf("Cloudflare KV namespace contains %d keys.\n\n", len(keys))
|
||||
fmt.Println("Migrate-action keys by prefix:")
|
||||
for p, n := range migrate {
|
||||
fmt.Printf(" %-30s %d\n", p, n)
|
||||
}
|
||||
fmt.Println("\nSkip-action keys by reason:")
|
||||
for r, n := range skip {
|
||||
fmt.Printf(" %-30s %d\n", r, n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func runKVImport(args []string) error {
|
||||
fs := flag.NewFlagSet("kv-import", flag.ExitOnError)
|
||||
table := fs.String("table", "", "target DynamoDB table (required)")
|
||||
dryRun := fs.Bool("dry-run", false, "log actions but do not write")
|
||||
overwrite := fs.Bool("overwrite", false, "drop attribute_not_exists guard")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if *table == "" {
|
||||
return fmt.Errorf("--table is required")
|
||||
}
|
||||
kv, err := newKVClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx := context.Background()
|
||||
var writer *migration.DynamoDBWriter
|
||||
if !*dryRun {
|
||||
cfg, err := awsconfig.LoadDefaultConfig(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("aws config: %w", err)
|
||||
}
|
||||
writer = migration.NewDynamoDBWriter(dynamodb.NewFromConfig(cfg), *table, *overwrite)
|
||||
}
|
||||
keys, err := kv.ListKeys(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list keys: %w", err)
|
||||
}
|
||||
report := migration.NewReport()
|
||||
for _, k := range keys {
|
||||
d := migration.Classify(k)
|
||||
if d.Action != migration.ActionMigrate {
|
||||
report.AddSkippedPolicy(d.Reason)
|
||||
continue
|
||||
}
|
||||
val, err := kv.GetValue(ctx, k)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, " get %s: %v\n", k, err)
|
||||
report.AddFailed(migration.PrefixOf(k))
|
||||
continue
|
||||
}
|
||||
if *dryRun {
|
||||
fmt.Printf(" DRY-RUN would write pk=%s sk=%s len=%d\n", d.PK, d.SK, len(val))
|
||||
report.AddImported(migration.PrefixOf(k))
|
||||
continue
|
||||
}
|
||||
switch err := writer.Put(ctx, d.PK, d.SK, val); err {
|
||||
case nil:
|
||||
report.AddImported(migration.PrefixOf(k))
|
||||
case migration.ErrItemExists:
|
||||
report.AddSkippedExisting(migration.PrefixOf(k))
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, " put %s/%s: %v\n", d.PK, d.SK, err)
|
||||
report.AddFailed(migration.PrefixOf(k))
|
||||
}
|
||||
}
|
||||
report.Format(os.Stdout)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runTradingAuditDump(args []string) error {
|
||||
fs := flag.NewFlagSet("trading-audit-dump", flag.ExitOnError)
|
||||
out := fs.String("out", "", "output JSONL file path (required)")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
if *out == "" {
|
||||
return fmt.Errorf("--out is required")
|
||||
}
|
||||
d1, err := newD1Client()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := d1.Query(context.Background(),
|
||||
"SELECT id, user_id, symbol, side, qty, price_vnd, ts FROM trading_trades ORDER BY id", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("d1 query: %w", err)
|
||||
}
|
||||
f, err := os.Create(*out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
enc := json.NewEncoder(f)
|
||||
for _, r := range rows {
|
||||
if err := enc.Encode(r); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
fmt.Printf("Wrote %d rows to %s\n", len(rows), *out)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newKVClient() (*migration.CloudflareKVClient, error) {
|
||||
token, account, ns := os.Getenv("CLOUDFLARE_API_TOKEN"), os.Getenv("CLOUDFLARE_ACCOUNT_ID"), os.Getenv("CF_KV_NAMESPACE_ID")
|
||||
if token == "" || account == "" || ns == "" {
|
||||
return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_KV_NAMESPACE_ID")
|
||||
}
|
||||
return migration.NewCloudflareKVClient(account, ns, token), nil
|
||||
}
|
||||
|
||||
func newD1Client() (*migration.CloudflareD1Client, error) {
|
||||
token, account, db := os.Getenv("CLOUDFLARE_API_TOKEN"), os.Getenv("CLOUDFLARE_ACCOUNT_ID"), os.Getenv("CF_D1_DATABASE_ID")
|
||||
if token == "" || account == "" || db == "" {
|
||||
return nil, fmt.Errorf("set CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CF_D1_DATABASE_ID")
|
||||
}
|
||||
return migration.NewCloudflareD1Client(account, db, token), nil
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CloudflareD1Client is a thin read-only REST client for the legacy
|
||||
// Cloudflare D1 database. The migration only uses it for the optional
|
||||
// trading_trades audit dump in Phase 03; runtime data flow never reads D1.
|
||||
type CloudflareD1Client struct {
|
||||
httpClient *http.Client
|
||||
apiBase string
|
||||
accountID string
|
||||
databaseID string
|
||||
apiToken string
|
||||
}
|
||||
|
||||
func NewCloudflareD1Client(accountID, databaseID, apiToken string) *CloudflareD1Client {
|
||||
if accountID == "" || databaseID == "" || apiToken == "" {
|
||||
panic("migration: CloudflareD1Client requires accountID, databaseID, apiToken")
|
||||
}
|
||||
return &CloudflareD1Client{
|
||||
httpClient: &http.Client{Timeout: 60 * time.Second},
|
||||
apiBase: "https://api.cloudflare.com/client/v4",
|
||||
accountID: accountID,
|
||||
databaseID: databaseID,
|
||||
apiToken: apiToken,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CloudflareD1Client) SetBaseURL(base string) { c.apiBase = base }
|
||||
|
||||
// d1QueryEnvelope matches the D1 query response. Result is an array because
|
||||
// D1 returns one entry per statement; we only ever send one.
|
||||
type d1QueryEnvelope struct {
|
||||
Result []struct {
|
||||
Results []map[string]any `json:"results"`
|
||||
Success bool `json:"success"`
|
||||
} `json:"result"`
|
||||
Success bool `json:"success"`
|
||||
Errors []map[string]any `json:"errors"`
|
||||
}
|
||||
|
||||
// Query runs a single SQL statement and returns the row maps in result-set
|
||||
// order. params is optional; pass nil for parameterless statements.
|
||||
func (c *CloudflareD1Client) Query(ctx context.Context, sql string, params []any) ([]map[string]any, error) {
|
||||
payload := map[string]any{"sql": sql}
|
||||
if len(params) > 0 {
|
||||
payload["params"] = params
|
||||
}
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
endpoint := fmt.Sprintf("%s/accounts/%s/d1/database/%s/query",
|
||||
c.apiBase, c.accountID, c.databaseID)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+c.apiToken)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
raw, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
return nil, fmt.Errorf("d1 query: status %d: %s", resp.StatusCode, string(raw))
|
||||
}
|
||||
var env d1QueryEnvelope
|
||||
if err := json.Unmarshal(raw, &env); err != nil {
|
||||
return nil, fmt.Errorf("d1 query: decode: %w", err)
|
||||
}
|
||||
if !env.Success {
|
||||
return nil, fmt.Errorf("d1 query: api error: %v", env.Errors)
|
||||
}
|
||||
if len(env.Result) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return env.Result[0].Results, nil
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestD1QueryHappyPath(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
t.Errorf("method=%s want POST", r.Method)
|
||||
}
|
||||
if !strings.HasSuffix(r.URL.Path, "/d1/database/db123/query") {
|
||||
t.Errorf("path=%s", r.URL.Path)
|
||||
}
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
var got map[string]any
|
||||
_ = json.Unmarshal(body, &got)
|
||||
if got["sql"] != "SELECT * FROM trading_trades" {
|
||||
t.Errorf("sql=%v", got["sql"])
|
||||
}
|
||||
_, _ = w.Write([]byte(`{
|
||||
"success": true,
|
||||
"result": [{
|
||||
"results": [
|
||||
{"id": 1, "user_id": 100, "symbol": "FPT"},
|
||||
{"id": 2, "user_id": 100, "symbol": "TCB"}
|
||||
],
|
||||
"success": true
|
||||
}]
|
||||
}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewCloudflareD1Client("acct", "db123", "tok")
|
||||
c.SetBaseURL(srv.URL)
|
||||
|
||||
rows, err := c.Query(context.Background(), "SELECT * FROM trading_trades", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("query: %v", err)
|
||||
}
|
||||
if len(rows) != 2 {
|
||||
t.Fatalf("got %d rows", len(rows))
|
||||
}
|
||||
if rows[0]["symbol"] != "FPT" || rows[1]["symbol"] != "TCB" {
|
||||
t.Errorf("rows=%v", rows)
|
||||
}
|
||||
}
|
||||
|
||||
func TestD1QueryApiError(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"success": false, "errors": [{"code": 8000, "message": "bad sql"}]}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewCloudflareD1Client("acct", "db", "tok")
|
||||
c.SetBaseURL(srv.URL)
|
||||
_, err := c.Query(context.Background(), "SELECT", nil)
|
||||
if err == nil || !strings.Contains(err.Error(), "api error") {
|
||||
t.Fatalf("got %v, want api error", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,135 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
// CloudflareKVClient is a thin read-only REST client for the legacy
|
||||
// Cloudflare Workers KV namespace. List+Get are the only operations the
|
||||
// migration needs; mutations stay out of scope.
|
||||
type CloudflareKVClient struct {
|
||||
httpClient *http.Client
|
||||
apiBase string
|
||||
accountID string
|
||||
namespaceID string
|
||||
apiToken string
|
||||
}
|
||||
|
||||
// NewCloudflareKVClient panics on empty required fields so misconfiguration
|
||||
// surfaces at startup rather than after partial work.
|
||||
func NewCloudflareKVClient(accountID, namespaceID, apiToken string) *CloudflareKVClient {
|
||||
if accountID == "" || namespaceID == "" || apiToken == "" {
|
||||
panic("migration: CloudflareKVClient requires accountID, namespaceID, apiToken")
|
||||
}
|
||||
return &CloudflareKVClient{
|
||||
httpClient: &http.Client{Timeout: 30 * time.Second},
|
||||
apiBase: "https://api.cloudflare.com/client/v4",
|
||||
accountID: accountID,
|
||||
namespaceID: namespaceID,
|
||||
apiToken: apiToken,
|
||||
}
|
||||
}
|
||||
|
||||
// SetBaseURL overrides the API base; used by tests with httptest.Server.
|
||||
func (c *CloudflareKVClient) SetBaseURL(base string) { c.apiBase = base }
|
||||
|
||||
// kvListEnvelope matches the Cloudflare REST list-keys response shape.
|
||||
// See: https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace-s-keys
|
||||
type kvListEnvelope struct {
|
||||
Result []struct {
|
||||
Name string `json:"name"`
|
||||
} `json:"result"`
|
||||
Success bool `json:"success"`
|
||||
Errors []map[string]any `json:"errors"`
|
||||
ResultInfo struct {
|
||||
Cursor string `json:"cursor"`
|
||||
Count int `json:"count"`
|
||||
} `json:"result_info"`
|
||||
}
|
||||
|
||||
// ListKeys returns every key in the namespace. The Phase 01 inventory
|
||||
// proved the namespace fits in well under one REST page (21 keys vs 1000
|
||||
// page limit), but pagination is still honored for safety.
|
||||
func (c *CloudflareKVClient) ListKeys(ctx context.Context) ([]string, error) {
|
||||
var out []string
|
||||
cursor := ""
|
||||
for {
|
||||
page, next, err := c.listPage(ctx, cursor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, page...)
|
||||
if next == "" {
|
||||
return out, nil
|
||||
}
|
||||
cursor = next
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CloudflareKVClient) listPage(ctx context.Context, cursor string) ([]string, string, error) {
|
||||
endpoint := fmt.Sprintf("%s/accounts/%s/storage/kv/namespaces/%s/keys",
|
||||
c.apiBase, c.accountID, c.namespaceID)
|
||||
if cursor != "" {
|
||||
endpoint += "?" + url.Values{"cursor": []string{cursor}}.Encode()
|
||||
}
|
||||
body, err := c.do(ctx, http.MethodGet, endpoint)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
var env kvListEnvelope
|
||||
if err := json.Unmarshal(body, &env); err != nil {
|
||||
return nil, "", fmt.Errorf("kv list: decode: %w", err)
|
||||
}
|
||||
if !env.Success {
|
||||
return nil, "", fmt.Errorf("kv list: api error: %v", env.Errors)
|
||||
}
|
||||
names := make([]string, 0, len(env.Result))
|
||||
for _, r := range env.Result {
|
||||
names = append(names, r.Name)
|
||||
}
|
||||
return names, env.ResultInfo.Cursor, nil
|
||||
}
|
||||
|
||||
// GetValue returns the raw value bytes for one KV key. CF returns 404 as
|
||||
// errKeyNotFound so callers can decide whether a missing key is fatal.
|
||||
func (c *CloudflareKVClient) GetValue(ctx context.Context, key string) ([]byte, error) {
|
||||
endpoint := fmt.Sprintf("%s/accounts/%s/storage/kv/namespaces/%s/values/%s",
|
||||
c.apiBase, c.accountID, c.namespaceID, url.PathEscape(key))
|
||||
return c.do(ctx, http.MethodGet, endpoint)
|
||||
}
|
||||
|
||||
// ErrKeyNotFound signals a 404 on a value GET. Returned wrapped so callers
|
||||
// can use errors.Is.
|
||||
var ErrKeyNotFound = errors.New("cloudflare kv: key not found")
|
||||
|
||||
func (c *CloudflareKVClient) do(ctx context.Context, method, endpoint string) ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, method, endpoint, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Authorization", "Bearer "+c.apiToken)
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, fmt.Errorf("%w: %s", ErrKeyNotFound, endpoint)
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
return nil, fmt.Errorf("cloudflare api %s %s: status %d: %s",
|
||||
method, endpoint, resp.StatusCode, string(body))
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestListKeysSinglePage(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if got := r.Header.Get("Authorization"); got != "Bearer test-token" {
|
||||
t.Errorf("auth header = %q", got)
|
||||
}
|
||||
if !strings.HasSuffix(r.URL.Path, "/storage/kv/namespaces/ns123/keys") {
|
||||
t.Errorf("unexpected path %q", r.URL.Path)
|
||||
}
|
||||
_, _ = w.Write([]byte(`{
|
||||
"result": [{"name":"wordle:stats:1"},{"name":"trading:sym:FPT"}],
|
||||
"success": true,
|
||||
"result_info": {"count": 2, "cursor": ""}
|
||||
}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewCloudflareKVClient("acct", "ns123", "test-token")
|
||||
c.SetBaseURL(srv.URL)
|
||||
|
||||
keys, err := c.ListKeys(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("list: %v", err)
|
||||
}
|
||||
if len(keys) != 2 || keys[0] != "wordle:stats:1" || keys[1] != "trading:sym:FPT" {
|
||||
t.Fatalf("got %v", keys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListKeysPaginates(t *testing.T) {
|
||||
page := 0
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
page++
|
||||
switch page {
|
||||
case 1:
|
||||
_, _ = w.Write([]byte(`{
|
||||
"result": [{"name":"a"}],
|
||||
"success": true,
|
||||
"result_info": {"cursor": "p2"}
|
||||
}`))
|
||||
case 2:
|
||||
if got := r.URL.Query().Get("cursor"); got != "p2" {
|
||||
t.Errorf("cursor=%q want p2", got)
|
||||
}
|
||||
_, _ = w.Write([]byte(`{
|
||||
"result": [{"name":"b"}],
|
||||
"success": true,
|
||||
"result_info": {"cursor": ""}
|
||||
}`))
|
||||
default:
|
||||
t.Fatalf("too many pages")
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewCloudflareKVClient("acct", "ns", "tok")
|
||||
c.SetBaseURL(srv.URL)
|
||||
keys, err := c.ListKeys(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("list: %v", err)
|
||||
}
|
||||
if len(keys) != 2 || keys[0] != "a" || keys[1] != "b" {
|
||||
t.Fatalf("got %v", keys)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetValueReturns404AsErrKeyNotFound(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewCloudflareKVClient("acct", "ns", "tok")
|
||||
c.SetBaseURL(srv.URL)
|
||||
_, err := c.GetValue(context.Background(), "misc:last_ping")
|
||||
if !errors.Is(err, ErrKeyNotFound) {
|
||||
t.Fatalf("got %v, want ErrKeyNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetValueRawBytes(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"currency":{"VND":100},"meta":{"invested":0}}`))
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := NewCloudflareKVClient("acct", "ns", "tok")
|
||||
c.SetBaseURL(srv.URL)
|
||||
val, err := c.GetValue(context.Background(), "trading:user:42")
|
||||
if err != nil {
|
||||
t.Fatalf("get: %v", err)
|
||||
}
|
||||
want := `{"currency":{"VND":100},"meta":{"invested":0}}`
|
||||
if string(val) != want {
|
||||
t.Errorf("got %q want %q", string(val), want)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||
)
|
||||
|
||||
// DynamoDBWriter writes migrated KV records into the runtime DynamoDB table
|
||||
// using the exact attribute shape internal/storage/dynamodb_kv.go expects:
|
||||
// pk (S), sk (S), value (S), updatedAt (N).
|
||||
//
|
||||
// Idempotency: by default the writer attaches a ConditionExpression that
|
||||
// rejects writes where the (pk, sk) pair already exists. The CLI exposes an
|
||||
// --overwrite flag that drops the condition for explicit re-imports.
|
||||
type DynamoDBWriter struct {
|
||||
client *dynamodb.Client
|
||||
table string
|
||||
overwrite bool
|
||||
}
|
||||
|
||||
func NewDynamoDBWriter(client *dynamodb.Client, table string, overwrite bool) *DynamoDBWriter {
|
||||
return &DynamoDBWriter{client: client, table: table, overwrite: overwrite}
|
||||
}
|
||||
|
||||
// ErrItemExists signals a guarded write skipped because (pk, sk) was already
|
||||
// present. The caller increments the "skipped (already imported)" counter.
|
||||
var ErrItemExists = errors.New("dynamodb: item exists")
|
||||
|
||||
// Put writes one record. value bytes are stored as a DynamoDB String so the
|
||||
// payload is human-readable in the AWS console; all current sources are JSON
|
||||
// and therefore UTF-8 safe.
|
||||
func (w *DynamoDBWriter) Put(ctx context.Context, pk, sk string, value []byte) error {
|
||||
in := &dynamodb.PutItemInput{
|
||||
TableName: aws.String(w.table),
|
||||
Item: map[string]types.AttributeValue{
|
||||
"pk": &types.AttributeValueMemberS{Value: pk},
|
||||
"sk": &types.AttributeValueMemberS{Value: sk},
|
||||
"value": &types.AttributeValueMemberS{Value: string(value)},
|
||||
"updatedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)},
|
||||
},
|
||||
}
|
||||
if !w.overwrite {
|
||||
in.ConditionExpression = aws.String("attribute_not_exists(pk)")
|
||||
}
|
||||
_, err := w.client.PutItem(ctx, in)
|
||||
if err != nil {
|
||||
var cf *types.ConditionalCheckFailedException
|
||||
if errors.As(err, &cf) {
|
||||
return ErrItemExists
|
||||
}
|
||||
return fmt.Errorf("dynamodb put %s/%s: %w", pk, sk, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
// Package migration provides operator-run tooling that moves data from the
|
||||
// legacy Cloudflare KV/D1 stack into the live AWS DynamoDB store.
|
||||
//
|
||||
// The package is intentionally small. It exposes:
|
||||
// - Policy: classify a CF KV key as migrate/skip/archive and resolve its
|
||||
// target DynamoDB (pk, sk).
|
||||
// - CloudflareKVClient / CloudflareD1Client: thin REST readers.
|
||||
// - DynamoDBWriter: idempotent writes against the runtime KV table shape.
|
||||
// - Report: per-prefix counts for an import run.
|
||||
//
|
||||
// The runtime production code (cmd/server) never imports this package.
|
||||
package migration
|
||||
|
||||
import "strings"
|
||||
|
||||
// Action is the per-source-key migration decision locked in Phase 01.
|
||||
type Action string
|
||||
|
||||
const (
|
||||
ActionMigrate Action = "migrate"
|
||||
ActionSkip Action = "skip"
|
||||
)
|
||||
|
||||
// Decision is the resolved migration decision for one CF KV key.
|
||||
type Decision struct {
|
||||
Action Action
|
||||
// PK and SK are populated only when Action == ActionMigrate.
|
||||
PK string
|
||||
SK string
|
||||
// Reason explains a Skip (cache, retired, missing, etc.).
|
||||
Reason string
|
||||
}
|
||||
|
||||
// kvRule is one entry in the static allowlist. Order matters: rules are
|
||||
// matched top-down with HasPrefix so the most specific prefix wins when
|
||||
// shorter prefixes would otherwise swallow a longer one.
|
||||
type kvRule struct {
|
||||
prefix string
|
||||
module string // DynamoDB pk; empty when action != migrate
|
||||
skip string // non-empty marks the rule as skip; value is the reason
|
||||
}
|
||||
|
||||
// kvRules is the locked Phase 01 inventory. Adding a new live CF prefix
|
||||
// requires re-running the Phase 01 inventory and updating this list.
|
||||
var kvRules = []kvRule{
|
||||
// Durable user data — migrate.
|
||||
{prefix: "wordle:stats:", module: "wordle"},
|
||||
{prefix: "loldle:stats:", module: "loldle"},
|
||||
{prefix: "loldle:config:", module: "loldle"},
|
||||
{prefix: "twentyq:stats:", module: "twentyq"},
|
||||
{prefix: "lolschedule:subscribers", module: "lolschedule"},
|
||||
{prefix: "trading:user:", module: "trading"},
|
||||
|
||||
// Caches — skip.
|
||||
{prefix: "trading:sym:", skip: "cache"},
|
||||
{prefix: "wordle:game:", skip: "ephemeral"},
|
||||
{prefix: "loldle:game:", skip: "ephemeral"},
|
||||
{prefix: "twentyq:game:", skip: "ephemeral"},
|
||||
{prefix: "lolschedule:matches:", skip: "cache"},
|
||||
|
||||
// Retired modules — operator chose not to archive (Phase 01, 2026-05-16).
|
||||
{prefix: "doantu:", skip: "retired"},
|
||||
{prefix: "loldle-ability:", skip: "retired"},
|
||||
{prefix: "loldle-emoji:", skip: "retired"},
|
||||
{prefix: "loldle-quote:", skip: "retired"},
|
||||
{prefix: "loldle-splash:", skip: "retired"},
|
||||
{prefix: "semantle:", skip: "retired"},
|
||||
}
|
||||
|
||||
// Classify returns the migration decision for a CF KV key.
|
||||
// Unknown keys default to skip with reason "unknown" so new upstream prefixes
|
||||
// are visible in the inventory report instead of being silently imported.
|
||||
func Classify(cfKey string) Decision {
|
||||
for _, r := range kvRules {
|
||||
if !strings.HasPrefix(cfKey, r.prefix) {
|
||||
continue
|
||||
}
|
||||
if r.skip != "" {
|
||||
return Decision{Action: ActionSkip, Reason: r.skip}
|
||||
}
|
||||
// Migrate. Target sk is the CF key with the leading "<module>:" stripped.
|
||||
// Live runtime stores e.g. wordle/stats:<subject>, not wordle/wordle:stats:<subject>.
|
||||
sk := strings.TrimPrefix(cfKey, r.module+":")
|
||||
return Decision{Action: ActionMigrate, PK: r.module, SK: sk}
|
||||
}
|
||||
return Decision{Action: ActionSkip, Reason: "unknown"}
|
||||
}
|
||||
|
||||
// DurablePrefixes returns the migrate-action prefixes for use by inventory
|
||||
// and reporting. Order matches kvRules.
|
||||
func DurablePrefixes() []string {
|
||||
out := make([]string, 0, len(kvRules))
|
||||
for _, r := range kvRules {
|
||||
if r.skip == "" {
|
||||
out = append(out, r.prefix)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
package migration
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestClassify(t *testing.T) {
|
||||
cases := []struct {
|
||||
key string
|
||||
wantAct Action
|
||||
wantPK string
|
||||
wantSK string
|
||||
wantSkip string
|
||||
}{
|
||||
// Durable migrate paths — these are the 6 prefixes Phase 01 locked.
|
||||
{"wordle:stats:-1001760292100", ActionMigrate, "wordle", "stats:-1001760292100", ""},
|
||||
{"loldle:stats:1064111334", ActionMigrate, "loldle", "stats:1064111334", ""},
|
||||
{"loldle:config:-1001760292100", ActionMigrate, "loldle", "config:-1001760292100", ""},
|
||||
{"twentyq:stats:-1001760292100", ActionMigrate, "twentyq", "stats:-1001760292100", ""},
|
||||
{"lolschedule:subscribers", ActionMigrate, "lolschedule", "subscribers", ""},
|
||||
{"trading:user:1064111334", ActionMigrate, "trading", "user:1064111334", ""},
|
||||
|
||||
// Cache + ephemeral — skip.
|
||||
{"trading:sym:FPT", ActionSkip, "", "", "cache"},
|
||||
{"wordle:game:abc", ActionSkip, "", "", "ephemeral"},
|
||||
{"lolschedule:matches:2026", ActionSkip, "", "", "cache"},
|
||||
|
||||
// Retired modules — skip.
|
||||
{"doantu:stats:1064111334", ActionSkip, "", "", "retired"},
|
||||
{"semantle:stats:-1001760292100", ActionSkip, "", "", "retired"},
|
||||
{"loldle-emoji:stats:-1001760292100", ActionSkip, "", "", "retired"},
|
||||
|
||||
// Unknown prefix — skip with reason "unknown" so it surfaces in reports.
|
||||
{"newmodule:foo", ActionSkip, "", "", "unknown"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.key, func(t *testing.T) {
|
||||
got := Classify(c.key)
|
||||
if got.Action != c.wantAct {
|
||||
t.Fatalf("action=%v want %v", got.Action, c.wantAct)
|
||||
}
|
||||
if got.PK != c.wantPK {
|
||||
t.Errorf("pk=%q want %q", got.PK, c.wantPK)
|
||||
}
|
||||
if got.SK != c.wantSK {
|
||||
t.Errorf("sk=%q want %q", got.SK, c.wantSK)
|
||||
}
|
||||
if got.Reason != c.wantSkip {
|
||||
t.Errorf("reason=%q want %q", got.Reason, c.wantSkip)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDurablePrefixes(t *testing.T) {
|
||||
got := DurablePrefixes()
|
||||
want := []string{
|
||||
"wordle:stats:",
|
||||
"loldle:stats:",
|
||||
"loldle:config:",
|
||||
"twentyq:stats:",
|
||||
"lolschedule:subscribers",
|
||||
"trading:user:",
|
||||
}
|
||||
if len(got) != len(want) {
|
||||
t.Fatalf("got %v, want %v", got, want)
|
||||
}
|
||||
for i := range want {
|
||||
if got[i] != want[i] {
|
||||
t.Errorf("idx %d: got %q want %q", i, got[i], want[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrefixOfLongestMatch(t *testing.T) {
|
||||
// loldle:stats: and loldle:config: both share the loldle: super-prefix.
|
||||
// Longest match wins so report buckets stay precise.
|
||||
if got := PrefixOf("loldle:stats:1234"); got != "loldle:stats:" {
|
||||
t.Errorf("loldle:stats:1234 → %q, want loldle:stats:", got)
|
||||
}
|
||||
if got := PrefixOf("loldle:config:5"); got != "loldle:config:" {
|
||||
t.Errorf("loldle:config:5 → %q, want loldle:config:", got)
|
||||
}
|
||||
if got := PrefixOf("zzz:unknown"); got != "unknown" {
|
||||
t.Errorf("unknown bucket: got %q", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
)
|
||||
|
||||
// Report aggregates counts for one migration run. It is intentionally
|
||||
// per-prefix rather than per-key so the runbook can compare totals to the
|
||||
// Phase 01 inventory at a glance.
|
||||
type Report struct {
|
||||
// Imported is keys actually written to DynamoDB.
|
||||
Imported map[string]int
|
||||
// SkippedExisting is keys already present (idempotent rerun).
|
||||
SkippedExisting map[string]int
|
||||
// SkippedPolicy is keys rejected by the Phase 01 allowlist; keyed by
|
||||
// reason (cache, retired, unknown, ephemeral).
|
||||
SkippedPolicy map[string]int
|
||||
// Failed is keys that errored mid-import.
|
||||
Failed map[string]int
|
||||
}
|
||||
|
||||
func NewReport() *Report {
|
||||
return &Report{
|
||||
Imported: map[string]int{},
|
||||
SkippedExisting: map[string]int{},
|
||||
SkippedPolicy: map[string]int{},
|
||||
Failed: map[string]int{},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Report) AddImported(prefix string) { r.Imported[prefix]++ }
|
||||
func (r *Report) AddSkippedExisting(prefix string) { r.SkippedExisting[prefix]++ }
|
||||
func (r *Report) AddSkippedPolicy(reason string) { r.SkippedPolicy[reason]++ }
|
||||
func (r *Report) AddFailed(prefix string) { r.Failed[prefix]++ }
|
||||
|
||||
// Format writes a human-readable summary. Stable ordering (alphabetical) so
|
||||
// rerun diffs stay clean.
|
||||
func (r *Report) Format(w io.Writer) {
|
||||
fmt.Fprintln(w, "Migration report")
|
||||
fmt.Fprintln(w, "================")
|
||||
writeSection(w, "Imported", r.Imported)
|
||||
writeSection(w, "Skipped (already present)", r.SkippedExisting)
|
||||
writeSection(w, "Skipped (policy)", r.SkippedPolicy)
|
||||
writeSection(w, "Failed", r.Failed)
|
||||
fmt.Fprintf(w, "TOTAL imported=%d skipped_existing=%d skipped_policy=%d failed=%d\n",
|
||||
sum(r.Imported), sum(r.SkippedExisting), sum(r.SkippedPolicy), sum(r.Failed))
|
||||
}
|
||||
|
||||
func writeSection(w io.Writer, label string, m map[string]int) {
|
||||
fmt.Fprintf(w, "\n%s:\n", label)
|
||||
if len(m) == 0 {
|
||||
fmt.Fprintln(w, " (none)")
|
||||
return
|
||||
}
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, k := range keys {
|
||||
fmt.Fprintf(w, " %-30s %d\n", k, m[k])
|
||||
}
|
||||
}
|
||||
|
||||
func sum(m map[string]int) int {
|
||||
t := 0
|
||||
for _, v := range m {
|
||||
t += v
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
// PrefixOf returns the longest known prefix from kvRules that matches key,
|
||||
// or "unknown". Used by Report consumers to bucket counts.
|
||||
func PrefixOf(key string) string {
|
||||
best := ""
|
||||
for _, r := range kvRules {
|
||||
if len(r.prefix) > len(best) && hasPrefix(key, r.prefix) {
|
||||
best = r.prefix
|
||||
}
|
||||
}
|
||||
if best == "" {
|
||||
return "unknown"
|
||||
}
|
||||
return best
|
||||
}
|
||||
|
||||
func hasPrefix(s, p string) bool {
|
||||
if len(s) < len(p) {
|
||||
return false
|
||||
}
|
||||
return s[:len(p)] == p
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestReportFormat(t *testing.T) {
|
||||
r := NewReport()
|
||||
r.AddImported("wordle:stats:")
|
||||
r.AddImported("wordle:stats:")
|
||||
r.AddImported("trading:user:")
|
||||
r.AddSkippedExisting("loldle:stats:")
|
||||
r.AddSkippedPolicy("cache")
|
||||
r.AddSkippedPolicy("cache")
|
||||
r.AddSkippedPolicy("retired")
|
||||
r.AddFailed("twentyq:stats:")
|
||||
|
||||
var buf bytes.Buffer
|
||||
r.Format(&buf)
|
||||
got := buf.String()
|
||||
|
||||
// Spot-check structure and totals.
|
||||
for _, want := range []string{
|
||||
"Imported:",
|
||||
"wordle:stats: 2",
|
||||
"trading:user: 1",
|
||||
"Skipped (already present):",
|
||||
"loldle:stats: 1",
|
||||
"Skipped (policy):",
|
||||
"cache 2",
|
||||
"retired 1",
|
||||
"Failed:",
|
||||
"twentyq:stats: 1",
|
||||
"TOTAL imported=3 skipped_existing=1 skipped_policy=3 failed=1",
|
||||
} {
|
||||
if !strings.Contains(got, want) {
|
||||
t.Errorf("missing %q in output:\n%s", want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReportEmptySection(t *testing.T) {
|
||||
r := NewReport()
|
||||
r.AddImported("wordle:stats:")
|
||||
var buf bytes.Buffer
|
||||
r.Format(&buf)
|
||||
if !strings.Contains(buf.String(), "Failed:\n (none)") {
|
||||
t.Errorf("empty section missing (none) marker:\n%s", buf.String())
|
||||
}
|
||||
}
|
||||
@@ -64,11 +64,11 @@ func (s *DynamoDBKVStore) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("dynamodb get %s/%s: missing %q attribute", s.moduleName, key, dynamoValueAttr)
|
||||
}
|
||||
bin, ok := rawAttr.(*types.AttributeValueMemberB)
|
||||
str, ok := rawAttr.(*types.AttributeValueMemberS)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("dynamodb get %s/%s: unexpected attribute type %T", s.moduleName, key, rawAttr)
|
||||
}
|
||||
return bin.Value, nil
|
||||
return []byte(str.Value), nil
|
||||
}
|
||||
|
||||
// GetJSON decodes the value at key into dst.
|
||||
@@ -83,26 +83,20 @@ func (s *DynamoDBKVStore) GetJSON(ctx context.Context, key string, dst any) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// Put writes raw bytes at key, creating or overwriting.
|
||||
// Put writes raw bytes at key, creating or overwriting. The value attribute
|
||||
// is stored as a DynamoDB String so it is human-readable in the AWS console.
|
||||
// Every current caller writes JSON, which is UTF-8 safe; non-UTF-8 callers
|
||||
// must encode upstream (e.g. base64).
|
||||
func (s *DynamoDBKVStore) Put(ctx context.Context, key string, val []byte) error {
|
||||
if err := validateKey(key); err != nil {
|
||||
return err
|
||||
}
|
||||
// DynamoDB rejects empty Binary values. Store a single zero byte sentinel
|
||||
// transparently — Firestore allows zero-length []byte and callers may
|
||||
// rely on that. The Get path treats both as []byte; downstream JSON
|
||||
// callers will see []byte{0} where they put []byte{}, but no current
|
||||
// caller relies on storing literal empty bytes (they use PutJSON, which
|
||||
// always emits at least "null" = 4 bytes).
|
||||
if len(val) == 0 {
|
||||
val = []byte{0}
|
||||
}
|
||||
_, err := s.client.PutItem(ctx, &dynamodb.PutItemInput{
|
||||
TableName: aws.String(s.table),
|
||||
Item: map[string]types.AttributeValue{
|
||||
dynamoPKAttr: &types.AttributeValueMemberS{Value: s.moduleName},
|
||||
dynamoSKAttr: &types.AttributeValueMemberS{Value: key},
|
||||
dynamoValueAttr: &types.AttributeValueMemberB{Value: val},
|
||||
dynamoValueAttr: &types.AttributeValueMemberS{Value: string(val)},
|
||||
dynamoUpdatedAtAttr: &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().UTC().UnixNano(), 10)},
|
||||
},
|
||||
})
|
||||
@@ -181,4 +175,3 @@ func (s *DynamoDBKVStore) List(ctx context.Context, prefix string) ([]string, er
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user