From f823c05aa369be23a6a5ff6b2ab2ef765d65f6e8 Mon Sep 17 00:00:00 2001 From: claude-timemachine Date: Wed, 10 Jun 2026 18:01:04 +0200 Subject: [PATCH] =?UTF-8?q?initial:=20svc-proxy=20=E2=80=94=20UDP=20valve?= =?UTF-8?q?=20for=20Simple=20Voice=20Chat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Standalone Go service that routes SVC client traffic to per-server backend voice endpoints, configured via pg LISTEN/NOTIFY (same channel mc-router subscribes to). Each pg `servers` row with both `voice_address` and `voice_proxy_port` set spawns a Valve: a public UDP listener that maintains per-client ephemeral bridges to the backend's SVC port. Pieces: cmd/svc-proxy/main.go entry; wires config, log fan-out, bridge.Manager, pgsync, httpsrv internal/config/ DATABASE_URL + BIND_HOST + BRIDGE_IDLE_TTL (default 1m) + HTTP_ADDR (default :8081) internal/pgsync/ LISTEN automc_routes_changed; diff desired/actual routes; emit Apply() internal/bridge/ Valve per public port; per-client bridge with atomic up/down byte counters; idle eviction every 15s against TTL internal/httpsrv/ operator UI — embedded single-page HTML with active-connections table polled every 1s + SSE log stream (last 500 lines backlog on connect) Reverse-proxied behind server-manager at /infra/svc-proxy/* — bind internal-only addresses for production; auth is the dashboard's Basic gate. Co-Authored-By: Claude Opus 4.7 --- .gitea/workflows/ci.yml | 46 +++++ .gitignore | 4 + Dockerfile | 11 ++ README.md | 90 ++++++++++ cmd/svc-proxy/main.go | 65 +++++++ go.mod | 11 ++ go.sum | 26 +++ internal/bridge/manager.go | 263 +++++++++++++++++++++++++++++ internal/bridge/stats.go | 65 +++++++ internal/config/config.go | 48 ++++++ internal/httpsrv/logs.go | 190 +++++++++++++++++++++ internal/httpsrv/server.go | 78 +++++++++ internal/httpsrv/static/index.html | 201 ++++++++++++++++++++++ internal/pgsync/pgsync.go | 142 ++++++++++++++++ 14 files changed, 1240 insertions(+) create mode 100644 .gitea/workflows/ci.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 cmd/svc-proxy/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/bridge/manager.go create mode 100644 internal/bridge/stats.go create mode 100644 internal/config/config.go create mode 100644 internal/httpsrv/logs.go create mode 100644 internal/httpsrv/server.go create mode 100644 internal/httpsrv/static/index.html create mode 100644 internal/pgsync/pgsync.go diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..04e8e6d --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,46 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +jobs: + validate: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: '1.25' + - name: Build + run: go build ./... + - name: Vet + run: go vet ./... + - name: Test + run: go test -race ./... + + docker: + runs-on: ubuntu-latest + needs: validate + if: github.event_name == 'push' + steps: + - uses: actions/checkout@v4 + - name: Compute tags + id: tags + run: | + echo "image=git.timemachine.center/timemachine/svc-proxy" >> "$GITHUB_OUTPUT" + - name: Login to registry + uses: docker/login-action@v3 + with: + registry: git.timemachine.center + username: ${{ secrets.REGISTRY_USER }} + password: ${{ secrets.REGISTRY_TOKEN }} + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . + push: true + tags: | + ${{ steps.tags.outputs.image }}:latest + ${{ steps.tags.outputs.image }}:${{ github.sha }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b836055 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/svc-proxy +*.test +/coverage.out +.DS_Store diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4a40a29 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.25-alpine AS build +WORKDIR /app +COPY go.mod go.sum* ./ +RUN go mod download +COPY cmd ./cmd +COPY internal ./internal +RUN CGO_ENABLED=0 go build -o /svc-proxy ./cmd/svc-proxy + +FROM alpine:3.21 +COPY --from=build /svc-proxy /svc-proxy +ENTRYPOINT ["/svc-proxy"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..5f68e5a --- /dev/null +++ b/README.md @@ -0,0 +1,90 @@ +# svc-proxy + +Standalone UDP "valve" for [Simple Voice Chat](https://github.com/henkelmax/simple-voice-chat). Per-server public UDP port → backend voice address. Routes read from Postgres via `LISTEN`/`NOTIFY`, same pattern as `mc-router`. + +## What it does + +Each MC server in the automc fleet runs SVC on its own UDP port inside its container (default 24454). svc-proxy exposes a **public** UDP port per server and bridges client traffic to the backend. SVC's own `SecretPacket` is configured per backend to advertise the public proxy hostname + the assigned proxy port, so the client connects directly to the proxy — no MITM, no plugin-channel sniffing. + +``` +SVC client ──UDP──► svc-proxy.timemachine.center:24455 + │ + ├── (per-server valve) + │ + └──UDP──► mc-gtnh:24454 (backend SVC) +``` + +The proxy is **opaque** to the SVC payload — it can read the cleartext outer header (magic byte + player UUID) but the AES-GCM body stays end-to-end. Source-address bridges (one ephemeral upstream socket per client `SocketAddress`) survive NAT rebinds within the idle TTL. + +## pg schema + +Two new columns on `servers`: + +| Column | Type | Meaning | +|---|---|---| +| `voice_address` | text | Backend SVC address — `:` reachable from svc-proxy | +| `voice_proxy_port` | int | Public UDP port svc-proxy binds for this server | + +Rows with both NULL are ignored. Owner of allocation: **server-manager** (assigns the next free port from a configured pool when the server is provisioned; clears on delete). + +## NOTIFY channel + +Reuses `automc_routes_changed` from `mc-router`. The trigger on `servers` already fires on UPDATE, so adding/clearing the voice columns refreshes svc-proxy's bindings without restart. + +## Environment + +| Env | Default | Effect | +|---|---|---| +| `DATABASE_URL` | (required) | pgx DSN | +| `BIND_HOST` | `0.0.0.0` | host for the per-server UDP listeners | +| `BRIDGE_IDLE_TTL` | `5m` | tear down per-client upstream sockets after this much silence | +| `LOG_LEVEL` | `info` | `debug` / `info` / `warn` | + +## Operator UX + +```bash +# Allocate voice ports for an existing server (server-manager does this normally) +UPDATE servers + SET voice_address = 'mc-gtnh:24454', + voice_proxy_port = 24455 + WHERE name = 'gtnh'; +NOTIFY automc_routes_changed; +``` + +svc-proxy logs `valve open: :24455 → mc-gtnh:24454 (gtnh)` and is ready. + +To retire a server's voice routing: + +```bash +UPDATE servers SET voice_address = NULL, voice_proxy_port = NULL WHERE name = 'gtnh'; +NOTIFY automc_routes_changed; +``` + +svc-proxy logs `valve close: :24455 (gtnh)`. In-flight bridges are torn down. + +## Backend-side configuration + +The SVC plugin on the backend must advertise the **public** proxy address to clients (not the backend's own LAN address). Set in the backend's SVC config (`config/voicechat-server.properties`): + +```properties +voice_host=svc-proxy.timemachine.center:24455 +``` + +…or via env if mc-wrapper templates it. SVC bakes this into `SecretPacket.voiceHost`, the client uses it verbatim. + +## Why not the SVC bundled proxy + +SVC ships proxy support for BungeeCord/Velocity (`common-proxy` module). It sniffs the MC `voicechat:secret` plugin message and rewrites the host on the fly, then NAT-bridges UDP. That requires the SVC proxy to live inside the MC proxy process. We run mc-router (Go) instead of a Java MC proxy on the edge, so the bundled approach doesn't apply. + +svc-proxy is the equivalent for the mc-router shape: pure UDP data plane, pg-driven config, no plugin hooks. + +## Limitations + +- No replay protection at the proxy layer (SVC's AES-GCM is the only freshness guarantee — same as upstream). +- No client rate-limiting (SVC's plugin-channel rate limit covers TCP setup; UDP audio relies on Opus payload caps + the wrapper's `BRIDGE_IDLE_TTL` to bound per-source sockets). +- Bridge ephemeral upstream sockets aren't pooled — one syscall per concurrent client. Fine up to a few thousand concurrent voice users on a single proxy host. + +## Related + +- [mc-router (Timemachine fork)](https://git.timemachine.center/Timemachine/mc-router) — same NOTIFY channel, same pg-driven route source. +- [Simple Voice Chat](https://github.com/henkelmax/simple-voice-chat) — upstream mod whose wire protocol we pass through. diff --git a/cmd/svc-proxy/main.go b/cmd/svc-proxy/main.go new file mode 100644 index 0000000..432fc42 --- /dev/null +++ b/cmd/svc-proxy/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + + "git.timemachine.center/timemachine/svc-proxy/internal/bridge" + "git.timemachine.center/timemachine/svc-proxy/internal/config" + "git.timemachine.center/timemachine/svc-proxy/internal/httpsrv" + "git.timemachine.center/timemachine/svc-proxy/internal/pgsync" +) + +func main() { + cfg, err := config.Load() + if err != nil { + slog.Error("config", "err", err) + os.Exit(2) + } + + bus := httpsrv.NewLogBus(500) + setupLogger(cfg.LogLevel, bus) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + mgr := bridge.NewManager(ctx, cfg.BindHost, cfg.BridgeIdleTTL) + defer mgr.Shutdown() + + sync := pgsync.New(cfg.DatabaseURL, mgr) + go sync.Run(ctx) + + httpServer := httpsrv.New(cfg.HTTPAddr, mgr, bus) + go func() { + if err := httpServer.Run(ctx); err != nil { + slog.Error("http server", "err", err) + } + }() + + slog.Info("svc-proxy started", + "bind_host", cfg.BindHost, + "idle_ttl", cfg.BridgeIdleTTL, + "http", cfg.HTTPAddr, + ) + <-ctx.Done() + slog.Info("svc-proxy shutting down") +} + +func setupLogger(s string, bus *httpsrv.LogBus) { + var lvl slog.Level + switch s { + case "debug": + lvl = slog.LevelDebug + case "warn": + lvl = slog.LevelWarn + case "error": + lvl = slog.LevelError + default: + lvl = slog.LevelInfo + } + base := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: lvl}) + slog.SetDefault(slog.New(httpsrv.NewBusHandler(base, bus))) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9a2a5a1 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module git.timemachine.center/timemachine/svc-proxy + +go 1.25 + +require github.com/jackc/pgx/v5 v5.8.0 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..87a6c8a --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/bridge/manager.go b/internal/bridge/manager.go new file mode 100644 index 0000000..a22d5ae --- /dev/null +++ b/internal/bridge/manager.go @@ -0,0 +1,263 @@ +// Package bridge owns the UDP data plane: one Valve per backend, each Valve +// owns a public listener socket and a pool of per-client bridges that copy +// datagrams to the backend's voice address and back. +package bridge + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "sync" + "time" + + "git.timemachine.center/timemachine/svc-proxy/internal/pgsync" +) + +// Manager is the top-level coordinator. Implements pgsync.Applier so the +// pgsync goroutine can hand it desired/undesired routes; Manager turns those +// into open/close calls on a Valve registry keyed by public port. +type Manager struct { + ctx context.Context + bindHost string + bridgeIdleTTL time.Duration + + mu sync.Mutex + valves map[int]*Valve // key: public UDP port +} + +func NewManager(ctx context.Context, bindHost string, idleTTL time.Duration) *Manager { + return &Manager{ + ctx: ctx, + bindHost: bindHost, + bridgeIdleTTL: idleTTL, + valves: map[int]*Valve{}, + } +} + +// Apply satisfies pgsync.Applier. Open first (so a backend-address change +// can flip-cleanly while the new listener takes over the new port), then +// close. +func (m *Manager) Apply(add []pgsync.Route, del []pgsync.Route) { + m.mu.Lock() + defer m.mu.Unlock() + + for _, r := range add { + if existing, ok := m.valves[r.Port]; ok { + // Same port, different backend — close, then re-open. + existing.Close() + delete(m.valves, r.Port) + } + v, err := openValve(m.ctx, m.bindHost, r, m.bridgeIdleTTL) + if err != nil { + slog.Error("valve open failed", "port", r.Port, "addr", r.Address, "name", r.Name, "err", err) + continue + } + m.valves[r.Port] = v + slog.Info("valve open", "port", r.Port, "addr", r.Address, "name", r.Name) + } + + for _, r := range del { + v, ok := m.valves[r.Port] + if !ok { + continue + } + v.Close() + delete(m.valves, r.Port) + slog.Info("valve close", "port", r.Port, "name", r.Name) + } +} + +// Shutdown closes every active valve. Safe to call once; idempotent for +// per-valve Close. +func (m *Manager) Shutdown() { + m.mu.Lock() + defer m.mu.Unlock() + for port, v := range m.valves { + v.Close() + delete(m.valves, port) + } +} + +// Valve owns one public UDP listener and the per-client bridges hanging off +// it. Each bridge is a goroutine that copies datagrams from one ephemeral +// upstream socket back to the original client. The public socket itself is +// the egress for backend → client. +type Valve struct { + route pgsync.Route + backend *net.UDPAddr + pub *net.UDPConn // 0.0.0.0: + + idleTTL time.Duration + + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + bridges map[string]*clientBridge // key: client.RemoteAddr().String() +} + +func openValve(parent context.Context, bindHost string, r pgsync.Route, idleTTL time.Duration) (*Valve, error) { + backend, err := net.ResolveUDPAddr("udp", r.Address) + if err != nil { + return nil, fmt.Errorf("resolve backend %q: %w", r.Address, err) + } + pubAddr := &net.UDPAddr{IP: net.ParseIP(bindHost), Port: r.Port} + if pubAddr.IP == nil { + return nil, fmt.Errorf("bind host %q not an IP", bindHost) + } + pub, err := net.ListenUDP("udp", pubAddr) + if err != nil { + return nil, fmt.Errorf("bind %s: %w", pubAddr, err) + } + ctx, cancel := context.WithCancel(parent) + v := &Valve{ + route: r, + backend: backend, + pub: pub, + idleTTL: idleTTL, + ctx: ctx, + cancel: cancel, + bridges: map[string]*clientBridge{}, + } + go v.readLoop() + go v.evictIdle() + return v, nil +} + +// readLoop runs forever copying packets from the public socket to per-client +// upstream sockets. The reverse direction (backend → client) is per-bridge +// goroutines on the upstream sockets writing back to v.pub. +func (v *Valve) readLoop() { + buf := make([]byte, 2048) // SVC max datagram body + for { + n, src, err := v.pub.ReadFromUDP(buf) + if err != nil { + if v.ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + return + } + slog.Warn("valve read error", "port", v.route.Port, "err", err) + continue + } + v.mu.Lock() + b, ok := v.bridges[src.String()] + if !ok { + b, err = v.openBridge(src) + if err != nil { + v.mu.Unlock() + slog.Warn("bridge open failed", "port", v.route.Port, "src", src, "err", err) + continue + } + v.bridges[src.String()] = b + slog.Debug("bridge open", "port", v.route.Port, "client", src.String()) + } + v.mu.Unlock() + b.touch() + if _, err := b.upstream.Write(buf[:n]); err != nil { + if v.ctx.Err() == nil { + slog.Warn("bridge forward failed", "port", v.route.Port, "err", err) + } + continue + } + b.counters.bytesUp.Add(uint64(n)) + } +} + +func (v *Valve) openBridge(src *net.UDPAddr) (*clientBridge, error) { + up, err := net.DialUDP("udp", nil, v.backend) + if err != nil { + return nil, fmt.Errorf("dial backend: %w", err) + } + now := time.Now() + b := &clientBridge{ + client: src, + upstream: up, + valve: v, + openedAt: now, + } + b.lastSeen = now + go b.readBackend() + return b, nil +} + +func (v *Valve) evictIdle() { + t := time.NewTicker(15 * time.Second) + defer t.Stop() + for { + select { + case <-v.ctx.Done(): + return + case <-t.C: + cutoff := time.Now().Add(-v.idleTTL) + v.mu.Lock() + for k, b := range v.bridges { + if b.lastUseBefore(cutoff) { + slog.Debug("bridge idle evict", "port", v.route.Port, "client", k) + b.close() + delete(v.bridges, k) + } + } + v.mu.Unlock() + } + } +} + +func (v *Valve) Close() { + v.cancel() + v.pub.Close() + v.mu.Lock() + for k, b := range v.bridges { + b.close() + delete(v.bridges, k) + } + v.mu.Unlock() +} + +type clientBridge struct { + client *net.UDPAddr + upstream *net.UDPConn + valve *Valve + + counters counters // atomic — hot path + + mu sync.Mutex + lastSeen time.Time + openedAt time.Time +} + +func (b *clientBridge) touch() { + b.mu.Lock() + b.lastSeen = time.Now() + b.mu.Unlock() +} + +func (b *clientBridge) lastUseBefore(t time.Time) bool { + b.mu.Lock() + defer b.mu.Unlock() + return b.lastSeen.Before(t) +} + +func (b *clientBridge) close() { + _ = b.upstream.Close() +} + +// readBackend pumps datagrams from the backend back to the client via the +// public socket. Exits when the upstream socket is closed. +func (b *clientBridge) readBackend() { + buf := make([]byte, 2048) + for { + n, err := b.upstream.Read(buf) + if err != nil { + return + } + b.touch() + if _, err := b.valve.pub.WriteToUDP(buf[:n], b.client); err != nil { + if b.valve.ctx.Err() == nil { + slog.Warn("bridge reverse failed", "port", b.valve.route.Port, "err", err) + } + return + } + b.counters.bytesDown.Add(uint64(n)) + } +} diff --git a/internal/bridge/stats.go b/internal/bridge/stats.go new file mode 100644 index 0000000..ef6a888 --- /dev/null +++ b/internal/bridge/stats.go @@ -0,0 +1,65 @@ +package bridge + +import ( + "sync/atomic" + "time" +) + +// counters is the per-bridge byte tally. Updated from the two hot paths +// (readLoop client→backend, readBackend backend→client) — atomic to avoid +// locking the bridge for every datagram. +type counters struct { + bytesUp atomic.Uint64 // client → backend + bytesDown atomic.Uint64 // backend → client +} + +// ConnSnapshot is one row of the active-connections table the UI renders. +// All times are wall-clock; sizes are total bytes since the bridge opened. +type ConnSnapshot struct { + Server string `json:"server"` // pg row name (e.g. "gtnh") + Port int `json:"port"` // public UDP port (the valve) + Backend string `json:"backend"` // backend addr + Client string `json:"client"` // source IP:port + BytesUp uint64 `json:"bytes_up"` // client → backend + BytesDown uint64 `json:"bytes_down"` // backend → client + OpenedAt time.Time `json:"opened_at"` // bridge creation + LastSeen time.Time `json:"last_seen"` // most-recent datagram either direction + IdleSeconds float64 `json:"idle_seconds"` // derived; UI sorts by this +} + +// Snapshot returns one row per active client bridge across all valves. +// Cheap-ish: takes the Manager lock + each Valve lock briefly, no per-bridge +// lock (counters are atomic; LastSeen is read under the bridge lock). +func (m *Manager) Snapshot() []ConnSnapshot { + m.mu.Lock() + valves := make([]*Valve, 0, len(m.valves)) + for _, v := range m.valves { + valves = append(valves, v) + } + m.mu.Unlock() + + now := time.Now() + var out []ConnSnapshot + for _, v := range valves { + v.mu.Lock() + for _, b := range v.bridges { + b.mu.Lock() + lastSeen := b.lastSeen + opened := b.openedAt + b.mu.Unlock() + out = append(out, ConnSnapshot{ + Server: v.route.Name, + Port: v.route.Port, + Backend: v.route.Address, + Client: b.client.String(), + BytesUp: b.counters.bytesUp.Load(), + BytesDown: b.counters.bytesDown.Load(), + OpenedAt: opened, + LastSeen: lastSeen, + IdleSeconds: now.Sub(lastSeen).Seconds(), + }) + } + v.mu.Unlock() + } + return out +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..2fcc5a0 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,48 @@ +package config + +import ( + "fmt" + "os" + "time" +) + +type Config struct { + DatabaseURL string + BindHost string + BridgeIdleTTL time.Duration + HTTPAddr string + LogLevel string +} + +func Load() (Config, error) { + cfg := Config{ + DatabaseURL: os.Getenv("DATABASE_URL"), + BindHost: envOr("BIND_HOST", "0.0.0.0"), + BridgeIdleTTL: envDur("BRIDGE_IDLE_TTL", 1*time.Minute), + HTTPAddr: envOr("HTTP_ADDR", ":8081"), + LogLevel: envOr("LOG_LEVEL", "info"), + } + if cfg.DatabaseURL == "" { + return cfg, fmt.Errorf("DATABASE_URL required") + } + return cfg, nil +} + +func envOr(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func envDur(key string, fallback time.Duration) time.Duration { + v := os.Getenv(key) + if v == "" { + return fallback + } + d, err := time.ParseDuration(v) + if err != nil { + return fallback + } + return d +} diff --git a/internal/httpsrv/logs.go b/internal/httpsrv/logs.go new file mode 100644 index 0000000..0d7c778 --- /dev/null +++ b/internal/httpsrv/logs.go @@ -0,0 +1,190 @@ +package httpsrv + +import ( + "context" + "fmt" + "io" + "log/slog" + "net/http" + "sync" + "time" +) + +// LogBus is a fan-out buffer for log lines. It holds a ring of the last N +// entries and broadcasts new lines to live SSE subscribers. The slog Handler +// in NewLogBus writes each formatted record into both the underlying handler +// (stderr) AND this bus. +type LogBus struct { + cap int + + mu sync.RWMutex + ring []LogEntry + next int + full bool + listeners map[chan LogEntry]struct{} +} + +type LogEntry struct { + Time time.Time `json:"time"` + Level string `json:"level"` + Msg string `json:"msg"` + Attrs string `json:"attrs,omitempty"` +} + +func NewLogBus(capacity int) *LogBus { + if capacity <= 0 { + capacity = 500 + } + return &LogBus{ + cap: capacity, + ring: make([]LogEntry, capacity), + listeners: map[chan LogEntry]struct{}{}, + } +} + +func (b *LogBus) push(e LogEntry) { + b.mu.Lock() + b.ring[b.next] = e + b.next = (b.next + 1) % b.cap + if b.next == 0 { + b.full = true + } + subs := make([]chan LogEntry, 0, len(b.listeners)) + for ch := range b.listeners { + subs = append(subs, ch) + } + b.mu.Unlock() + for _, ch := range subs { + select { + case ch <- e: + default: + // slow subscriber; drop rather than back-pressure the writer + } + } +} + +// Backlog returns the buffered entries oldest-first. +func (b *LogBus) Backlog() []LogEntry { + b.mu.RLock() + defer b.mu.RUnlock() + if !b.full { + out := make([]LogEntry, b.next) + copy(out, b.ring[:b.next]) + return out + } + out := make([]LogEntry, 0, b.cap) + out = append(out, b.ring[b.next:]...) + out = append(out, b.ring[:b.next]...) + return out +} + +// Subscribe registers a fresh channel that will receive every subsequent +// entry. Caller must call Unsubscribe when done. +func (b *LogBus) Subscribe() chan LogEntry { + ch := make(chan LogEntry, 32) + b.mu.Lock() + b.listeners[ch] = struct{}{} + b.mu.Unlock() + return ch +} + +func (b *LogBus) Unsubscribe(ch chan LogEntry) { + b.mu.Lock() + delete(b.listeners, ch) + b.mu.Unlock() + close(ch) +} + +// busHandler wraps a base slog.Handler and pushes a structured copy of each +// record to the LogBus. Errors during push are ignored — logging must never +// stall on UI subscribers. +type busHandler struct { + base slog.Handler + bus *LogBus +} + +// NewBusHandler returns a slog.Handler that emits to both `base` and `bus`. +func NewBusHandler(base slog.Handler, bus *LogBus) slog.Handler { + return &busHandler{base: base, bus: bus} +} + +func (h *busHandler) Enabled(ctx context.Context, lvl slog.Level) bool { + return h.base.Enabled(ctx, lvl) +} + +func (h *busHandler) Handle(ctx context.Context, r slog.Record) error { + // First emit to the base handler so console/journald behaviour is + // preserved. Push to bus regardless of base error. + err := h.base.Handle(ctx, r) + + var attrs string + r.Attrs(func(a slog.Attr) bool { + if attrs != "" { + attrs += " " + } + attrs += fmt.Sprintf("%s=%v", a.Key, a.Value.Any()) + return true + }) + h.bus.push(LogEntry{ + Time: r.Time, + Level: r.Level.String(), + Msg: r.Message, + Attrs: attrs, + }) + return err +} + +func (h *busHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &busHandler{base: h.base.WithAttrs(attrs), bus: h.bus} +} + +func (h *busHandler) WithGroup(name string) slog.Handler { + return &busHandler{base: h.base.WithGroup(name), bus: h.bus} +} + +// sseLogs streams the backlog + every new entry as Server-Sent Events. +// Each event is one JSON-encoded LogEntry on a `data:` line. +func sseLogs(bus *LogBus) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // disable proxy buffering + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + for _, e := range bus.Backlog() { + writeEvent(w, e) + } + flusher.Flush() + + ch := bus.Subscribe() + defer bus.Unsubscribe(ch) + + // Heartbeat keeps proxies from closing the conn during silent periods. + heartbeat := time.NewTicker(30 * time.Second) + defer heartbeat.Stop() + + for { + select { + case <-r.Context().Done(): + return + case e := <-ch: + writeEvent(w, e) + flusher.Flush() + case <-heartbeat.C: + _, _ = io.WriteString(w, ":hb\n\n") + flusher.Flush() + } + } + } +} + +func writeEvent(w io.Writer, e LogEntry) { + fmt.Fprintf(w, "data: {\"time\":%q,\"level\":%q,\"msg\":%q,\"attrs\":%q}\n\n", + e.Time.Format(time.RFC3339Nano), e.Level, e.Msg, e.Attrs) +} diff --git a/internal/httpsrv/server.go b/internal/httpsrv/server.go new file mode 100644 index 0000000..3df9690 --- /dev/null +++ b/internal/httpsrv/server.go @@ -0,0 +1,78 @@ +// Package httpsrv exposes the svc-proxy operator UI + JSON API. Designed to +// be reverse-proxied behind server-manager (no auth/TLS at this layer; the +// listener should bind to the container network only). +package httpsrv + +import ( + "context" + "embed" + "encoding/json" + "errors" + "io/fs" + "log/slog" + "net/http" + "sort" + "time" + + "git.timemachine.center/timemachine/svc-proxy/internal/bridge" +) + +//go:embed static/* +var staticFS embed.FS + +type Server struct { + addr string + mgr *bridge.Manager + bus *LogBus + srv *http.Server +} + +func New(addr string, mgr *bridge.Manager, bus *LogBus) *Server { + mux := http.NewServeMux() + s := &Server{ + addr: addr, + mgr: mgr, + bus: bus, + } + + sub, err := fs.Sub(staticFS, "static") + if err != nil { + panic(err) // embed.FS misconfigured at build time + } + mux.Handle("GET /", http.FileServer(http.FS(sub))) + mux.HandleFunc("GET /api/connections", s.handleConnections) + mux.HandleFunc("GET /api/logs", sseLogs(bus)) + + s.srv = &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + return s +} + +// Run blocks until ctx is cancelled or the server errors. +func (s *Server) Run(ctx context.Context) error { + go func() { + <-ctx.Done() + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.srv.Shutdown(shutCtx) + }() + slog.Info("http server listening", "addr", s.addr) + if err := s.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +func (s *Server) handleConnections(w http.ResponseWriter, _ *http.Request) { + snap := s.mgr.Snapshot() + // Sort by most-recently-active first so the UI can render top-down. + sort.Slice(snap, func(i, j int) bool { return snap[i].LastSeen.After(snap[j].LastSeen) }) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "connections": snap, + "at": time.Now(), + }) +} diff --git a/internal/httpsrv/static/index.html b/internal/httpsrv/static/index.html new file mode 100644 index 0000000..a456934 --- /dev/null +++ b/internal/httpsrv/static/index.html @@ -0,0 +1,201 @@ + + + + + svc-proxy + + + +
+

svc-proxy — connecting…

+ log stream: connecting +
+ +
+

Active connections

+ + + + + + + + + + + + + + +
ServerPortClientBackendUpDownIdleAge
+
no active bridges
+
+ +
+

+ Logs + +

+

+  
+ + + + diff --git a/internal/pgsync/pgsync.go b/internal/pgsync/pgsync.go new file mode 100644 index 0000000..fc6d6da --- /dev/null +++ b/internal/pgsync/pgsync.go @@ -0,0 +1,142 @@ +// Package pgsync mirrors the LISTEN/NOTIFY route-source pattern from +// Timemachine/mc-router's internal/automc. Watches the `servers` table for +// rows that have both voice columns set and emits Route events whenever the +// desired set changes. +package pgsync + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/jackc/pgx/v5" +) + +const ( + NotifyChannel = "automc_routes_changed" + reconnectMin = 1 * time.Second + reconnectMax = 30 * time.Second +) + +// Route is a single voice routing row from postgres. Both Port and Address +// are guaranteed non-zero when emitted via Apply. +type Route struct { + Name string // human-readable; logged on open/close + Port int // public UDP port svc-proxy binds + Address string // backend voice host:port (e.g. "mc-gtnh:24454") +} + +// Applier reconciles a desired route set against currently-bound valves. +// Add is called for routes that are new or whose backend address changed; +// Del is called for routes that disappeared or whose backend address changed +// (in pair with the new Add for the same port). +type Applier interface { + Apply(add []Route, del []Route) +} + +type Syncer struct { + dsn string + applier Applier + current map[string]Route // key: server name +} + +func New(dsn string, a Applier) *Syncer { + return &Syncer{dsn: dsn, applier: a, current: map[string]Route{}} +} + +// Run blocks until ctx is cancelled. Reconnects on error with exponential +// backoff capped at reconnectMax. +func (s *Syncer) Run(ctx context.Context) { + backoff := reconnectMin + for { + if ctx.Err() != nil { + return + } + err := s.connectAndLoop(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + slog.Warn("pgsync disconnected", "err", err, "retry_in", backoff) + } + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + backoff *= 2 + if backoff > reconnectMax { + backoff = reconnectMax + } + } +} + +func (s *Syncer) connectAndLoop(ctx context.Context) error { + conn, err := pgx.Connect(ctx, s.dsn) + if err != nil { + return fmt.Errorf("pgx connect: %w", err) + } + defer conn.Close(context.Background()) + + if _, err := conn.Exec(ctx, "LISTEN "+NotifyChannel); err != nil { + return fmt.Errorf("LISTEN: %w", err) + } + slog.Info("pgsync connected", "channel", NotifyChannel) + + if err := s.refresh(ctx, conn); err != nil { + return fmt.Errorf("initial refresh: %w", err) + } + + for { + if _, err := conn.WaitForNotification(ctx); err != nil { + return fmt.Errorf("wait notification: %w", err) + } + if err := s.refresh(ctx, conn); err != nil { + return fmt.Errorf("refresh: %w", err) + } + } +} + +func (s *Syncer) refresh(ctx context.Context, conn *pgx.Conn) error { + rows, err := conn.Query(ctx, ` + SELECT name, voice_proxy_port, voice_address + FROM servers + WHERE enabled IS NOT FALSE + AND voice_proxy_port IS NOT NULL + AND voice_address IS NOT NULL + AND voice_address != ''`) + if err != nil { + return err + } + defer rows.Close() + + desired := map[string]Route{} + for rows.Next() { + var r Route + if err := rows.Scan(&r.Name, &r.Port, &r.Address); err != nil { + return err + } + desired[r.Name] = r + } + if err := rows.Err(); err != nil { + return err + } + + add, del := diff(s.current, desired) + s.applier.Apply(add, del) + s.current = desired + return nil +} + +func diff(prev, next map[string]Route) (add []Route, del []Route) { + for name, r := range next { + if p, ok := prev[name]; !ok || p.Port != r.Port || p.Address != r.Address { + add = append(add, r) + } + } + for name, r := range prev { + if n, ok := next[name]; !ok || n.Port != r.Port || n.Address != r.Address { + del = append(del, r) + } + } + return add, del +}