package main import ( "archive/tar" "bytes" "context" "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" "os" "strconv" "time" ) // QuotaProvider returns a per-user byte quota. Stub returns a default for // every user; a future pg-backed impl reads users.cloud_quota_bytes. type QuotaProvider interface { Quota(ctx context.Context, user string) (int64, error) } type DefaultQuota int64 func (q DefaultQuota) Quota(_ context.Context, _ string) (int64, error) { return int64(q), nil } // Server holds wiring. Routes registered in NewServer. type Server struct { storage *Storage verifier Verifier quota QuotaProvider mux *http.ServeMux } func NewServer(storage *Storage, verifier Verifier, quota QuotaProvider) *Server { s := &Server{storage: storage, verifier: verifier, quota: quota, mux: http.NewServeMux()} s.routes() return s } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.mux.ServeHTTP(w, r) } func (s *Server) routes() { s.mux.HandleFunc("GET /v1/manifest", s.requireAuth(s.handleGetManifest)) s.mux.HandleFunc("GET /v1/blob/{sha}", s.requireAuth(s.handleGetBlob)) s.mux.HandleFunc("POST /v1/snapshot", s.requireAuth(s.handlePostSnapshot)) s.mux.HandleFunc("GET /v1/snapshots", s.requireAuth(s.handleListSnapshots)) s.mux.HandleFunc("GET /v1/snapshot/{id}", s.requireAuth(s.handleGetSnapshot)) s.mux.HandleFunc("DELETE /v1/snapshot/{id}", s.requireAuth(s.handleDeleteSnapshot)) s.mux.HandleFunc("GET /v1/quota", s.requireAuth(s.handleGetQuota)) s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) } // authedHandler is a handler that gets the verified caller info. type authedHandler func(w http.ResponseWriter, r *http.Request, who *AuthInfo) func (s *Server) requireAuth(h authedHandler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { token := extractBearer(r) if token == "" { writeJSONError(w, http.StatusUnauthorized, "missing bearer token") return } info, err := s.verifier.Verify(r.Context(), token) if err != nil { switch { case errors.Is(err, ErrUnauthenticated): writeJSONError(w, http.StatusUnauthorized, "invalid token") case errors.Is(err, ErrRevoked): writeJSONError(w, http.StatusForbidden, "token revoked") default: slog.Warn("auth verify failed", "err", err) writeJSONError(w, http.StatusBadGateway, "auth backend unavailable") } return } if !info.HasScope("cloud:rw") { writeJSONError(w, http.StatusForbidden, "token lacks cloud:rw scope") return } h(w, r, info) } } // GET /v1/manifest func (s *Server) handleGetManifest(w http.ResponseWriter, _ *http.Request, who *AuthInfo) { m, err := s.storage.ReadManifest(who.User) if err != nil { if errors.Is(err, os.ErrNotExist) { w.WriteHeader(http.StatusNoContent) return } writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read manifest: %v", err)) return } writeJSON(w, http.StatusOK, m) } // GET /v1/blob/{sha} func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request, who *AuthInfo) { sha := r.PathValue("sha") rc, err := s.storage.ReadBlob(who.User, sha) if err != nil { if errors.Is(err, os.ErrNotExist) { writeJSONError(w, http.StatusNotFound, "blob not found") return } writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read blob: %v", err)) return } defer rc.Close() w.Header().Set("Content-Type", "application/octet-stream") _, _ = io.Copy(w, rc) } // POST /v1/snapshot // Body: multipart/form-data with parts: // manifest — application/json (the Manifest) // tarball — application/octet-stream (the .tar.zst payload) // The handler does NOT enforce that the tarball matches the manifest's // content list — that's a v2 concern. For v1 the manifest is what governs. func (s *Server) handlePostSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) { // 50 MB in-memory cap; larger streams to temp files. Tunable. if err := r.ParseMultipartForm(50 << 20); err != nil { writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("parse multipart: %v", err)) return } manifestFile, _, err := r.FormFile("manifest") if err != nil { writeJSONError(w, http.StatusBadRequest, "missing manifest part") return } defer manifestFile.Close() m, err := ReadManifest(manifestFile) if err != nil { writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("invalid manifest: %v", err)) return } tarFile, _, err := r.FormFile("tarball") if err != nil { writeJSONError(w, http.StatusBadRequest, "missing tarball part") return } defer tarFile.Close() tarBytes, err := io.ReadAll(tarFile) if err != nil { writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("read tarball: %v", err)) return } // Quota check: estimate new usage = current + tarball + small overhead. used, err := s.storage.UsageBytes(who.User) if err != nil { writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err)) return } limit, err := s.quota.Quota(r.Context(), who.User) if err != nil { writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err)) return } if used+int64(len(tarBytes)) > limit { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusRequestEntityTooLarge) _ = json.NewEncoder(w).Encode(map[string]any{ "error": "quota", "used": used, "limit": limit, }) return } // Extract tarball into blob store + verify each entry's hash matches // the manifest. The tarball is uncompressed plain tar in v1; zstd // support deferred until Python stdlib gains a zstd module. if err := s.extractTarToBlobs(who.User, m, tarBytes); err != nil { writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("extract tarball: %v", err)) return } if err := s.storage.StoreSnapshot(who.User, m, tarBytes); err != nil { writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("store: %v", err)) return } writeJSON(w, http.StatusOK, map[string]string{ "snapshot_id": m.SnapshotID, "snapshot_url": "/v1/snapshot/" + m.SnapshotID, }) } // extractTarToBlobs walks the uploaded tarball, writes each regular file's // contents into the blob store (content-addressed dedupe), and verifies the // hash against the manifest entry for that path. Files in the tarball that // AREN'T declared in the manifest are ignored (defensive: never trust tar // contents). Manifest entries without a tarball file are allowed (the // blob may already exist from a previous snapshot). func (s *Server) extractTarToBlobs(user string, m *Manifest, tarBytes []byte) error { tr := tar.NewReader(bytes.NewReader(tarBytes)) for { hdr, err := tr.Next() if errors.Is(err, io.EOF) { break } if err != nil { return fmt.Errorf("tar read: %w", err) } if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA { continue } expected, ok := m.Files[hdr.Name] if !ok { // File in tar not declared in manifest. Skip; don't trust it. continue } buf := make([]byte, 0, hdr.Size) buf, err = io.ReadAll(tr) if err != nil { return fmt.Errorf("tar entry %s: %w", hdr.Name, err) } sha, err := s.storage.WriteBlob(user, buf) if err != nil { return fmt.Errorf("write blob for %s: %w", hdr.Name, err) } if sha != expected.SHA256 { return fmt.Errorf("manifest mismatch for %s: tar sha %s != manifest sha %s", hdr.Name, sha, expected.SHA256) } } return nil } // GET /v1/snapshots func (s *Server) handleListSnapshots(w http.ResponseWriter, _ *http.Request, who *AuthInfo) { list, err := s.storage.ListSnapshots(who.User) if err != nil { writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("list: %v", err)) return } writeJSON(w, http.StatusOK, map[string]any{"snapshots": list}) } // GET /v1/snapshot/{id} func (s *Server) handleGetSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) { id := r.PathValue("id") rc, err := s.storage.OpenSnapshot(who.User, id) if err != nil { if errors.Is(err, os.ErrNotExist) { writeJSONError(w, http.StatusNotFound, "snapshot not found") return } writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("open snapshot: %v", err)) return } defer rc.Close() w.Header().Set("Content-Type", "application/zstd") w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s.tar.zst"`, id)) _, _ = io.Copy(w, rc) } // DELETE /v1/snapshot/{id} func (s *Server) handleDeleteSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) { id := r.PathValue("id") if err := s.storage.DeleteSnapshot(who.User, id); err != nil { writeJSONError(w, http.StatusBadRequest, err.Error()) return } w.WriteHeader(http.StatusNoContent) } // GET /v1/quota func (s *Server) handleGetQuota(w http.ResponseWriter, r *http.Request, who *AuthInfo) { used, err := s.storage.UsageBytes(who.User) if err != nil { writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err)) return } limit, err := s.quota.Quota(r.Context(), who.User) if err != nil { writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err)) return } list, _ := s.storage.ListSnapshots(who.User) writeJSON(w, http.StatusOK, map[string]any{ "used_bytes": used, "limit_bytes": limit, "snapshots": len(list), }) } // writeJSON serializes v as JSON with the right Content-Type. Errors logged, // not returned — by the time we're encoding the response, we've committed. func writeJSON(w http.ResponseWriter, status int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) if err := json.NewEncoder(w).Encode(v); err != nil { slog.Warn("encode response", "err", err) } } func writeJSONError(w http.ResponseWriter, status int, msg string) { writeJSON(w, status, map[string]string{"error": msg}) } // secondsSinceEpoch is exported for handlers/tests that need stable timestamp // strings without round-tripping through time.Time.Format. func secondsSinceEpoch() string { return strconv.FormatInt(time.Now().Unix(), 10) }