Files
cloud-svc/server.go
T
claude-timemachine 2b7290626e
CI / validate (push) Successful in 13s
CI / docker (push) Successful in 14s
server: extract tarball into blob store on snapshot upload
The deferred 'hardlink blobs from tarball' optimization from DESIGN.md
landed as 'just walk the tarball and write blobs separately' for v1.
GET /v1/blob/{sha} was 404'ing because the blob store was empty —
storage only had snapshots/<id>.tar.zst and a manifest.

Server now:
1. Parses uploaded multipart manifest + tarball
2. Walks the tar entries, computes each entry's sha256
3. Cross-checks against the manifest's declared sha (rejects 400 on mismatch)
4. Writes each blob to <user>/blobs/ via Storage.WriteBlob
5. Then stores the snapshot tarball + manifest as before

2 new tests cover: (a) POST then GET /v1/blob/{sha} round-trip,
(b) manifest-claims-different-sha-than-tarball rejection.

Discovered via e2e smoke against frazclient: pull 404'd on every blob
after a successful push. 33/33 tests pass.
2026-06-02 19:08:56 +02:00

317 lines
9.9 KiB
Go

package main
import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strconv"
"time"
)
// QuotaProvider reports the total number of bytes a user may store.
// DefaultQuota answers with one fixed number for everyone; a future
// pg-backed implementation is expected to read users.cloud_quota_bytes.
type QuotaProvider interface {
	Quota(ctx context.Context, user string) (int64, error)
}

// DefaultQuota is a QuotaProvider that grants the same byte quota to every
// user and never fails.
type DefaultQuota int64

// Quota ignores its arguments and returns q as a byte count with a nil error.
func (q DefaultQuota) Quota(context.Context, string) (int64, error) {
	limit := int64(q)
	return limit, nil
}
// Server is the HTTP front end: it bundles the storage backend, the
// bearer-token verifier, the per-user quota source and the request router.
// NewServer registers every route on the router before returning.
type Server struct {
	storage  *Storage
	verifier Verifier
	quota    QuotaProvider
	mux      *http.ServeMux
}

// NewServer wires storage, token verification and quota lookup into a
// Server whose routes are already registered.
func NewServer(storage *Storage, verifier Verifier, quota QuotaProvider) *Server {
	srv := new(Server)
	srv.storage = storage
	srv.verifier = verifier
	srv.quota = quota
	srv.mux = http.NewServeMux()
	srv.routes()
	return srv
}

// ServeHTTP implements http.Handler by handing every request to the
// internal router.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	router := s.mux
	router.ServeHTTP(w, r)
}
// routes registers every endpoint on s.mux. All /v1 endpoints are wrapped
// in requireAuth; /healthz is unauthenticated and always answers 200 "ok".
func (s *Server) routes() {
	protected := []struct {
		pattern string
		handle  authedHandler
	}{
		{"GET /v1/manifest", s.handleGetManifest},
		{"GET /v1/blob/{sha}", s.handleGetBlob},
		{"POST /v1/snapshot", s.handlePostSnapshot},
		{"GET /v1/snapshots", s.handleListSnapshots},
		{"GET /v1/snapshot/{id}", s.handleGetSnapshot},
		{"DELETE /v1/snapshot/{id}", s.handleDeleteSnapshot},
		{"GET /v1/quota", s.handleGetQuota},
	}
	for _, route := range protected {
		s.mux.HandleFunc(route.pattern, s.requireAuth(route.handle))
	}

	// Health check: no auth, fixed body.
	s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
}
// authedHandler is a handler that runs only after the bearer token has been
// verified; who carries the verified caller.
type authedHandler func(w http.ResponseWriter, r *http.Request, who *AuthInfo)

