Files
claude-timemachine f823c05aa3
CI / validate (push) Successful in 24s
CI / docker (push) Failing after 1m49s
initial: svc-proxy — UDP valve for Simple Voice Chat
Standalone Go service that routes SVC client traffic to per-server
backend voice endpoints, configured via pg LISTEN/NOTIFY (same channel
mc-router subscribes to). Each pg `servers` row with both
`voice_address` and `voice_proxy_port` set spawns a Valve: a public
UDP listener that maintains per-client ephemeral bridges to the
backend's SVC port.

Pieces:
  cmd/svc-proxy/main.go     entry; wires config, log fan-out,
                            bridge.Manager, pgsync, httpsrv
  internal/config/          DATABASE_URL + BIND_HOST +
                            BRIDGE_IDLE_TTL (default 1m) +
                            HTTP_ADDR (default :8081)
  internal/pgsync/          LISTEN automc_routes_changed; diff
                            desired/actual routes; emit Apply()
  internal/bridge/          Valve per public port; per-client
                            bridge with atomic up/down byte counters;
                            idle eviction every 15s against TTL
  internal/httpsrv/         operator UI — embedded single-page HTML
                            with active-connections table polled
                            every 1s + SSE log stream
                            (last 500 lines backlog on connect)

Reverse-proxied behind server-manager at /infra/svc-proxy/* — bind to
internal-only addresses in production; authentication is provided by the
dashboard's Basic auth gate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-10 18:01:04 +02:00

191 lines
4.5 KiB
Go

package httpsrv
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"
)
// LogBus is a fan-out buffer for log lines. It holds a ring of the last N
// entries and broadcasts new lines to live subscribers. The slog Handler
// returned by NewBusHandler writes each record into both the underlying
// handler AND this bus.
type LogBus struct {
// cap is the ring size; it is set once by NewLogBus.
cap int
// mu guards ring, next, full and listeners.
mu sync.RWMutex
// ring stores the most recent entries and is overwritten circularly.
ring []LogEntry
// next is the ring index the following push will write to.
next int
// full is set once next has wrapped back to 0, i.e. every slot holds an entry.
full bool
// listeners is the set of subscriber channels that are offered each new entry.
listeners map[chan LogEntry]struct{}
}
// LogEntry is one captured log record as stored in the ring and delivered to
// subscribers.
type LogEntry struct {
// Time is the record's timestamp.
Time time.Time `json:"time"`
// Level is the textual form of the record's slog level.
Level string `json:"level"`
// Msg is the record's message.
Msg string `json:"msg"`
// Attrs holds the record's attributes flattened to space-separated
// key=value pairs; it is empty when the record has none.
Attrs string `json:"attrs,omitempty"`
}
// NewLogBus builds an empty LogBus whose ring retains the most recent
// capacity entries. A non-positive capacity falls back to 500.
func NewLogBus(capacity int) *LogBus {
	size := capacity
	if size <= 0 {
		size = 500
	}

	bus := new(LogBus)
	bus.cap = size
	bus.ring = make([]LogEntry, size)
	bus.listeners = make(map[chan LogEntry]struct{})
	return bus
}
// push records e in the ring and offers it to every current subscriber.
//
// The sends happen while b.mu is held. Unsubscribe removes a channel from
// b.listeners under the same lock before it closes the channel, so a channel
// found in the map here cannot be closed in the middle of a send. Sending
// after releasing the lock could hit an already-closed channel and panic the
// logging goroutine. Every send is non-blocking, so holding the lock never
// waits on a subscriber.
func (b *LogBus) push(e LogEntry) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.ring[b.next] = e
	b.next = (b.next + 1) % b.cap
	if b.next == 0 {
		// Wrapped around: every slot now holds a real entry.
		b.full = true
	}

	for ch := range b.listeners {
		select {
		case ch <- e:
		default:
			// Slow subscriber: drop rather than back-pressure the writer.
		}
	}
}
// Backlog returns a copy of the buffered entries, oldest first.
func (b *LogBus) Backlog() []LogEntry {
	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.full {
		// The ring has wrapped, so the oldest entry sits at b.next: emit the
		// tail starting there, then the head up to it.
		snapshot := make([]LogEntry, 0, b.cap)
		snapshot = append(snapshot, b.ring[b.next:]...)
		return append(snapshot, b.ring[:b.next]...)
	}

	// Not yet wrapped: only the first b.next slots have been written.
	snapshot := make([]LogEntry, b.next)
	copy(snapshot, b.ring[:b.next])
	return snapshot
}
// Subscribe creates a new channel (buffered for 32 entries), adds it to the
// listener set and returns it. Entries pushed after registration are offered
// to the channel. The caller must call Unsubscribe when done.
func (b *LogBus) Subscribe() chan LogEntry {
	sub := make(chan LogEntry, 32)

	b.mu.Lock()
	defer b.mu.Unlock()
	b.listeners[sub] = struct{}{}
	return sub
}
// Unsubscribe removes ch from the listener set and closes it.
//
// Only a channel that is currently registered is closed. Calling Unsubscribe
// a second time with the same channel, or with a channel that was never
// returned by Subscribe, is therefore a no-op rather than a panic from
// closing an already-closed (or nil) channel.
func (b *LogBus) Unsubscribe(ch chan LogEntry) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if _, registered := b.listeners[ch]; !registered {
		return
	}
	delete(b.listeners, ch)
	close(ch)
}
// busHandler wraps a base slog.Handler and pushes a structured copy of each
// record to the LogBus. The push does not block: the bus drops an entry for
// a subscriber whose channel is full, so logging never stalls on UI
// subscribers.
type busHandler struct {
// base is the wrapped handler; it receives every record first.
base slog.Handler
// bus receives a LogEntry copy of every handled record.
bus *LogBus
}
// NewBusHandler wraps base so that every handled record is emitted to base
// and also pushed to bus.
func NewBusHandler(base slog.Handler, bus *LogBus) slog.Handler {
	h := new(busHandler)
	h.base = base
	h.bus = bus
	return h
}
// Enabled defers the level decision to the base handler, so the bus sees
// exactly the records the base handler would accept.
func (h *busHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
	accepted := h.base.Enabled(ctx, lvl)
	return accepted
}
// Handle forwards r to the base handler and then pushes a LogEntry copy of it
// to the bus. The base handler's error is returned; the push happens whether
// or not the base handler failed.
//
// Only attributes carried on the record itself are flattened into
// LogEntry.Attrs (as space-separated key=value pairs). Attributes bound
// earlier through WithAttrs are handed to the base handler and do not appear
// in bus entries.
func (h *busHandler) Handle(ctx context.Context, r slog.Record) error {
	// Emit to the base handler first so its output is unaffected by the bus.
	err := h.base.Handle(ctx, r)

	var buf []byte
	r.Attrs(func(a slog.Attr) bool {
		if len(buf) > 0 {
			buf = append(buf, ' ')
		}
		// Resolve unwraps slog.LogValuer values so the bus shows the value
		// the type chose to log (e.g. a redacted form), not its raw fields.
		buf = fmt.Appendf(buf, "%s=%v", a.Key, a.Value.Resolve().Any())
		return true
	})

	h.bus.push(LogEntry{
		Time:  r.Time,
		Level: r.Level.String(),
		Msg:   r.Message,
		Attrs: string(buf),
	})
	return err
}
// WithAttrs returns a busHandler that shares the same bus and whose base
// handler has attrs bound. The attrs are handed to the base handler only;
// this wrapper does not keep a copy of them.
func (h *busHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	derived := h.base.WithAttrs(attrs)
	return &busHandler{base: derived, bus: h.bus}
}
// WithGroup returns a busHandler that shares the same bus and whose base
// handler has the group name opened. The group is applied to the base
// handler only; this wrapper does not record it.
func (h *busHandler) WithGroup(name string) slog.Handler {
	grouped := h.base.WithGroup(name)
	return &busHandler{base: grouped, bus: h.bus}
}
// sseLogs streams the backlog + every new entry as Server-Sent Events.
// Each event is one JSON-encoded LogEntry on a `data:` line.
//
// The backlog snapshot and the subscription are taken in one step (see
// snapshotAndSubscribe) so that an entry logged while the client is
// connecting is neither lost nor sent twice. The handler returns when the
// request context is done; a `:hb` comment is written every 30 seconds so
// that idle connections still carry traffic.
func sseLogs(bus *LogBus) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Check for streaming support before setting any headers so the
		// error response does not carry the event-stream headers.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		hdr := w.Header()
		hdr.Set("Content-Type", "text/event-stream")
		hdr.Set("Cache-Control", "no-cache")
		hdr.Set("Connection", "keep-alive")
		hdr.Set("X-Accel-Buffering", "no") // disable proxy buffering

		backlog, ch := bus.snapshotAndSubscribe()
		defer bus.Unsubscribe(ch)

		for _, e := range backlog {
			writeEvent(w, e)
		}
		flusher.Flush()

		// Heartbeat keeps proxies from closing the conn during silent periods.
		heartbeat := time.NewTicker(30 * time.Second)
		defer heartbeat.Stop()

		done := r.Context().Done()
		for {
			select {
			case <-done:
				return
			case e, open := <-ch:
				if !open {
					// The channel was closed elsewhere; stop instead of
					// spinning on zero-value entries.
					return
				}
				writeEvent(w, e)
				flusher.Flush()
			case <-heartbeat.C:
				_, _ = io.WriteString(w, ":hb\n\n")
				flusher.Flush()
			}
		}
	}
}

// snapshotAndSubscribe registers a new subscriber channel (buffered for 32
// entries) and copies the ring, oldest entry first, under a single lock
// acquisition. Every entry is therefore either in the returned backlog or
// offered to the channel, never both and never neither. The caller must pass
// the channel to Unsubscribe when done.
func (b *LogBus) snapshotAndSubscribe() ([]LogEntry, chan LogEntry) {
	ch := make(chan LogEntry, 32)

	b.mu.Lock()
	defer b.mu.Unlock()

	b.listeners[ch] = struct{}{}
	if !b.full {
		// Not yet wrapped: only the first b.next slots have been written.
		return append([]LogEntry(nil), b.ring[:b.next]...), ch
	}
	// Wrapped: the oldest entry sits at b.next.
	backlog := make([]LogEntry, 0, b.cap)
	backlog = append(backlog, b.ring[b.next:]...)
	backlog = append(backlog, b.ring[:b.next]...)
	return backlog, ch
}
// writeEvent writes e to w as one Server-Sent Event: a single `data:` line
// holding a JSON object with the keys time (RFC 3339 with nanoseconds),
// level, msg and attrs, followed by a blank line.
//
// The payload is produced by encoding/json rather than fmt's %q verb. %q
// emits Go string-literal escapes such as \x1b, \a, \v and \U0001f600, which
// are not valid JSON and make the client's parser reject the event. All four
// keys are always present, including an empty attrs.
func writeEvent(w io.Writer, e LogEntry) {
	payload, err := json.Marshal(struct {
		Time  string `json:"time"`
		Level string `json:"level"`
		Msg   string `json:"msg"`
		Attrs string `json:"attrs"`
	}{
		Time:  e.Time.Format(time.RFC3339Nano),
		Level: e.Level,
		Msg:   e.Msg,
		Attrs: e.Attrs,
	})
	if err != nil {
		// A struct of plain strings always marshals; skip the event rather
		// than emit a malformed one if that ever changes.
		return
	}
	// Write errors are ignored on purpose: a broken connection is detected
	// by the caller through the request context.
	_, _ = fmt.Fprintf(w, "data: %s\n\n", payload)
}