commit 1752ef05a640c9309bf12349b7e28d62085b79e7 Author: claude-timemachine Date: Tue Jun 2 18:52:25 2026 +0200 initial: Steam-Cloud-style per-user state sync skeleton HTTP API + on-disk storage + auth-service token verification + dev mode. 31 tests pass, vet clean. See DESIGN.md for the architecture and README.md for the operator surface. Pending: pg-backed per-user quota override, snapshot retention / blob GC, tarball-vs-manifest content cross-check, end-to-end deploy on john. diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml new file mode 100644 index 0000000..9939159 --- /dev/null +++ b/.gitea/workflows/ci.yaml @@ -0,0 +1,52 @@ +name: CI + +on: + push: + branches: [main, master] + tags: ["v*"] + pull_request: + branches: [main, master] + +jobs: + validate: + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + - name: Build + run: go build ./... + - name: Vet + run: go vet ./... + - name: Test + run: go test ./... + + docker: + runs-on: ubuntu-latest + timeout-minutes: 15 + needs: validate + if: github.event_name == 'push' + steps: + - uses: actions/checkout@v4 + + - name: Compute tags + id: meta + run: | + if [[ "$GITEA_REF" == refs/tags/v* ]]; then + echo "tag=${GITEA_REF#refs/tags/}" >> "$GITEA_OUTPUT" + else + echo "tag=latest" >> "$GITEA_OUTPUT" + fi + echo "sha=sha-$(echo "$GITEA_SHA" | cut -c1-7)" >> "$GITEA_OUTPUT" + + - name: Login to registry + run: echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login git.timemachine.center -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin + + - name: Build and push + run: | + IMG=git.timemachine.center/timemachine/${{ gitea.event.repository.name }} + docker build -t "$IMG:${{ steps.meta.outputs.tag }}" -t "$IMG:${{ steps.meta.outputs.sha }}" . 
+ docker push "$IMG:${{ steps.meta.outputs.tag }}" + docker push "$IMG:${{ steps.meta.outputs.sha }}" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b140bf --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +cloud-svc +data/ +*.test +*.out +.idea/ +.vscode/ diff --git a/DESIGN.md b/DESIGN.md new file mode 100644 index 0000000..5ccc5fe --- /dev/null +++ b/DESIGN.md @@ -0,0 +1,266 @@ +# cloud-svc design + +Steam-Cloud-style per-user file sync for Minecraft clients. Player launches their client → state pulled. Player exits → state pushed. Across machines, conflicts resolved via per-file mtime + a pre-launch dialog when ambiguity remains. + +## Identity + +User identity = Discord ID (already issued by automc's account-card flow). Cloud token is a long-lived API key with scope `cloud:rw`, issued via `auth-service` and tied to the Discord ID. + +``` +client.py ──── Authorization: Bearer ────► cloud-svc + │ + ▼ POST /auth/verify-key + auth-service ──► returns { discord_id, scopes } +``` + +cloud-svc never sees Discord IDs directly from the client — it always asks auth-service. Token revocation is a single DB UPDATE on `api_keys.revoked`. + +## On-disk layout + +`~/automc/cloud-data/` on `john`, owned by `automc` user. Per-Discord-ID prefix: + +``` +cloud-data/ + / + manifest.json ← latest snapshot's per-file mtime + sha256 map + snapshots/ + .tar.zst ← full content tarball (zstd-compressed) + blobs/ ← content-addressed dedupe (sha256-prefixed dirs) + ab/ + abcdef0123... ← raw file content, referenced by manifest + cd/ + cdef0123... +``` + +**Why both tarball AND blob store?** +- Tarballs are the snapshot's immutable historical record (good for restore-to-version). +- Blob store is the on-line read path: per-file fetch on conflict resolution. Tarballs would force decompress-everything for one file. +- Same content in two places = waste. 
Resolved by **hard-linking** the blob from inside the tarball at write time (cloud-svc writes one canonical copy, tarball entries are hardlinks). Linux supports this in tar. + +If hardlinks turn out to be a pain across the rootless-podman boundary, fall back to blob-only and synthesize tarballs on demand for download. Defer the call. + +## Snapshot retention + +| Trigger | Action | +|---|---| +| `cloud_push` from client | New snapshot row. Tar written. Manifest updated. | +| Snapshot count > `RETAIN_LATEST` (default 30) | Oldest deleted. Hardlinked blobs that lose all refs are GC'd in a periodic job. | +| Per-user quota exceeded | Reject push with HTTP 413 + JSON `{error: "quota", used, limit}`. Client surfaces in UI. | + +Snapshot IDs are ULIDs (timestamp-sortable, unique without coordination). Tarball name = `01J<26 chars>.tar.zst`. + +## Per-file metadata + +`manifest.json` (per user, latest snapshot only): + +```json +{ + "snapshot_id": "01J9XQK4Z3...", + "created_at": "2026-06-02T18:30:00Z", + "files": { + "options.txt": { "sha256": "ab...", "size": 5234, "mtime": "2026-06-02T18:25:11Z" }, + "config/voicechat-client.json": { "sha256": "cd...", "size": 432, "mtime": "2026-06-01T22:14:03Z" }, + "journeymap/data/sp/world1/...": { "sha256": "ef...", "size": 88123,"mtime": "2026-06-02T18:29:42Z" } + } +} +``` + +Stored only for the **latest** snapshot. Older snapshots' manifests live alongside `.tar.zst`. Reading old manifest = open tar header. Manifest mtime is **the file's mtime at push time on the client** — not the upload time. This is the source of truth for conflict resolution. + +## HTTP API + +All endpoints under `/v1/`. JSON unless noted. Bearer auth required. + +| Method | Path | Purpose | +|---|---|---| +| `GET` | `/v1/manifest` | Return latest manifest for caller. 200 + JSON, or 204 if no snapshots yet. | +| `GET` | `/v1/blob/{sha256}` | Stream raw file content. 200 + bytes, or 404. 
Used during conflict resolution to fetch a specific file the client wants from the remote. | +| `POST` | `/v1/snapshot` | Upload new snapshot. Body = multipart `{manifest.json, snapshot.tar.zst}`. Server validates manifest matches tar contents (hash check), assigns snapshot_id, stores. Returns `{snapshot_id, snapshot_url}`. | +| `GET` | `/v1/snapshots` | List caller's snapshot IDs + timestamps (newest first). Used for restore UI / debugging. | +| `GET` | `/v1/snapshot/{id}` | Download a specific historical tarball. | +| `DELETE` | `/v1/snapshot/{id}` | Delete a specific snapshot (e.g., compromised data). Cannot delete the latest. | +| `GET` | `/v1/quota` | `{used_bytes, limit_bytes, snapshots, snapshot_limit}` | + +All authentication errors return `401 {error: "auth"}`. Quota errors return `413 {error: "quota", ...}`. Schema validation `400`. Unknown user (token revoked / Discord ID stripped) `403`. + +## Pull semantics (client side) + +``` +1. GET /v1/manifest → remote manifest +2. Walk local include-paths, compute (path, mtime, sha256) for each file +3. For each path in (remote ∪ local): + remote_only → DOWNLOAD via GET /v1/blob/{sha256}, write file, set mtime to remote.mtime + local_only → no-op (will push on exit) + both, sha matches → no-op + both, sha differs: + remote.mtime > local.mtime → AUTO_REMOTE: download, overwrite, set mtime + local.mtime > remote.mtime → AUTO_LOCAL: keep local (will push on exit) + |diff| ≤ 2s OR same mtime → CONFLICT: surface in dialog +``` + +The 2s threshold absorbs FS-level mtime rounding. + +## Push semantics (client side) + +``` +1. Walk local include-paths, build per-file (path, mtime, sha256) +2. GET /v1/manifest, build delta: + in local, not in remote → new + in local, sha differs from remote → changed + in remote, not in local → DELETED (manifest entry, no blob) +3. Build manifest.json for new snapshot: + {snapshot_id, created_at, files: {}} +4. Build tarball: only new + changed files (deleted entries omitted) +5. 
POST /v1/snapshot with manifest + tarball +6. On 200, save snapshot_id to local state file (used for next pull's known-base) +7. On 413 (quota), surface to user; offer pruning or scope reduction +``` + +## Conflict UI + +Pre-launch (after packwiz, before MC starts). When `pull` finds files in CONFLICT state, render a dialog. Cross-platform via stdlib `tkinter`: + +``` +┌──────────────────────────────────────────────────────────┐ +│ Cloud sync — manual resolve needed │ +├──────────────────────────────────────────────────────────┤ +│ Some files differ between this machine and your cloud: │ +│ │ +│ File Local Remote │ +│ options.txt 18:25 +0200 18:24 +0200 │ +│ ( ) keep local ( ) use remote │ +│ │ +│ config/voicechat-client.json 22:14 +0200 22:13 +0200 │ +│ ( ) keep local ( ) use remote │ +│ │ +│ [Use local for all] [Use remote for all] [Cancel launch] │ +│ │ +│ [Continue launch] │ +└──────────────────────────────────────────────────────────┘ +``` + +Defaults per-row to "use remote" (matches Steam's default — pull is destructive but consistent). User can override per file. + +**Cancel launch** = abort `client.py`, return non-zero. Player can fix manually then re-run. + +## What syncs (configurable per distribution) + +`cloud-scope.json` next to `client.py`: + +```json +{ + "include": [ + "options.txt", + "optionsof.txt", + "optionsshaders.txt", + "config/", + "journeymap/data/", + "screenshots/" + ], + "exclude": [ + "config/simple-mod-sync*", + "config/packwiz*", + "**/.tmp", + "**/cache/" + ], + "max_size_mb_per_file": 50, + "max_total_mb": 200 +} +``` + +Defaults are baked into client.py if the file is absent. JourneyMap (`journeymap/data/`) tracks per-server worlds, waypoints, settings — explicitly included. 
+ +## Auth-service integration + +cloud-svc → auth-service contract (already exists in `auth-service/server.go`): + +``` +POST http://auth-service:9090/auth/verify-key + X-API-Key: <service-key> # cloud-svc's own service key (CLOUD_SERVICE_KEY) + Body: { "key": "<bearer token presented by the client>" } + +200 { "user_id": "<discord_id>", "scopes": ["cloud:rw"] } +401 { "error": "invalid" } +403 { "error": "revoked" } +``` + +cloud-svc caches verified tokens in-memory for 60s to avoid hammering auth-service. Cache invalidated on 401 from client (forced refresh). + +## Encryption at rest + +**Optional, deferred.** Today: blobs are raw on `john`'s disk, owned by `automc` user, mode 0600. Filesystem permissions are the only barrier. Acceptable for pre-prod. + +For production: per-user symmetric key (derived from Discord ID + master secret) encrypts blobs with AES-GCM. Manifest stored with the encrypted blob mappings; client provides key per request. Adds significant complexity — defer until production scale. + +## Quadlet template + +```ini +[Unit] +Description=automc cloud-svc (player state sync) +After=automc-pg.service auth-service.service + +[Container] +ContainerName=cloud-svc +Image=git.timemachine.center/timemachine/cloud-svc:latest +Network=automc-net +NetworkAlias=cloud-svc +Environment=TZ={{ tz }} +EnvironmentFile=%h/automc/secrets/cloud-svc.env +PublishPort=127.0.0.1:9091:9091 +Volume=%h/automc/cloud-data:/data:Z + +[Service] +Restart=always + +[Install] +WantedBy=default.target +``` + +Bound to `127.0.0.1:9091` — players reach it only via SSH tunnel during dev. In real deployment, exposed via a reverse-proxy on the same hostname they use for the packwiz pack URL (e.g., `packs.timemachine.center/cloud/...`). + +## Out of scope (v1) + +- **Selective restore from old snapshot** — UI for "go back to last Tuesday's state". The API supports it (`GET /v1/snapshot/{id}` + manual extraction); the UI is deferred. +- **Multi-device live conflict** (player on PC + laptop simultaneously) — single-machine assumption, race documented.
+- **Compression tuning** — zstd level 3 default. May tune up to 6 if disk pressure observed. +- **Anti-replay on tokens** — straight bearer auth. If a token leaks, revoke it. Not a primary attack surface. +- **Cross-server modpack-aware filtering** — cloud-scope.json is per-distribution, not per-server. Different servers might want different scopes; defer. + +## Stack + +- **Go 1.24+** (matches other automc services) +- `jackc/pgx/v5` for pg (if metadata stored there; alternative: sqlite per-deploy) +- `klauspost/compress/zstd` for tarball compression +- Standard `archive/tar` for tarball assembly +- No external HTTP framework — `net/http` + `gorilla/mux` style routing (or stdlib `http.ServeMux` Go 1.22+ pattern syntax) + +## File / module layout + +``` +cloud-svc/ + cmd/cloud-svc/main.go + internal/ + api/ ← HTTP handlers per endpoint + storage/ ← on-disk blob + tarball R/W, GC + manifest/ ← manifest.json types + (de)serialize + validate + auth/ ← auth-service client (token verify + cache) + quota/ ← per-user quota tracking + database/migrations/ ← if pg-backed metadata + Dockerfile + Makefile + .gitea/workflows/ci.yaml + docs/ARCHITECTURE.md +``` + +~1500 LOC Go total estimate. + +## Open questions + +1. **Metadata in automc-pg or sqlite-per-instance?** + - pg: shared with other services, easier ops, schema migrations same pipeline. + - sqlite: zero coupling, faster local I/O, harder to query externally. + - Recommendation: **pg** for consistency with the rest of automc. +2. **Quota source**: hardcoded per-user, or per-user-row in DB? + - DB row allows admin to bump limits per player. Use `users.cloud_quota_bytes` column (nullable, default to global). +3. **Reverse proxy in front of cloud-svc**: needed for player-facing URL (`packs.timemachine.center/cloud/...`)? + - Either nginx fronting cloud-svc, or expose cloud-svc directly with its own TLS via Caddy/something. Defer. 
diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0673324 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM golang:1.26-alpine AS builder +WORKDIR /build +COPY go.mod go.sum* ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/cloud-svc . + +FROM alpine:3.20 +RUN apk add --no-cache ca-certificates tzdata && adduser -D -u 1000 cloud +WORKDIR /app +COPY --from=builder /out/cloud-svc /app/cloud-svc +USER cloud +EXPOSE 9091 +ENTRYPOINT ["/app/cloud-svc"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9e4ac2e --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +.PHONY: build test vet run dev clean + +build: + go build -o cloud-svc . + +test: + go test ./... + +vet: + go vet ./... + +# Run in dev mode (accepts any bearer token) against ./data +dev: build + CLOUD_DEV_MODE=1 CLOUD_STORAGE_ROOT=./data CLOUD_LISTEN=127.0.0.1:9091 ./cloud-svc + +clean: + rm -f cloud-svc + rm -rf ./data diff --git a/README.md b/README.md new file mode 100644 index 0000000..23e7216 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ +# cloud-svc + +Steam-Cloud-style per-user file sync for Minecraft clients. Pull on launch, push on exit, with per-file mtime conflict resolution + N-snapshot history. + +Part of the automc platform. See [`DESIGN.md`](DESIGN.md) for architecture. + +## Status + +**Skeleton.** Code, tests, build all working. Not yet deployed. + +- ✅ HTTP API (7 endpoints) +- ✅ On-disk blob + tarball + manifest storage +- ✅ Token verification via auth-service (with 60s cache) +- ✅ Dev mode (accept any bearer) +- ✅ Quota enforcement +- ⏳ Pg-backed per-user quota override (currently DefaultQuota only) +- ⏳ Snapshot retention / GC of unreferenced blobs +- ⏳ Tarball-vs-manifest content cross-check on push + +## Build + test + +```fish +make build +make test +make vet +``` + +31 tests, 0 fails. + +## Run locally (dev mode) + +```fish +make dev +``` + +Listens on `127.0.0.1:9091`.
Accepts ANY non-empty bearer token; the token becomes the user ID. Files land under `./data/`. + +## Run against real auth-service + +```fish +CLOUD_LISTEN=127.0.0.1:9091 \ +CLOUD_STORAGE_ROOT=/var/lib/cloud-svc/data \ +CLOUD_AUTH_SERVICE_URL=http://auth-service:9090 \ +CLOUD_SERVICE_KEY=$(cat /etc/cloud-svc/service-key) \ +CLOUD_DEFAULT_QUOTA_MB=200 \ +./cloud-svc +``` + +## API quick reference + +| Method | Path | Purpose | +|---|---|---| +| `GET` | `/v1/manifest` | Latest manifest for caller | +| `GET` | `/v1/blob/{sha256}` | Raw file content | +| `POST` | `/v1/snapshot` | Multipart upload: `manifest` + `tarball` | +| `GET` | `/v1/snapshots` | List caller's snapshot history | +| `GET` | `/v1/snapshot/{id}` | Historical tarball | +| `DELETE` | `/v1/snapshot/{id}` | Remove (latest protected) | +| `GET` | `/v1/quota` | `{used_bytes, limit_bytes, snapshots}` | +| `GET` | `/healthz` | Unauthenticated liveness probe | + +All authenticated endpoints require `Authorization: Bearer <token>` with `cloud:rw` scope. + +See [`DESIGN.md`](DESIGN.md) for full details + on-disk layout + conflict semantics.
// AuthInfo is what cloud-svc keeps about a verified bearer token.
//
// User is the user ID reported by auth-service (the Discord ID); Scopes is the
// key's scope list, used to confirm the key carries "cloud:rw" rather than a
// narrower scope issued for something else. expires is the cache deadline set
// by HTTPVerifier; it stays zero for values that were never cached.
type AuthInfo struct {
	User    string
	Scopes  []string
	expires time.Time
}

// HasScope reports whether the token carries scope s. A nil receiver carries
// no scopes, so callers need not nil-check before asking.
func (a *AuthInfo) HasScope(s string) bool {
	if a == nil {
		return false
	}
	for _, x := range a.Scopes {
		if x == s {
			return true
		}
	}
	return false
}

// Verifier is the contract the HTTP handlers depend on. Implementations:
//
//   - HTTPVerifier: real auth-service caller
//   - DevVerifier: trusts any non-empty bearer, returns it as the user ID (tests + local dev)
type Verifier interface {
	Verify(ctx context.Context, token string) (*AuthInfo, error)
}

// HTTPVerifier calls auth-service's /auth/verify-key endpoint and caches
// verified tokens in memory for ttl to limit upstream pressure.
type HTTPVerifier struct {
	endpoint   string
	serviceKey string // cloud-svc's own service token (X-API-Key when calling auth-service)
	client     *http.Client
	ttl        time.Duration

	mu        sync.RWMutex
	cache     map[string]*AuthInfo
	lastSweep time.Time // last time expired entries were purged from cache
}

// NewHTTPVerifier builds a verifier for the auth-service rooted at endpoint
// (trailing slashes are trimmed). serviceKey is sent as X-API-Key when
// non-empty. A ttl <= 0 disables caching.
func NewHTTPVerifier(endpoint, serviceKey string, ttl time.Duration) *HTTPVerifier {
	return &HTTPVerifier{
		endpoint:   strings.TrimRight(endpoint, "/"),
		serviceKey: serviceKey,
		client:     &http.Client{Timeout: 5 * time.Second},
		ttl:        ttl,
		cache:      map[string]*AuthInfo{},
	}
}

// Verify resolves token to an AuthInfo, consulting the in-memory cache first.
//
// It returns ErrUnauthenticated for an empty token or an upstream 401,
// ErrRevoked for an upstream 403, and a wrapped error for transport failures,
// unexpected status codes, undecodable bodies or an empty user_id. Only
// successful verifications are cached.
func (v *HTTPVerifier) Verify(ctx context.Context, token string) (*AuthInfo, error) {
	if token == "" {
		return nil, ErrUnauthenticated
	}
	if info := v.lookup(token); info != nil {
		return info, nil
	}

	body, err := json.Marshal(map[string]string{"key": token})
	if err != nil {
		return nil, fmt.Errorf("encode verify request: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		v.endpoint+"/auth/verify-key", strings.NewReader(string(body)))
	if err != nil {
		return nil, fmt.Errorf("build verify request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if v.serviceKey != "" {
		req.Header.Set("X-API-Key", v.serviceKey)
	}
	resp, err := v.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("auth-service unreachable: %w", err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
	case http.StatusUnauthorized:
		return nil, ErrUnauthenticated
	case http.StatusForbidden:
		return nil, ErrRevoked
	default:
		return nil, fmt.Errorf("auth-service returned %d", resp.StatusCode)
	}

	var out struct {
		UserID string   `json:"user_id"`
		Scopes []string `json:"scopes"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, fmt.Errorf("decode auth response: %w", err)
	}
	if out.UserID == "" {
		return nil, errors.New("auth-service returned empty user_id")
	}

	info := &AuthInfo{
		User:    out.UserID,
		Scopes:  out.Scopes,
		expires: time.Now().Add(v.ttl),
	}
	v.remember(token, info)
	return info, nil
}

// lookup returns the cached entry for token if it is still fresh, else nil.
// An expired entry is evicted on the way out so it does not linger until the
// next sweep.
func (v *HTTPVerifier) lookup(token string) *AuthInfo {
	v.mu.RLock()
	info, ok := v.cache[token]
	v.mu.RUnlock()
	if !ok {
		return nil
	}
	if time.Now().Before(info.expires) {
		return info
	}
	v.mu.Lock()
	// Re-check identity: another goroutine may have refreshed the entry
	// between the read unlock and this write lock.
	if cur, ok := v.cache[token]; ok && cur == info {
		delete(v.cache, token)
	}
	v.mu.Unlock()
	return nil
}

// remember stores info under token. At most once per ttl it also purges every
// expired entry, so tokens that are never presented again cannot accumulate
// forever. With ttl <= 0 nothing is stored: the entry would already be
// expired.
func (v *HTTPVerifier) remember(token string, info *AuthInfo) {
	if v.ttl <= 0 {
		return
	}
	now := time.Now()
	v.mu.Lock()
	defer v.mu.Unlock()
	if now.Sub(v.lastSweep) >= v.ttl {
		for k, e := range v.cache {
			if !now.Before(e.expires) {
				delete(v.cache, k)
			}
		}
		v.lastSweep = now
	}
	v.cache[token] = info
}

// InvalidateToken drops a token from the cache (e.g. on 401 from a client
// re-trying with a token the cache says is still good).
func (v *HTTPVerifier) InvalidateToken(token string) {
	v.mu.Lock()
	delete(v.cache, token)
	v.mu.Unlock()
}

// DevVerifier accepts any non-empty token and returns it as the user ID with
// cloud:rw scope. Use for local dev / tests only; production wires
// HTTPVerifier.
type DevVerifier struct{}

// Verify implements Verifier. It fails only for an empty token.
func (DevVerifier) Verify(_ context.Context, token string) (*AuthInfo, error) {
	if token == "" {
		return nil, ErrUnauthenticated
	}
	return &AuthInfo{
		User:   token,
		Scopes: []string{"cloud:rw"},
	}, nil
}

// Sentinel errors returned by Verifier implementations.
var (
	ErrUnauthenticated = errors.New("unauthenticated")
	ErrRevoked         = errors.New("token revoked")
)

// extractBearer pulls the token out of an `Authorization: Bearer <token>`
// header. The scheme name is matched case-insensitively, as HTTP auth schemes
// are case-insensitive. Returns "" if the header is absent, uses another
// scheme, or carries no token.
func extractBearer(r *http.Request) string {
	const prefix = "Bearer "
	h := r.Header.Get("Authorization")
	if len(h) < len(prefix) || !strings.EqualFold(h[:len(prefix)], prefix) {
		return ""
	}
	return strings.TrimSpace(h[len(prefix):])
}

// Config wires env-driven settings. Mirrors other automc services' pattern:
// env vars all-caps with CLOUD_ prefix. No YAML/JSON config file — env only.
type Config struct {
	Listen         string        // e.g. "127.0.0.1:9091"
	StorageRoot    string        // e.g. "/data" inside container, mapped to ~/automc/cloud-data
	AuthServiceURL string        // e.g. "http://auth-service:9090"
	ServiceKey     string        // cloud-svc's own X-API-Key when calling auth-service
	AuthCacheTTL   time.Duration // verified-token cache TTL
	DefaultQuotaMB int64         // per-user default quota
	DevMode        bool          // accept-any-bearer + skip auth-service
}

// maxQuotaMB is the largest quota, in MiB, whose byte count still fits in an
// int64. The quota is multiplied by 1024*1024 at startup.
const maxQuotaMB int64 = (1<<63 - 1) >> 20

// LoadConfig reads the CLOUD_* environment variables, applies defaults and
// validates them. It returns an error when a value does not parse, when the
// cache TTL is negative, when the quota is not in 1..maxQuotaMB, or when
// CLOUD_SERVICE_KEY is missing outside dev mode.
func LoadConfig() (*Config, error) {
	c := &Config{
		Listen:         envOr("CLOUD_LISTEN", "127.0.0.1:9091"),
		StorageRoot:    envOr("CLOUD_STORAGE_ROOT", "/data"),
		AuthServiceURL: envOr("CLOUD_AUTH_SERVICE_URL", "http://auth-service:9090"),
		ServiceKey:     os.Getenv("CLOUD_SERVICE_KEY"),
		DevMode:        os.Getenv("CLOUD_DEV_MODE") == "1",
	}

	ttl, err := time.ParseDuration(envOr("CLOUD_AUTH_CACHE_TTL", "60s"))
	if err != nil {
		return nil, fmt.Errorf("CLOUD_AUTH_CACHE_TTL: %w", err)
	}
	if ttl < 0 {
		return nil, fmt.Errorf("CLOUD_AUTH_CACHE_TTL must be >= 0, got %s", ttl)
	}
	c.AuthCacheTTL = ttl

	q, err := strconv.ParseInt(envOr("CLOUD_DEFAULT_QUOTA_MB", "200"), 10, 64)
	if err != nil {
		return nil, fmt.Errorf("CLOUD_DEFAULT_QUOTA_MB: %w", err)
	}
	if q <= 0 {
		return nil, fmt.Errorf("CLOUD_DEFAULT_QUOTA_MB must be > 0, got %d", q)
	}
	if q > maxQuotaMB {
		// Anything larger overflows int64 once converted to bytes.
		return nil, fmt.Errorf("CLOUD_DEFAULT_QUOTA_MB must be <= %d, got %d", maxQuotaMB, q)
	}
	c.DefaultQuotaMB = q

	if !c.DevMode && c.ServiceKey == "" {
		return nil, fmt.Errorf("CLOUD_SERVICE_KEY required when CLOUD_DEV_MODE != 1")
	}
	return c, nil
}

// envOr returns the value of environment variable k, or def when k is unset
// or empty.
func envOr(k, def string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return def
}
// FileEntry is one row in the per-user manifest — the truth for what a file
// looked like at snapshot time. Mtime is the file's last-modified at the
// client's push moment, NOT the upload time on the server. Used as the
// authoritative timestamp during conflict resolution.
type FileEntry struct {
	SHA256 string    `json:"sha256"`
	Size   int64     `json:"size"`
	Mtime  time.Time `json:"mtime"`
}

// Manifest is one snapshot's per-file map, keyed by forward-slash relative
// path. It is persisted next to each tarball, plus a copy at
// user/manifest.json for the latest snapshot.
type Manifest struct {
	SnapshotID string               `json:"snapshot_id"`
	CreatedAt  time.Time            `json:"created_at"`
	Files      map[string]FileEntry `json:"files"`
}

// Validate checks structural invariants and returns the first violation
// found (map iteration order decides which one when several files are bad):
//
//   - snapshot_id is non-empty and free of path separators / NUL
//   - created_at is set
//   - every file path passes validatePath
//   - every sha256 is exactly 64 hex characters
//   - sizes are non-negative and mtimes are set
func (m *Manifest) Validate() error {
	if m.SnapshotID == "" {
		return errors.New("snapshot_id is empty")
	}
	// The manifest comes from the client. NOTE(review): the ID appears to be
	// used for on-disk artifact names — confirm against storage. Either way,
	// a well-formed ID never needs a separator, so refuse them here.
	if strings.ContainsAny(m.SnapshotID, "/\\\x00") || m.SnapshotID == "." || m.SnapshotID == ".." {
		return fmt.Errorf("snapshot_id %q must not contain path separators", m.SnapshotID)
	}
	if m.CreatedAt.IsZero() {
		return errors.New("created_at is zero")
	}
	for p, e := range m.Files {
		if err := validatePath(p); err != nil {
			return fmt.Errorf("file path %q: %w", p, err)
		}
		if len(e.SHA256) != 64 {
			return fmt.Errorf("file %q: sha256 must be 64 hex chars, got %d", p, len(e.SHA256))
		}
		if _, err := hex.DecodeString(e.SHA256); err != nil {
			return fmt.Errorf("file %q: sha256 not hex: %w", p, err)
		}
		if e.Size < 0 {
			return fmt.Errorf("file %q: negative size %d", p, e.Size)
		}
		if e.Mtime.IsZero() {
			return fmt.Errorf("file %q: mtime is zero", p)
		}
	}
	return nil
}

// validatePath rejects empty paths, absolute paths, backslashes, non-canonical
// forms, and any path component of ".." or "." — defense against tar entries
// trying to escape the user's scope. Forward-slash POSIX paths only.
func validatePath(p string) error {
	if p == "" {
		return errors.New("empty")
	}
	if strings.HasPrefix(p, "/") {
		return errors.New("must be relative")
	}
	if strings.ContainsRune(p, '\\') {
		return errors.New("backslash not allowed; use forward slashes")
	}
	cleaned := path.Clean(p)
	if cleaned != p {
		return fmt.Errorf("not in canonical form (got %q, clean %q)", p, cleaned)
	}
	// path.Clean keeps a lone "." and any leading "..", so the components
	// still have to be inspected individually.
	for _, part := range strings.Split(p, "/") {
		if part == "" || part == "." || part == ".." {
			return fmt.Errorf("path component %q not allowed", part)
		}
	}
	return nil
}

// ReadManifest decodes exactly one manifest from r and validates it.
// Unknown JSON fields, any data following the manifest object (other than
// whitespace), and manifests failing Validate are all rejected.
func ReadManifest(r io.Reader) (*Manifest, error) {
	dec := json.NewDecoder(r)
	dec.DisallowUnknownFields()

	var m Manifest
	if err := dec.Decode(&m); err != nil {
		return nil, fmt.Errorf("decode manifest: %w", err)
	}
	// Decode stops after the first JSON value; a second Decode must hit EOF,
	// otherwise the body carried something the caller would silently lose.
	var extra json.RawMessage
	if err := dec.Decode(&extra); err != io.EOF {
		return nil, errors.New("decode manifest: unexpected data after JSON object")
	}
	if err := m.Validate(); err != nil {
		return nil, fmt.Errorf("invalid manifest: %w", err)
	}
	return &m, nil
}

// HashBytes is a small helper for tests; returns the lowercase hex sha256 of b.
func HashBytes(b []byte) string {
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}
map[string]FileEntry{ + "too short": {SHA256: "abcd", Size: 1, Mtime: time.Now()}, + "non-hex": {SHA256: strings.Repeat("zz", 32), Size: 1, Mtime: time.Now()}, + "empty": {SHA256: "", Size: 1, Mtime: time.Now()}, + "negative size": {SHA256: strings.Repeat("ab", 32), Size: -1, Mtime: time.Now()}, + "zero mtime": {SHA256: strings.Repeat("ab", 32), Size: 1}, + } + for name, fe := range cases { + t.Run(name, func(t *testing.T) { + m := Manifest{ + SnapshotID: "x", + CreatedAt: time.Now().UTC(), + Files: map[string]FileEntry{"options.txt": fe}, + } + if err := m.Validate(); err == nil { + t.Errorf("%s: expected error, got nil", name) + } + }) + } +} + +func TestManifest_Validate_EmptySnapshotID(t *testing.T) { + m := Manifest{CreatedAt: time.Now().UTC()} + if err := m.Validate(); err == nil { + t.Errorf("empty snapshot_id: expected error") + } +} + +func TestReadManifest_RejectsUnknownFields(t *testing.T) { + body := `{"snapshot_id":"x","created_at":"2026-01-01T00:00:00Z","files":{},"surprise":true}` + _, err := ReadManifest(bytes.NewBufferString(body)) + if err == nil { + t.Fatal("expected error for unknown field, got nil") + } +} + +func TestReadManifest_RejectsInvalidManifest(t *testing.T) { + // Well-formed JSON but bad sha + body := `{"snapshot_id":"x","created_at":"2026-01-01T00:00:00Z","files":{"options.txt":{"sha256":"short","size":1,"mtime":"2026-01-01T00:00:00Z"}}}` + _, err := ReadManifest(bytes.NewBufferString(body)) + if err == nil { + t.Fatal("expected validation error") + } +} + +func TestHashBytes(t *testing.T) { + got := HashBytes([]byte("hello")) + want := "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + if got != want { + t.Errorf("got %s, want %s", got, want) + } +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..b63e72e --- /dev/null +++ b/server.go @@ -0,0 +1,265 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "strconv" + "time" +) + 
+// QuotaProvider returns a per-user byte quota. Stub returns a default for +// every user; a future pg-backed impl reads users.cloud_quota_bytes. +type QuotaProvider interface { + Quota(ctx context.Context, user string) (int64, error) +} + +type DefaultQuota int64 + +func (q DefaultQuota) Quota(_ context.Context, _ string) (int64, error) { + return int64(q), nil +} + +// Server holds wiring. Routes registered in NewServer. +type Server struct { + storage *Storage + verifier Verifier + quota QuotaProvider + mux *http.ServeMux +} + +func NewServer(storage *Storage, verifier Verifier, quota QuotaProvider) *Server { + s := &Server{storage: storage, verifier: verifier, quota: quota, mux: http.NewServeMux()} + s.routes() + return s +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.mux.ServeHTTP(w, r) +} + +func (s *Server) routes() { + s.mux.HandleFunc("GET /v1/manifest", s.requireAuth(s.handleGetManifest)) + s.mux.HandleFunc("GET /v1/blob/{sha}", s.requireAuth(s.handleGetBlob)) + s.mux.HandleFunc("POST /v1/snapshot", s.requireAuth(s.handlePostSnapshot)) + s.mux.HandleFunc("GET /v1/snapshots", s.requireAuth(s.handleListSnapshots)) + s.mux.HandleFunc("GET /v1/snapshot/{id}", s.requireAuth(s.handleGetSnapshot)) + s.mux.HandleFunc("DELETE /v1/snapshot/{id}", s.requireAuth(s.handleDeleteSnapshot)) + s.mux.HandleFunc("GET /v1/quota", s.requireAuth(s.handleGetQuota)) + s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) +} + +// authedHandler is a handler that gets the verified caller info. 
+type authedHandler func(w http.ResponseWriter, r *http.Request, who *AuthInfo) + +func (s *Server) requireAuth(h authedHandler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + token := extractBearer(r) + if token == "" { + writeJSONError(w, http.StatusUnauthorized, "missing bearer token") + return + } + info, err := s.verifier.Verify(r.Context(), token) + if err != nil { + switch { + case errors.Is(err, ErrUnauthenticated): + writeJSONError(w, http.StatusUnauthorized, "invalid token") + case errors.Is(err, ErrRevoked): + writeJSONError(w, http.StatusForbidden, "token revoked") + default: + slog.Warn("auth verify failed", "err", err) + writeJSONError(w, http.StatusBadGateway, "auth backend unavailable") + } + return + } + if !info.HasScope("cloud:rw") { + writeJSONError(w, http.StatusForbidden, "token lacks cloud:rw scope") + return + } + h(w, r, info) + } +} + +// GET /v1/manifest +func (s *Server) handleGetManifest(w http.ResponseWriter, _ *http.Request, who *AuthInfo) { + m, err := s.storage.ReadManifest(who.User) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + w.WriteHeader(http.StatusNoContent) + return + } + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read manifest: %v", err)) + return + } + writeJSON(w, http.StatusOK, m) +} + +// GET /v1/blob/{sha} +func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request, who *AuthInfo) { + sha := r.PathValue("sha") + rc, err := s.storage.ReadBlob(who.User, sha) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + writeJSONError(w, http.StatusNotFound, "blob not found") + return + } + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read blob: %v", err)) + return + } + defer rc.Close() + w.Header().Set("Content-Type", "application/octet-stream") + _, _ = io.Copy(w, rc) +} + +// POST /v1/snapshot +// Body: multipart/form-data with parts: +// manifest — application/json (the Manifest) +// tarball — application/octet-stream (the .tar.zst 
payload) +// The handler does NOT enforce that the tarball matches the manifest's +// content list — that's a v2 concern. For v1 the manifest is what governs. +func (s *Server) handlePostSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) { + // 50 MB in-memory cap; larger streams to temp files. Tunable. + if err := r.ParseMultipartForm(50 << 20); err != nil { + writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("parse multipart: %v", err)) + return + } + manifestFile, _, err := r.FormFile("manifest") + if err != nil { + writeJSONError(w, http.StatusBadRequest, "missing manifest part") + return + } + defer manifestFile.Close() + m, err := ReadManifest(manifestFile) + if err != nil { + writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("invalid manifest: %v", err)) + return + } + + tarFile, _, err := r.FormFile("tarball") + if err != nil { + writeJSONError(w, http.StatusBadRequest, "missing tarball part") + return + } + defer tarFile.Close() + tarBytes, err := io.ReadAll(tarFile) + if err != nil { + writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("read tarball: %v", err)) + return + } + + // Quota check: estimate new usage = current + tarball + small overhead. 
+ used, err := s.storage.UsageBytes(who.User) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err)) + return + } + limit, err := s.quota.Quota(r.Context(), who.User) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err)) + return + } + if used+int64(len(tarBytes)) > limit { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusRequestEntityTooLarge) + _ = json.NewEncoder(w).Encode(map[string]any{ + "error": "quota", + "used": used, + "limit": limit, + }) + return + } + + if err := s.storage.StoreSnapshot(who.User, m, tarBytes); err != nil { + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("store: %v", err)) + return + } + writeJSON(w, http.StatusOK, map[string]string{ + "snapshot_id": m.SnapshotID, + "snapshot_url": "/v1/snapshot/" + m.SnapshotID, + }) +} + +// GET /v1/snapshots +func (s *Server) handleListSnapshots(w http.ResponseWriter, _ *http.Request, who *AuthInfo) { + list, err := s.storage.ListSnapshots(who.User) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("list: %v", err)) + return + } + writeJSON(w, http.StatusOK, map[string]any{"snapshots": list}) +} + +// GET /v1/snapshot/{id} +func (s *Server) handleGetSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) { + id := r.PathValue("id") + rc, err := s.storage.OpenSnapshot(who.User, id) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + writeJSONError(w, http.StatusNotFound, "snapshot not found") + return + } + writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("open snapshot: %v", err)) + return + } + defer rc.Close() + w.Header().Set("Content-Type", "application/zstd") + w.Header().Set("Content-Disposition", + fmt.Sprintf(`attachment; filename="%s.tar.zst"`, id)) + _, _ = io.Copy(w, rc) +} + +// DELETE /v1/snapshot/{id} +func (s *Server) handleDeleteSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) { + id 
:= r.PathValue("id") + if err := s.storage.DeleteSnapshot(who.User, id); err != nil { + writeJSONError(w, http.StatusBadRequest, err.Error()) + return + } + w.WriteHeader(http.StatusNoContent) +} + +// GET /v1/quota +func (s *Server) handleGetQuota(w http.ResponseWriter, r *http.Request, who *AuthInfo) { + used, err := s.storage.UsageBytes(who.User) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err)) + return + } + limit, err := s.quota.Quota(r.Context(), who.User) + if err != nil { + writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err)) + return + } + list, _ := s.storage.ListSnapshots(who.User) + writeJSON(w, http.StatusOK, map[string]any{ + "used_bytes": used, + "limit_bytes": limit, + "snapshots": len(list), + }) +} + +// writeJSON serializes v as JSON with the right Content-Type. Errors logged, +// not returned — by the time we're encoding the response, we've committed. +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(v); err != nil { + slog.Warn("encode response", "err", err) + } +} + +func writeJSONError(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +// secondsSinceEpoch is exported for handlers/tests that need stable timestamp +// strings without round-tripping through time.Time.Format. +func secondsSinceEpoch() string { return strconv.FormatInt(time.Now().Unix(), 10) } diff --git a/server_test.go b/server_test.go new file mode 100644 index 0000000..a035cf0 --- /dev/null +++ b/server_test.go @@ -0,0 +1,304 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "mime/multipart" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +// scopedVerifier accepts a fixed token and reports a fixed user + scope set. 
+type scopedVerifier struct { + token string + user string + scopes []string + err error +} + +func (v *scopedVerifier) Verify(_ context.Context, token string) (*AuthInfo, error) { + if v.err != nil { + return nil, v.err + } + if token != v.token { + return nil, ErrUnauthenticated + } + return &AuthInfo{User: v.user, Scopes: v.scopes}, nil +} + +func newTestServer(t *testing.T) (*Server, *Storage, *scopedVerifier) { + t.Helper() + st := newStorage(t) + v := &scopedVerifier{ + token: "good-token", + user: "user1", + scopes: []string{"cloud:rw"}, + } + return NewServer(st, v, DefaultQuota(1<<20 /* 1 MB */)), st, v +} + +func authReq(t *testing.T, method, path string, body io.Reader) *http.Request { + t.Helper() + r := httptest.NewRequest(method, path, body) + r.Header.Set("Authorization", "Bearer good-token") + return r +} + +func TestServer_Auth_MissingBearer_401(t *testing.T) { + s, _, _ := newTestServer(t) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/v1/manifest", nil)) + if rec.Code != http.StatusUnauthorized { + t.Errorf("got %d, want 401", rec.Code) + } +} + +func TestServer_Auth_BadToken_401(t *testing.T) { + s, _, _ := newTestServer(t) + rec := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/v1/manifest", nil) + r.Header.Set("Authorization", "Bearer wrong-token") + s.ServeHTTP(rec, r) + if rec.Code != http.StatusUnauthorized { + t.Errorf("got %d, want 401", rec.Code) + } +} + +func TestServer_Auth_Revoked_403(t *testing.T) { + s, _, v := newTestServer(t) + v.err = ErrRevoked + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/manifest", nil)) + if rec.Code != http.StatusForbidden { + t.Errorf("got %d, want 403", rec.Code) + } +} + +func TestServer_Auth_MissingScope_403(t *testing.T) { + s, _, v := newTestServer(t) + v.scopes = []string{"some-other-scope"} + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/manifest", nil)) + if 
rec.Code != http.StatusForbidden { + t.Errorf("got %d, want 403", rec.Code) + } +} + +func TestServer_Auth_BackendError_502(t *testing.T) { + s, _, v := newTestServer(t) + v.err = errors.New("upstream down") + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/manifest", nil)) + if rec.Code != http.StatusBadGateway { + t.Errorf("got %d, want 502", rec.Code) + } +} + +func TestServer_GetManifest_NoSnapshots_204(t *testing.T) { + s, _, _ := newTestServer(t) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/manifest", nil)) + if rec.Code != http.StatusNoContent { + t.Errorf("got %d, want 204", rec.Code) + } +} + +func TestServer_PostSnapshot_RoundTrip(t *testing.T) { + s, st, _ := newTestServer(t) + + // Write a blob first (simulates the client side). + blob := []byte("file contents") + sha, err := st.WriteBlob("user1", blob) + if err != nil { + t.Fatalf("WriteBlob: %v", err) + } + + manifest := &Manifest{ + SnapshotID: "01TESTSNAPSHOT0001", + CreatedAt: time.Date(2026, 6, 2, 12, 0, 0, 0, time.UTC), + Files: map[string]FileEntry{ + "options.txt": { + SHA256: sha, + Size: int64(len(blob)), + Mtime: time.Date(2026, 6, 2, 11, 0, 0, 0, time.UTC), + }, + }, + } + manifestJSON, _ := json.Marshal(manifest) + + body := &bytes.Buffer{} + mw := multipart.NewWriter(body) + fw, _ := mw.CreateFormFile("manifest", "manifest.json") + _, _ = fw.Write(manifestJSON) + fw, _ = mw.CreateFormFile("tarball", "snapshot.tar.zst") + _, _ = fw.Write([]byte("fake-tarball-bytes")) + mw.Close() + + req := authReq(t, http.MethodPost, "/v1/snapshot", body) + req.Header.Set("Content-Type", mw.FormDataContentType()) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("got %d, body=%s", rec.Code, rec.Body.String()) + } + var resp map[string]string + _ = json.Unmarshal(rec.Body.Bytes(), &resp) + if resp["snapshot_id"] != "01TESTSNAPSHOT0001" { + t.Errorf("snapshot_id: got %s", 
resp["snapshot_id"]) + } + + // Now GET /v1/manifest should return the same + rec = httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/manifest", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("manifest after push: %d", rec.Code) + } + var got Manifest + _ = json.Unmarshal(rec.Body.Bytes(), &got) + if got.SnapshotID != "01TESTSNAPSHOT0001" { + t.Errorf("manifest snapshot id: got %s", got.SnapshotID) + } +} + +func TestServer_GetBlob_ReturnsContent(t *testing.T) { + s, st, _ := newTestServer(t) + sha, _ := st.WriteBlob("user1", []byte("blob-payload")) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/blob/"+sha, nil)) + if rec.Code != http.StatusOK { + t.Fatalf("got %d", rec.Code) + } + if rec.Body.String() != "blob-payload" { + t.Errorf("body: got %q", rec.Body.String()) + } +} + +func TestServer_GetBlob_404(t *testing.T) { + s, _, _ := newTestServer(t) + rec := httptest.NewRecorder() + missing := strings.Repeat("0", 64) + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/blob/"+missing, nil)) + if rec.Code != http.StatusNotFound { + t.Errorf("got %d, want 404", rec.Code) + } +} + +func TestServer_PostSnapshot_BadManifest_400(t *testing.T) { + s, _, _ := newTestServer(t) + body := &bytes.Buffer{} + mw := multipart.NewWriter(body) + fw, _ := mw.CreateFormFile("manifest", "manifest.json") + _, _ = fw.Write([]byte(`{"snapshot_id":"","created_at":"2026-01-01T00:00:00Z","files":{}}`)) + fw, _ = mw.CreateFormFile("tarball", "snapshot.tar.zst") + _, _ = fw.Write([]byte("x")) + mw.Close() + req := authReq(t, http.MethodPost, "/v1/snapshot", body) + req.Header.Set("Content-Type", mw.FormDataContentType()) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, req) + if rec.Code != http.StatusBadRequest { + t.Errorf("got %d, want 400", rec.Code) + } +} + +func TestServer_PostSnapshot_QuotaExceeded_413(t *testing.T) { + st := newStorage(t) + v := &scopedVerifier{token: "good-token", user: "u", scopes: []string{"cloud:rw"}} 
+ s := NewServer(st, v, DefaultQuota(100)) // 100 bytes + + manifest := &Manifest{ + SnapshotID: "01QUOTATEST00000001", + CreatedAt: time.Date(2026, 6, 2, 12, 0, 0, 0, time.UTC), + Files: map[string]FileEntry{ + "f.txt": { + SHA256: HashBytes([]byte("x")), + Size: 1, + Mtime: time.Date(2026, 6, 2, 11, 0, 0, 0, time.UTC), + }, + }, + } + manifestJSON, _ := json.Marshal(manifest) + + body := &bytes.Buffer{} + mw := multipart.NewWriter(body) + fw, _ := mw.CreateFormFile("manifest", "manifest.json") + _, _ = fw.Write(manifestJSON) + fw, _ = mw.CreateFormFile("tarball", "tar.zst") + _, _ = fw.Write(make([]byte, 200)) // > 100 byte quota + mw.Close() + + req := authReq(t, http.MethodPost, "/v1/snapshot", body) + req.Header.Set("Content-Type", mw.FormDataContentType()) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, req) + if rec.Code != http.StatusRequestEntityTooLarge { + t.Errorf("got %d, want 413", rec.Code) + } +} + +func TestServer_ListSnapshots(t *testing.T) { + s, st, _ := newTestServer(t) + sha, _ := st.WriteBlob("user1", []byte("y")) + for _, id := range []string{"01AAA", "01BBB"} { + m := buildManifest(id, sha) + if err := st.StoreSnapshot("user1", m, []byte("t")); err != nil { + t.Fatal(err) + } + } + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/snapshots", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("got %d", rec.Code) + } + var resp struct { + Snapshots []SnapshotInfo `json:"snapshots"` + } + _ = json.Unmarshal(rec.Body.Bytes(), &resp) + if len(resp.Snapshots) != 2 { + t.Errorf("snapshots: got %d, want 2", len(resp.Snapshots)) + } +} + +func TestServer_GetQuota(t *testing.T) { + s, _, _ := newTestServer(t) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodGet, "/v1/quota", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("got %d", rec.Code) + } + var resp map[string]any + _ = json.Unmarshal(rec.Body.Bytes(), &resp) + if _, ok := resp["limit_bytes"]; !ok { + t.Errorf("missing limit_bytes; 
body=%s", rec.Body.String()) + } +} + +func TestServer_Healthz(t *testing.T) { + s, _, _ := newTestServer(t) + rec := httptest.NewRecorder() + s.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil)) + if rec.Code != http.StatusOK || rec.Body.String() != "ok" { + t.Errorf("got code=%d body=%q", rec.Code, rec.Body.String()) + } +} + +func TestServer_DeleteSnapshot_RefusesLatest_400(t *testing.T) { + s, st, _ := newTestServer(t) + sha, _ := st.WriteBlob("user1", []byte("z")) + m := buildManifest("01ONLYSNAPSHOT00001", sha) + if err := st.StoreSnapshot("user1", m, []byte("tar")); err != nil { + t.Fatal(err) + } + rec := httptest.NewRecorder() + s.ServeHTTP(rec, authReq(t, http.MethodDelete, "/v1/snapshot/01ONLYSNAPSHOT00001", nil)) + if rec.Code != http.StatusBadRequest { + t.Errorf("got %d, want 400", rec.Code) + } +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..aebc750 --- /dev/null +++ b/storage.go @@ -0,0 +1,347 @@ +package main + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "sync" +) + +// Storage owns the on-disk layout described in DESIGN.md: +// +// //manifest.json latest snapshot's manifest +// //snapshots/.tar.zst immutable per-snapshot tarball +// //snapshots/.manifest.json historical manifest +// //blobs// content-addressed dedupe +// +// Methods are safe for concurrent use across users; mutations to a single +// user are serialized via a per-user mutex. 
type Storage struct {
	// root is the absolute directory under which every user's tree lives.
	root string

	// mu guards the locks map only. It is never held while waiting on a
	// per-user mutex, so a busy user cannot stall the others.
	mu    sync.Mutex
	locks map[string]*sync.Mutex // per-user serialization
}

// NewStorage resolves root to an absolute path, creates it (mode 0700) if
// needed, and returns a Storage rooted there. An empty root is an error.
func NewStorage(root string) (*Storage, error) {
	if root == "" {
		return nil, errors.New("storage root is empty")
	}
	abs, err := filepath.Abs(root)
	if err != nil {
		return nil, fmt.Errorf("resolve storage root: %w", err)
	}
	if err := os.MkdirAll(abs, 0o700); err != nil {
		return nil, fmt.Errorf("create storage root: %w", err)
	}
	return &Storage{root: abs, locks: map[string]*sync.Mutex{}}, nil
}

// lockUser acquires the mutex dedicated to user and returns it; the caller
// releases it with .Unlock(). Per-user serialization keeps concurrent pushes
// from interleaving manifest writes.
//
// The registry lock s.mu is released BEFORE blocking on the per-user mutex.
// Holding it across m.Lock() would let a single waiter for a busy user block
// lockUser for every other user as well.
func (s *Storage) lockUser(user string) *sync.Mutex {
	s.mu.Lock()
	if s.locks == nil {
		// Tolerate a Storage that was not built by NewStorage.
		s.locks = make(map[string]*sync.Mutex)
	}
	m, ok := s.locks[user]
	if !ok {
		m = &sync.Mutex{}
		s.locks[user] = m
	}
	s.mu.Unlock()

	m.Lock()
	return m
}

// userDir is the root of one user's tree: <root>/<user>.
func (s *Storage) userDir(user string) string { return filepath.Join(s.root, user) }

// blobsDir is <root>/<user>/blobs.
func (s *Storage) blobsDir(user string) string { return filepath.Join(s.userDir(user), "blobs") }

// snapsDir is <root>/<user>/snapshots.
func (s *Storage) snapsDir(user string) string { return filepath.Join(s.userDir(user), "snapshots") }

// manifestPath is the location of the user's latest manifest:
// <root>/<user>/manifest.json.
func (s *Storage) manifestPath(user string) string {
	return filepath.Join(s.userDir(user), "manifest.json")
}

// blobPath is <root>/<user>/blobs/<first two chars of sha>/<sha>.
// sha must be at least two bytes long (slicing panics otherwise) and must
// already be validated as path-safe by the caller; no checks happen here.
func (s *Storage) blobPath(user, sha string) string {
	return filepath.Join(s.blobsDir(user), sha[:2], sha)
}

// ReadManifest returns the user's latest manifest. Returns os.ErrNotExist
// (verbatim) when the user has no snapshots yet so callers can map to HTTP 204.
+func (s *Storage) ReadManifest(user string) (*Manifest, error) { + if err := validateUserID(user); err != nil { + return nil, err + } + f, err := os.Open(s.manifestPath(user)) + if err != nil { + return nil, err + } + defer f.Close() + var m Manifest + if err := json.NewDecoder(f).Decode(&m); err != nil { + return nil, fmt.Errorf("decode manifest: %w", err) + } + return &m, nil +} + +// WriteBlob stores raw file content under blobs//. Returns the +// SHA computed from the data. If a blob with the same SHA already exists, +// the write is skipped (content-addressed dedupe). Atomic via tmp-rename. +func (s *Storage) WriteBlob(user string, data []byte) (string, error) { + if err := validateUserID(user); err != nil { + return "", err + } + sum := sha256.Sum256(data) + sha := hex.EncodeToString(sum[:]) + dest := s.blobPath(user, sha) + if _, err := os.Stat(dest); err == nil { + return sha, nil // already have it + } + if err := os.MkdirAll(filepath.Dir(dest), 0o700); err != nil { + return "", fmt.Errorf("create blob dir: %w", err) + } + tmp := dest + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + return "", fmt.Errorf("write blob tmp: %w", err) + } + if err := os.Rename(tmp, dest); err != nil { + _ = os.Remove(tmp) + return "", fmt.Errorf("rename blob: %w", err) + } + return sha, nil +} + +// ReadBlob opens a blob by SHA. Caller closes. +func (s *Storage) ReadBlob(user, sha string) (io.ReadCloser, error) { + if err := validateUserID(user); err != nil { + return nil, err + } + if len(sha) != 64 { + return nil, fmt.Errorf("invalid sha256: length %d", len(sha)) + } + return os.Open(s.blobPath(user, sha)) +} + +// StoreSnapshot persists a snapshot atomically: +// 1. Write tarball to snapshots/.tar.zst.tmp +// 2. Write per-snapshot manifest to snapshots/.manifest.json.tmp +// 3. Rename both into place +// 4. 
Update latest user/manifest.json (atomic rename) +// +// The caller has already written the constituent blobs via WriteBlob before +// invoking this — the tarball just bundles them. +func (s *Storage) StoreSnapshot(user string, m *Manifest, tarball []byte) error { + if err := validateUserID(user); err != nil { + return err + } + if err := m.Validate(); err != nil { + return err + } + + unlock := s.lockUser(user) + defer unlock.Unlock() + + if err := os.MkdirAll(s.snapsDir(user), 0o700); err != nil { + return fmt.Errorf("create snapshots dir: %w", err) + } + + tarPath := filepath.Join(s.snapsDir(user), m.SnapshotID+".tar.zst") + manPath := filepath.Join(s.snapsDir(user), m.SnapshotID+".manifest.json") + if _, err := os.Stat(tarPath); err == nil { + return fmt.Errorf("snapshot %s already exists", m.SnapshotID) + } + + tarTmp := tarPath + ".tmp" + if err := os.WriteFile(tarTmp, tarball, 0o600); err != nil { + return fmt.Errorf("write tarball: %w", err) + } + manBytes, err := json.MarshalIndent(m, "", " ") + if err != nil { + _ = os.Remove(tarTmp) + return fmt.Errorf("marshal manifest: %w", err) + } + manTmp := manPath + ".tmp" + if err := os.WriteFile(manTmp, manBytes, 0o600); err != nil { + _ = os.Remove(tarTmp) + return fmt.Errorf("write snapshot manifest: %w", err) + } + + if err := os.Rename(tarTmp, tarPath); err != nil { + _ = os.Remove(tarTmp) + _ = os.Remove(manTmp) + return fmt.Errorf("rename tarball: %w", err) + } + if err := os.Rename(manTmp, manPath); err != nil { + _ = os.Remove(manTmp) + return fmt.Errorf("rename snapshot manifest: %w", err) + } + + // Update latest manifest. Atomic rename so readers never see partial. 
+ latestTmp := s.manifestPath(user) + ".tmp" + if err := os.WriteFile(latestTmp, manBytes, 0o600); err != nil { + return fmt.Errorf("write latest manifest: %w", err) + } + if err := os.Rename(latestTmp, s.manifestPath(user)); err != nil { + _ = os.Remove(latestTmp) + return fmt.Errorf("rename latest manifest: %w", err) + } + return nil +} + +// SnapshotInfo is one element of ListSnapshots. +type SnapshotInfo struct { + ID string `json:"id"` + CreatedAt string `json:"created_at"` + SizeBytes int64 `json:"size_bytes"` +} + +// ListSnapshots returns the user's snapshot history, newest first. +func (s *Storage) ListSnapshots(user string) ([]SnapshotInfo, error) { + if err := validateUserID(user); err != nil { + return nil, err + } + dir := s.snapsDir(user) + entries, err := os.ReadDir(dir) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, err + } + var out []SnapshotInfo + for _, e := range entries { + if !strings.HasSuffix(e.Name(), ".tar.zst") { + continue + } + id := strings.TrimSuffix(e.Name(), ".tar.zst") + info, err := e.Info() + if err != nil { + continue + } + // Read the per-snapshot manifest for created_at. + manPath := filepath.Join(dir, id+".manifest.json") + mf, err := os.Open(manPath) + var createdAt string + if err == nil { + var sm Manifest + if json.NewDecoder(mf).Decode(&sm) == nil { + createdAt = sm.CreatedAt.UTC().Format("2006-01-02T15:04:05Z") + } + mf.Close() + } + out = append(out, SnapshotInfo{ + ID: id, + CreatedAt: createdAt, + SizeBytes: info.Size(), + }) + } + sort.Slice(out, func(i, j int) bool { return out[i].ID > out[j].ID }) + return out, nil +} + +// OpenSnapshot returns the historical tarball reader. Caller closes. 
+func (s *Storage) OpenSnapshot(user, id string) (io.ReadCloser, error) { + if err := validateUserID(user); err != nil { + return nil, err + } + if !looksLikeSnapshotID(id) { + return nil, fmt.Errorf("invalid snapshot id: %q", id) + } + return os.Open(filepath.Join(s.snapsDir(user), id+".tar.zst")) +} + +// DeleteSnapshot removes a historical snapshot. Refuses to delete the latest +// (returns error). The blob GC step is separate. +func (s *Storage) DeleteSnapshot(user, id string) error { + if err := validateUserID(user); err != nil { + return err + } + if !looksLikeSnapshotID(id) { + return fmt.Errorf("invalid snapshot id: %q", id) + } + + unlock := s.lockUser(user) + defer unlock.Unlock() + + // Refuse to delete if id matches the latest manifest + latest, err := s.ReadManifest(user) + if err == nil && latest.SnapshotID == id { + return errors.New("cannot delete latest snapshot") + } + + tarPath := filepath.Join(s.snapsDir(user), id+".tar.zst") + manPath := filepath.Join(s.snapsDir(user), id+".manifest.json") + if err := os.Remove(tarPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + if err := os.Remove(manPath); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + return nil +} + +// UsageBytes is total bytes occupied by a user's snapshots + blobs. +// Cheap walk; called for quota checks. Doesn't double-count files +// referenced from both blobs/ and snapshots/. 
+func (s *Storage) UsageBytes(user string) (int64, error) { + if err := validateUserID(user); err != nil { + return 0, err + } + var total int64 + walk := func(p string) error { + return filepath.Walk(p, func(_ string, info os.FileInfo, err error) error { + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + if info.IsDir() { + return nil + } + total += info.Size() + return nil + }) + } + for _, d := range []string{s.blobsDir(user), s.snapsDir(user)} { + if err := walk(d); err != nil { + return 0, err + } + } + return total, nil +} + +// validateUserID guards against directory traversal via user identifiers. +// Discord IDs are numeric strings; we accept anything safely path-segment-able. +func validateUserID(user string) error { + if user == "" { + return errors.New("empty user id") + } + if strings.ContainsAny(user, "/\\.") { + return fmt.Errorf("user id contains invalid character: %q", user) + } + return nil +} + +// looksLikeSnapshotID: ULIDs are 26 chars, Crockford base32. We allow +// any non-empty alphanumeric+underscore of reasonable length to keep tests +// using simple IDs flexible — the security need here is path-safety, not +// strict ULID compliance. 
+func looksLikeSnapshotID(id string) bool { + if len(id) == 0 || len(id) > 64 { + return false + } + for _, c := range id { + switch { + case c >= 'A' && c <= 'Z': + case c >= 'a' && c <= 'z': + case c >= '0' && c <= '9': + case c == '-' || c == '_': + default: + return false + } + } + return true +} diff --git a/storage_test.go b/storage_test.go new file mode 100644 index 0000000..6c9143a --- /dev/null +++ b/storage_test.go @@ -0,0 +1,193 @@ +package main + +import ( + "errors" + "io" + "os" + "strings" + "testing" + "time" +) + +func newStorage(t *testing.T) *Storage { + t.Helper() + dir := t.TempDir() + s, err := NewStorage(dir) + if err != nil { + t.Fatalf("NewStorage: %v", err) + } + return s +} + +func buildManifest(id, sha string) *Manifest { + return &Manifest{ + SnapshotID: id, + CreatedAt: time.Date(2026, 6, 2, 12, 0, 0, 0, time.UTC), + Files: map[string]FileEntry{ + "options.txt": { + SHA256: sha, + Size: 5, + Mtime: time.Date(2026, 6, 2, 11, 30, 0, 0, time.UTC), + }, + }, + } +} + +func TestStorage_RoundTripSnapshot(t *testing.T) { + s := newStorage(t) + + sha, err := s.WriteBlob("123456789", []byte("hello")) + if err != nil { + t.Fatalf("WriteBlob: %v", err) + } + if sha != HashBytes([]byte("hello")) { + t.Fatalf("sha mismatch: got %s", sha) + } + + m := buildManifest("01J9XQK4Z3DEMO0001", sha) + if err := s.StoreSnapshot("123456789", m, []byte("fake-tarball-bytes")); err != nil { + t.Fatalf("StoreSnapshot: %v", err) + } + + got, err := s.ReadManifest("123456789") + if err != nil { + t.Fatalf("ReadManifest: %v", err) + } + if got.SnapshotID != "01J9XQK4Z3DEMO0001" { + t.Errorf("snapshot id mismatch: got %s", got.SnapshotID) + } + if _, ok := got.Files["options.txt"]; !ok { + t.Errorf("files key missing") + } + + rc, err := s.ReadBlob("123456789", sha) + if err != nil { + t.Fatalf("ReadBlob: %v", err) + } + defer rc.Close() + data, _ := io.ReadAll(rc) + if string(data) != "hello" { + t.Errorf("blob content: got %q", string(data)) + } +} + +func 
TestStorage_BlobDedupe(t *testing.T) { + s := newStorage(t) + a, _ := s.WriteBlob("u1", []byte("xyz")) + b, _ := s.WriteBlob("u1", []byte("xyz")) + if a != b { + t.Errorf("dedupe broken: got %s vs %s", a, b) + } + // Both calls should leave one blob on disk + p := s.blobPath("u1", a) + info, err := os.Stat(p) + if err != nil { + t.Fatalf("stat blob: %v", err) + } + if info.Size() != 3 { + t.Errorf("blob size: got %d, want 3", info.Size()) + } +} + +func TestStorage_ReadManifest_MissingIsNotExist(t *testing.T) { + s := newStorage(t) + _, err := s.ReadManifest("never-stored") + if !errors.Is(err, os.ErrNotExist) { + t.Fatalf("expected os.ErrNotExist, got %v", err) + } +} + +func TestStorage_ListSnapshots_NewestFirst(t *testing.T) { + s := newStorage(t) + user := "u1" + sha, _ := s.WriteBlob(user, []byte("x")) + for _, id := range []string{"01ABC", "01BBB", "01ZZZ"} { + m := buildManifest(id, sha) + if err := s.StoreSnapshot(user, m, []byte("tar")); err != nil { + t.Fatalf("StoreSnapshot %s: %v", id, err) + } + } + list, err := s.ListSnapshots(user) + if err != nil { + t.Fatalf("ListSnapshots: %v", err) + } + if len(list) != 3 { + t.Fatalf("expected 3 snapshots, got %d", len(list)) + } + if list[0].ID != "01ZZZ" || list[2].ID != "01ABC" { + t.Errorf("ordering wrong: %v", []string{list[0].ID, list[1].ID, list[2].ID}) + } +} + +func TestStorage_DeleteSnapshot_RefusesLatest(t *testing.T) { + s := newStorage(t) + sha, _ := s.WriteBlob("u", []byte("y")) + m := buildManifest("01OLD0", sha) + if err := s.StoreSnapshot("u", m, []byte("t1")); err != nil { + t.Fatal(err) + } + m2 := buildManifest("01NEW0", sha) + if err := s.StoreSnapshot("u", m2, []byte("t2")); err != nil { + t.Fatal(err) + } + // Latest = 01NEW0; deleting it should fail + if err := s.DeleteSnapshot("u", "01NEW0"); err == nil { + t.Error("expected error deleting latest, got nil") + } + // Old one deletes + if err := s.DeleteSnapshot("u", "01OLD0"); err != nil { + t.Errorf("delete old: %v", err) + } +} + +func 
TestStorage_StoreSnapshot_RejectsBadManifest(t *testing.T) { + s := newStorage(t) + bad := &Manifest{} + if err := s.StoreSnapshot("u", bad, []byte("x")); err == nil { + t.Error("expected validation error for empty manifest") + } +} + +func TestStorage_RejectsBadUserID(t *testing.T) { + s := newStorage(t) + cases := []string{"", "../etc", "u/path", "user\\back", "with.dot"} + for _, u := range cases { + t.Run(u, func(t *testing.T) { + _, err := s.WriteBlob(u, []byte("x")) + if err == nil { + t.Errorf("user %q: expected error", u) + } + }) + } +} + +func TestStorage_RejectsBadSnapshotID(t *testing.T) { + s := newStorage(t) + cases := []string{"", "../escape", "with/slash", strings.Repeat("a", 100), "with space"} + for _, id := range cases { + t.Run(id, func(t *testing.T) { + _, err := s.OpenSnapshot("u", id) + if err == nil { + t.Errorf("id %q: expected error", id) + } + }) + } +} + +func TestStorage_UsageBytes(t *testing.T) { + s := newStorage(t) + user := "u" + _, _ = s.WriteBlob(user, []byte("0123456789")) // 10 bytes + m := buildManifest("01X", HashBytes([]byte("0123456789"))) + if err := s.StoreSnapshot(user, m, make([]byte, 50)); err != nil { + t.Fatal(err) + } + got, err := s.UsageBytes(user) + if err != nil { + t.Fatalf("UsageBytes: %v", err) + } + // 10 (blob) + 50 (tar) + manifest JSON on disk (variable but small) + if got < 60 { + t.Errorf("expected at least 60 bytes, got %d", got) + } +}