// requireAuth wraps h so it runs only for requests whose bearer token
// verifies and carries the cloud:rw scope. Rejections are answered here:
//
//	no token                   401 "missing bearer token"
//	ErrUnauthenticated         401 "invalid token"
//	ErrRevoked                 403 "token revoked"
//	any other verify error     logged, 502 "auth backend unavailable"
//	token without cloud:rw     403 "token lacks cloud:rw scope"
func (s *Server) requireAuth(h authedHandler) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		token := extractBearer(r)
		if token == "" {
			writeJSONError(w, http.StatusUnauthorized, "missing bearer token")
			return
		}

		info, err := s.verifier.Verify(r.Context(), token)
		if err != nil {
			status, msg := http.StatusBadGateway, "auth backend unavailable"
			switch {
			case errors.Is(err, ErrUnauthenticated):
				status, msg = http.StatusUnauthorized, "invalid token"
			case errors.Is(err, ErrRevoked):
				status, msg = http.StatusForbidden, "token revoked"
			default:
				// Not a verdict on the token itself, so surface it in logs.
				slog.Warn("auth verify failed", "err", err)
			}
			writeJSONError(w, status, msg)
			return
		}

		if info.HasScope("cloud:rw") {
			h(w, r, info)
			return
		}
		writeJSONError(w, http.StatusForbidden, "token lacks cloud:rw scope")
	}
}
// GET /v1/manifest
//
// Responds 200 with the caller's stored manifest as JSON. When the storage
// reports os.ErrNotExist (no manifest yet) it answers 204 with no body; any
// other read failure is a 500.
func (s *Server) handleGetManifest(w http.ResponseWriter, _ *http.Request, who *AuthInfo) {
	manifest, err := s.storage.ReadManifest(who.User)
	switch {
	case err == nil:
		writeJSON(w, http.StatusOK, manifest)
	case errors.Is(err, os.ErrNotExist):
		w.WriteHeader(http.StatusNoContent)
	default:
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read manifest: %v", err))
	}
}
// GET /v1/blob/{sha}
//
// Streams the caller's blob named by the {sha} path segment as
// application/octet-stream. A blob the storage reports as os.ErrNotExist is
// 404; any other read error is 500. Copy errors after streaming has begun
// are ignored because the status is already committed.
func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
	blob, err := s.storage.ReadBlob(who.User, r.PathValue("sha"))
	if errors.Is(err, os.ErrNotExist) {
		writeJSONError(w, http.StatusNotFound, "blob not found")
		return
	}
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read blob: %v", err))
		return
	}
	defer blob.Close()

	w.Header().Set("Content-Type", "application/octet-stream")
	_, _ = io.Copy(w, blob)
}
// POST /v1/snapshot
//
// Body: multipart/form-data with parts:
//
//	manifest — application/json (the Manifest)
//	tarball  — application/octet-stream; a plain, uncompressed tar in v1
//	           (it is read directly with archive/tar, no decompression)
//
// Steps: parse both parts, check the caller's quota, copy every
// manifest-declared regular file from the tarball into the blob store while
// checking its hash against the manifest, then store the tarball and
// manifest as the snapshot.
//
// Responses:
//   - 400 for malformed input or a tar entry whose hash disagrees with the
//     manifest.
//   - 413 with {"error":"quota","used":…,"limit":…} when the upload would
//     exceed the quota.
//   - 500 for usage/quota lookup failures and for storage failures,
//     including a failed blob write.
//   - 200 with the snapshot id and URL on success.
func (s *Server) handlePostSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
	// 50 MB in-memory cap; larger parts spill to temp files. Tunable.
	// NOTE(review): nothing in this handler bounds the total request size,
	// and the tarball is read fully into memory below — consider
	// http.MaxBytesReader if no outer middleware already does this.
	if err := r.ParseMultipartForm(50 << 20); err != nil {
		writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("parse multipart: %v", err))
		return
	}
	manifestFile, _, err := r.FormFile("manifest")
	if err != nil {
		writeJSONError(w, http.StatusBadRequest, "missing manifest part")
		return
	}
	defer manifestFile.Close()
	m, err := ReadManifest(manifestFile)
	if err != nil {
		writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("invalid manifest: %v", err))
		return
	}
	tarFile, _, err := r.FormFile("tarball")
	if err != nil {
		writeJSONError(w, http.StatusBadRequest, "missing tarball part")
		return
	}
	defer tarFile.Close()
	tarBytes, err := io.ReadAll(tarFile)
	if err != nil {
		writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("read tarball: %v", err))
		return
	}

	// Quota check: projected usage = current usage + raw tarball size.
	// NOTE(review): extracted blobs are written in addition to the stored
	// tarball; whether UsageBytes counts both is up to Storage — confirm the
	// estimate is intended.
	used, err := s.storage.UsageBytes(who.User)
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err))
		return
	}
	limit, err := s.quota.Quota(r.Context(), who.User)
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err))
		return
	}
	if used+int64(len(tarBytes)) > limit {
		writeJSON(w, http.StatusRequestEntityTooLarge, map[string]any{
			"error": "quota",
			"used":  used,
			"limit": limit,
		})
		return
	}

	// Extract the tarball into the blob store and verify each entry's hash
	// against the manifest. zstd support is deferred until Python stdlib
	// gains a zstd module.
	if err := s.extractTarToBlobs(who.User, m, tarBytes); err != nil {
		// A bad upload (unreadable tar, hash mismatch) is the client's
		// fault; a failed blob write is ours.
		status := http.StatusBadRequest
		var storeErr *blobWriteError
		if errors.As(err, &storeErr) {
			status = http.StatusInternalServerError
		}
		writeJSONError(w, status, fmt.Sprintf("extract tarball: %v", err))
		return
	}
	if err := s.storage.StoreSnapshot(who.User, m, tarBytes); err != nil {
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("store: %v", err))
		return
	}
	writeJSON(w, http.StatusOK, map[string]string{
		"snapshot_id":  m.SnapshotID,
		"snapshot_url": "/v1/snapshot/" + m.SnapshotID,
	})
}

