initial: Steam-Cloud-style per-user state sync skeleton
HTTP API + on-disk storage + auth-service token verification + dev mode. 31 tests pass, vet clean. See DESIGN.md for the architecture and README.md for the operator surface. Pending: pg-backed per-user quota override, snapshot retention / blob GC, tarball-vs-manifest content cross-check, end-to-end deploy on john.
This commit is contained in:
@@ -0,0 +1,265 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// QuotaProvider returns a per-user byte quota. Stub returns a default for
|
||||
// every user; a future pg-backed impl reads users.cloud_quota_bytes.
|
||||
type QuotaProvider interface {
|
||||
Quota(ctx context.Context, user string) (int64, error)
|
||||
}
|
||||
|
||||
type DefaultQuota int64
|
||||
|
||||
func (q DefaultQuota) Quota(_ context.Context, _ string) (int64, error) {
|
||||
return int64(q), nil
|
||||
}
|
||||
|
||||
// Server holds wiring. Routes registered in NewServer.
|
||||
type Server struct {
|
||||
storage *Storage
|
||||
verifier Verifier
|
||||
quota QuotaProvider
|
||||
mux *http.ServeMux
|
||||
}
|
||||
|
||||
func NewServer(storage *Storage, verifier Verifier, quota QuotaProvider) *Server {
|
||||
s := &Server{storage: storage, verifier: verifier, quota: quota, mux: http.NewServeMux()}
|
||||
s.routes()
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
s.mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (s *Server) routes() {
|
||||
s.mux.HandleFunc("GET /v1/manifest", s.requireAuth(s.handleGetManifest))
|
||||
s.mux.HandleFunc("GET /v1/blob/{sha}", s.requireAuth(s.handleGetBlob))
|
||||
s.mux.HandleFunc("POST /v1/snapshot", s.requireAuth(s.handlePostSnapshot))
|
||||
s.mux.HandleFunc("GET /v1/snapshots", s.requireAuth(s.handleListSnapshots))
|
||||
s.mux.HandleFunc("GET /v1/snapshot/{id}", s.requireAuth(s.handleGetSnapshot))
|
||||
s.mux.HandleFunc("DELETE /v1/snapshot/{id}", s.requireAuth(s.handleDeleteSnapshot))
|
||||
s.mux.HandleFunc("GET /v1/quota", s.requireAuth(s.handleGetQuota))
|
||||
s.mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write([]byte("ok"))
|
||||
})
|
||||
}
|
||||
|
||||
// authedHandler is a handler that gets the verified caller info.
|
||||
type authedHandler func(w http.ResponseWriter, r *http.Request, who *AuthInfo)
|
||||
|
||||
func (s *Server) requireAuth(h authedHandler) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
token := extractBearer(r)
|
||||
if token == "" {
|
||||
writeJSONError(w, http.StatusUnauthorized, "missing bearer token")
|
||||
return
|
||||
}
|
||||
info, err := s.verifier.Verify(r.Context(), token)
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, ErrUnauthenticated):
|
||||
writeJSONError(w, http.StatusUnauthorized, "invalid token")
|
||||
case errors.Is(err, ErrRevoked):
|
||||
writeJSONError(w, http.StatusForbidden, "token revoked")
|
||||
default:
|
||||
slog.Warn("auth verify failed", "err", err)
|
||||
writeJSONError(w, http.StatusBadGateway, "auth backend unavailable")
|
||||
}
|
||||
return
|
||||
}
|
||||
if !info.HasScope("cloud:rw") {
|
||||
writeJSONError(w, http.StatusForbidden, "token lacks cloud:rw scope")
|
||||
return
|
||||
}
|
||||
h(w, r, info)
|
||||
}
|
||||
}
|
||||
|
||||
// GET /v1/manifest
|
||||
func (s *Server) handleGetManifest(w http.ResponseWriter, _ *http.Request, who *AuthInfo) {
|
||||
m, err := s.storage.ReadManifest(who.User)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read manifest: %v", err))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, m)
|
||||
}
|
||||
|
||||
// GET /v1/blob/{sha}
|
||||
func (s *Server) handleGetBlob(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
|
||||
sha := r.PathValue("sha")
|
||||
rc, err := s.storage.ReadBlob(who.User, sha)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
writeJSONError(w, http.StatusNotFound, "blob not found")
|
||||
return
|
||||
}
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("read blob: %v", err))
|
||||
return
|
||||
}
|
||||
defer rc.Close()
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
_, _ = io.Copy(w, rc)
|
||||
}
|
||||
|
||||
// POST /v1/snapshot
|
||||
// Body: multipart/form-data with parts:
|
||||
// manifest — application/json (the Manifest)
|
||||
// tarball — application/octet-stream (the .tar.zst payload)
|
||||
// The handler does NOT enforce that the tarball matches the manifest's
|
||||
// content list — that's a v2 concern. For v1 the manifest is what governs.
|
||||
func (s *Server) handlePostSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
|
||||
// 50 MB in-memory cap; larger streams to temp files. Tunable.
|
||||
if err := r.ParseMultipartForm(50 << 20); err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("parse multipart: %v", err))
|
||||
return
|
||||
}
|
||||
manifestFile, _, err := r.FormFile("manifest")
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, "missing manifest part")
|
||||
return
|
||||
}
|
||||
defer manifestFile.Close()
|
||||
m, err := ReadManifest(manifestFile)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("invalid manifest: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
tarFile, _, err := r.FormFile("tarball")
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, "missing tarball part")
|
||||
return
|
||||
}
|
||||
defer tarFile.Close()
|
||||
tarBytes, err := io.ReadAll(tarFile)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("read tarball: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Quota check: estimate new usage = current + tarball + small overhead.
|
||||
used, err := s.storage.UsageBytes(who.User)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err))
|
||||
return
|
||||
}
|
||||
limit, err := s.quota.Quota(r.Context(), who.User)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err))
|
||||
return
|
||||
}
|
||||
if used+int64(len(tarBytes)) > limit {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusRequestEntityTooLarge)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"error": "quota",
|
||||
"used": used,
|
||||
"limit": limit,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.storage.StoreSnapshot(who.User, m, tarBytes); err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("store: %v", err))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]string{
|
||||
"snapshot_id": m.SnapshotID,
|
||||
"snapshot_url": "/v1/snapshot/" + m.SnapshotID,
|
||||
})
|
||||
}
|
||||
|
||||
// GET /v1/snapshots
|
||||
func (s *Server) handleListSnapshots(w http.ResponseWriter, _ *http.Request, who *AuthInfo) {
|
||||
list, err := s.storage.ListSnapshots(who.User)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("list: %v", err))
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"snapshots": list})
|
||||
}
|
||||
|
||||
// GET /v1/snapshot/{id}
|
||||
func (s *Server) handleGetSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
|
||||
id := r.PathValue("id")
|
||||
rc, err := s.storage.OpenSnapshot(who.User, id)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
writeJSONError(w, http.StatusNotFound, "snapshot not found")
|
||||
return
|
||||
}
|
||||
writeJSONError(w, http.StatusBadRequest, fmt.Sprintf("open snapshot: %v", err))
|
||||
return
|
||||
}
|
||||
defer rc.Close()
|
||||
w.Header().Set("Content-Type", "application/zstd")
|
||||
w.Header().Set("Content-Disposition",
|
||||
fmt.Sprintf(`attachment; filename="%s.tar.zst"`, id))
|
||||
_, _ = io.Copy(w, rc)
|
||||
}
|
||||
|
||||
// DELETE /v1/snapshot/{id}
|
||||
func (s *Server) handleDeleteSnapshot(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
|
||||
id := r.PathValue("id")
|
||||
if err := s.storage.DeleteSnapshot(who.User, id); err != nil {
|
||||
writeJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// GET /v1/quota
|
||||
func (s *Server) handleGetQuota(w http.ResponseWriter, r *http.Request, who *AuthInfo) {
|
||||
used, err := s.storage.UsageBytes(who.User)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("usage: %v", err))
|
||||
return
|
||||
}
|
||||
limit, err := s.quota.Quota(r.Context(), who.User)
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, fmt.Sprintf("quota: %v", err))
|
||||
return
|
||||
}
|
||||
list, _ := s.storage.ListSnapshots(who.User)
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"used_bytes": used,
|
||||
"limit_bytes": limit,
|
||||
"snapshots": len(list),
|
||||
})
|
||||
}
|
||||
|
||||
// writeJSON serializes v as JSON with the right Content-Type. Errors logged,
|
||||
// not returned — by the time we're encoding the response, we've committed.
|
||||
func writeJSON(w http.ResponseWriter, status int, v any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
if err := json.NewEncoder(w).Encode(v); err != nil {
|
||||
slog.Warn("encode response", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func writeJSONError(w http.ResponseWriter, status int, msg string) {
|
||||
writeJSON(w, status, map[string]string{"error": msg})
|
||||
}
|
||||
|
||||
// secondsSinceEpoch is exported for handlers/tests that need stable timestamp
|
||||
// strings without round-tripping through time.Time.Format.
|
||||
func secondsSinceEpoch() string { return strconv.FormatInt(time.Now().Unix(), 10) }
|
||||
Reference in New Issue
Block a user