f823c05aa3
Standalone Go service that routes SVC client traffic to per-server
backend voice endpoints, configured via pg LISTEN/NOTIFY (same channel
mc-router subscribes to). Each pg `servers` row with both
`voice_address` and `voice_proxy_port` set spawns a Valve: a public
UDP listener that maintains per-client ephemeral bridges to the
backend's SVC port.
Pieces:
- cmd/svc-proxy/main.go — entry point; wires config, log fan-out, bridge.Manager, pgsync, httpsrv
- internal/config/ — DATABASE_URL + BIND_HOST + BRIDGE_IDLE_TTL (default 1m) + HTTP_ADDR (default :8081)
- internal/pgsync/ — LISTEN automc_routes_changed; diff desired/actual routes; emit Apply()
- internal/bridge/ — Valve per public port; per-client bridge with atomic up/down byte counters; idle eviction every 15s against TTL
- internal/httpsrv/ — operator UI: embedded single-page HTML with active-connections table polled every 1s + SSE log stream (last 500 lines backlog on connect)
Reverse-proxied behind server-manager at /infra/svc-proxy/* — bind
internal-only addresses for production; auth is the dashboard's
Basic gate.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
191 lines
4.5 KiB
Go
191 lines
4.5 KiB
Go
package httpsrv
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"
)
|
|
|
|
// LogBus is a fan-out buffer for log lines. It holds a ring of the last N
|
|
// entries and broadcasts new lines to live SSE subscribers. The slog Handler
|
|
// in NewLogBus writes each formatted record into both the underlying handler
|
|
// (stderr) AND this bus.
|
|
type LogBus struct {
|
|
cap int
|
|
|
|
mu sync.RWMutex
|
|
ring []LogEntry
|
|
next int
|
|
full bool
|
|
listeners map[chan LogEntry]struct{}
|
|
}
|
|
|
|
// LogEntry is one log record as kept in the LogBus ring and delivered to
// subscribers.
type LogEntry struct {
	// Time is the timestamp of the record.
	Time time.Time `json:"time"`
	// Level is the textual form of the record's level.
	Level string `json:"level"`
	// Msg is the log message.
	Msg string `json:"msg"`
	// Attrs is the record's attributes rendered as space-separated
	// key=value pairs; empty when the record carried none.
	Attrs string `json:"attrs,omitempty"`
}
|
|
|
|
func NewLogBus(capacity int) *LogBus {
|
|
if capacity <= 0 {
|
|
capacity = 500
|
|
}
|
|
return &LogBus{
|
|
cap: capacity,
|
|
ring: make([]LogEntry, capacity),
|
|
listeners: map[chan LogEntry]struct{}{},
|
|
}
|
|
}
|
|
|
|
func (b *LogBus) push(e LogEntry) {
|
|
b.mu.Lock()
|
|
b.ring[b.next] = e
|
|
b.next = (b.next + 1) % b.cap
|
|
if b.next == 0 {
|
|
b.full = true
|
|
}
|
|
subs := make([]chan LogEntry, 0, len(b.listeners))
|
|
for ch := range b.listeners {
|
|
subs = append(subs, ch)
|
|
}
|
|
b.mu.Unlock()
|
|
for _, ch := range subs {
|
|
select {
|
|
case ch <- e:
|
|
default:
|
|
// slow subscriber; drop rather than back-pressure the writer
|
|
}
|
|
}
|
|
}
|
|
|
|
// Backlog returns the buffered entries oldest-first.
|
|
func (b *LogBus) Backlog() []LogEntry {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
if !b.full {
|
|
out := make([]LogEntry, b.next)
|
|
copy(out, b.ring[:b.next])
|
|
return out
|
|
}
|
|
out := make([]LogEntry, 0, b.cap)
|
|
out = append(out, b.ring[b.next:]...)
|
|
out = append(out, b.ring[:b.next]...)
|
|
return out
|
|
}
|
|
|
|
// Subscribe registers a fresh channel that will receive every subsequent
|
|
// entry. Caller must call Unsubscribe when done.
|
|
func (b *LogBus) Subscribe() chan LogEntry {
|
|
ch := make(chan LogEntry, 32)
|
|
b.mu.Lock()
|
|
b.listeners[ch] = struct{}{}
|
|
b.mu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
func (b *LogBus) Unsubscribe(ch chan LogEntry) {
|
|
b.mu.Lock()
|
|
delete(b.listeners, ch)
|
|
b.mu.Unlock()
|
|
close(ch)
|
|
}
|
|
|
|
// busHandler wraps a base slog.Handler and pushes a structured copy of each
|
|
// record to the LogBus. Errors during push are ignored — logging must never
|
|
// stall on UI subscribers.
|
|
type busHandler struct {
|
|
base slog.Handler
|
|
bus *LogBus
|
|
}
|
|
|
|
// NewBusHandler returns a slog.Handler that emits to both `base` and `bus`.
|
|
func NewBusHandler(base slog.Handler, bus *LogBus) slog.Handler {
|
|
return &busHandler{base: base, bus: bus}
|
|
}
|
|
|
|
func (h *busHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
|
|
return h.base.Enabled(ctx, lvl)
|
|
}
|
|
|
|
func (h *busHandler) Handle(ctx context.Context, r slog.Record) error {
|
|
// First emit to the base handler so console/journald behaviour is
|
|
// preserved. Push to bus regardless of base error.
|
|
err := h.base.Handle(ctx, r)
|
|
|
|
var attrs string
|
|
r.Attrs(func(a slog.Attr) bool {
|
|
if attrs != "" {
|
|
attrs += " "
|
|
}
|
|
attrs += fmt.Sprintf("%s=%v", a.Key, a.Value.Any())
|
|
return true
|
|
})
|
|
h.bus.push(LogEntry{
|
|
Time: r.Time,
|
|
Level: r.Level.String(),
|
|
Msg: r.Message,
|
|
Attrs: attrs,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (h *busHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
|
return &busHandler{base: h.base.WithAttrs(attrs), bus: h.bus}
|
|
}
|
|
|
|
func (h *busHandler) WithGroup(name string) slog.Handler {
|
|
return &busHandler{base: h.base.WithGroup(name), bus: h.bus}
|
|
}
|
|
|
|
// sseLogs streams the backlog + every new entry as Server-Sent Events.
|
|
// Each event is one JSON-encoded LogEntry on a `data:` line.
|
|
func sseLogs(bus *LogBus) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no") // disable proxy buffering
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
for _, e := range bus.Backlog() {
|
|
writeEvent(w, e)
|
|
}
|
|
flusher.Flush()
|
|
|
|
ch := bus.Subscribe()
|
|
defer bus.Unsubscribe(ch)
|
|
|
|
// Heartbeat keeps proxies from closing the conn during silent periods.
|
|
heartbeat := time.NewTicker(30 * time.Second)
|
|
defer heartbeat.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
return
|
|
case e := <-ch:
|
|
writeEvent(w, e)
|
|
flusher.Flush()
|
|
case <-heartbeat.C:
|
|
_, _ = io.WriteString(w, ":hb\n\n")
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeEvent(w io.Writer, e LogEntry) {
|
|
fmt.Fprintf(w, "data: {\"time\":%q,\"level\":%q,\"msg\":%q,\"attrs\":%q}\n\n",
|
|
e.Time.Format(time.RFC3339Nano), e.Level, e.Msg, e.Attrs)
|
|
}
|