// blobWriteError marks an extractTarToBlobs failure caused by the blob store
// rather than by the uploaded data, so handlePostSnapshot can answer 500
// instead of 400. Error and Unwrap pass straight through to the wrapped error.
type blobWriteError struct{ err error }

func (e *blobWriteError) Error() string { return e.err.Error() }
func (e *blobWriteError) Unwrap() error { return e.err }

// extractTarToBlobs walks the uploaded tarball. For every regular file whose
// path is declared in m.Files, it writes the contents into the user's blob
// store (content-addressed dedupe) and compares the hash WriteBlob returns
// with the manifest's SHA256 for that path.
//
// Tar entries not declared in the manifest are skipped (defensive: never
// trust tar contents). Manifest entries with no tar entry are allowed,
// because the blob may already exist from a previous snapshot.
//
// Tar read errors and hash mismatches are returned as plain errors.
// Blob-store write failures are wrapped in *blobWriteError.
func (s *Server) extractTarToBlobs(user string, m *Manifest, tarBytes []byte) error {
	tr := tar.NewReader(bytes.NewReader(tarBytes))
	for {
		hdr, err := tr.Next()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("tar read: %w", err)
		}
		if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA {
			continue
		}
		expected, ok := m.Files[hdr.Name]
		if !ok {
			// File in tar not declared in manifest. Skip; don't trust it.
			continue
		}
		// Deliberately not pre-sized from hdr.Size: it comes from the
		// untrusted header (up to MaxInt64 with base-256 encoding), and a
		// crafted value would make a preallocation panic or exhaust memory
		// before a single byte is read. ReadAll grows with the bytes that
		// are actually present.
		data, err := io.ReadAll(tr)
		if err != nil {
			return fmt.Errorf("tar entry %s: %w", hdr.Name, err)
		}
		sha, err := s.storage.WriteBlob(user, data)
		if err != nil {
			return &blobWriteError{err: fmt.Errorf("write blob for %s: %w", hdr.Name, err)}
		}
		// NOTE(review): the blob is persisted before this comparison, so a
		// rejected upload leaves the blobs it already wrote (including this
		// one) in the store without a snapshot referencing them.
		if sha != expected.SHA256 {
			return fmt.Errorf("manifest mismatch for %s: tar sha %s != manifest sha %s",
				hdr.Name, sha, expected.SHA256)
		}
	}
}
// GET /v1/snapshots
//
// Responds 200 with {"snapshots": <list>} for the caller, or 500 when the
// storage listing fails.
func (s *Server) handleListSnapshots(w http.ResponseWriter, _ *http.Request, who *AuthInfo) {
	snapshots, err := s.storage.ListSnapshots(who.User)
	if err == nil {
		writeJSON(w, http.StatusOK, map[string]any{"snapshots": snapshots})
		return
	}
	writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("list: %v", err))
}
// GET /v1/snapshot/{id}
//
// Streams the caller's stored snapshot {id} as an attachment named
// "<id>.tar.zst". A snapshot the storage reports as os.ErrNotExist is 404.
// Any other open failure is answered with 400 (presumably OpenSnapshot
// rejects malformed ids — verify).
//
// NOTE(review): the body is labelled application/zstd, but POST
// /v1/snapshot only accepts tarballs archive/tar can read uncompressed, so
// snapshots uploaded through it are plain tar — confirm the intended
// Content-Type and filename.
func (s *Server) handleGetSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
	snapshotID := r.PathValue("id")
	snap, err := s.storage.OpenSnapshot(who.User, snapshotID)
	if err != nil {
		status, msg := http.StatusBadRequest, fmt.Sprintf("open snapshot: %v", err)
		if errors.Is(err, os.ErrNotExist) {
			status, msg = http.StatusNotFound, "snapshot not found"
		}
		writeJSONError(w, status, msg)
		return
	}
	defer snap.Close()

	hdr := w.Header()
	hdr.Set("Content-Type", "application/zstd")
	hdr.Set("Content-Disposition", `attachment; filename="`+snapshotID+`.tar.zst"`)
	_, _ = io.Copy(w, snap)
}
// DELETE /v1/snapshot/{id}
//
// Removes the caller's snapshot {id} and answers 204 No Content.
//   - A snapshot the storage reports as os.ErrNotExist is 404 "snapshot not
//     found", matching GET /v1/snapshot/{id}.
//   - Any other failure is reported as 400 with the storage error text, as
//     before.
func (s *Server) handleDeleteSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
	id := r.PathValue("id")
	if err := s.storage.DeleteSnapshot(who.User, id); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			writeJSONError(w, http.StatusNotFound, "snapshot not found")
			return
		}
		writeJSONError(w, http.StatusBadRequest, err.Error())
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// GET /v1/quota
//
// Responds 200 with {"used_bytes", "limit_bytes", "snapshots"} for the
// caller. Usage or quota lookup failures are 500.
//
// The snapshot count is best-effort. If listing fails, the error is logged
// instead of failing the request, and the count comes from whatever
// (presumably empty) list ListSnapshots returned.
func (s *Server) handleGetQuota(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
	used, err := s.storage.UsageBytes(who.User)
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err))
		return
	}
	limit, err := s.quota.Quota(r.Context(), who.User)
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err))
		return
	}
	list, err := s.storage.ListSnapshots(who.User)
	if err != nil {
		// Previously discarded silently; keep the response but leave a trace.
		slog.Warn("quota: list snapshots failed", "user", who.User, "err", err)
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"used_bytes":  used,
		"limit_bytes": limit,
		"snapshots":   len(list),
	})
}
// writeJSON sends v as the JSON body with the given status and Content-Type
// application/json. Encoding errors are only logged: once WriteHeader has
// run, the status is committed and there is nothing left to report.
func writeJSON(w http.ResponseWriter, status int, v any) {
	headers := w.Header()
	headers.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	enc := json.NewEncoder(w)
	if encErr := enc.Encode(v); encErr != nil {
		slog.Warn("encode response", "err", encErr)
	}
}

// writeJSONError sends {"error": msg} with the given status via writeJSON.
func writeJSONError(w http.ResponseWriter, status int, msg string) {
	body := map[string]string{"error": msg}
	writeJSON(w, status, body)
}
// secondsSinceEpoch returns the current Unix time in whole seconds as a
// base-10 string. It is a package-internal helper for handlers and tests
// that want a stable timestamp string without formatting a time.Time.
func secondsSinceEpoch() string {
	unix := time.Now().Unix()
	return strconv.FormatInt(unix, 10)
}