Files
claude-timemachine f823c05aa3
CI / validate (push) Successful in 24s
CI / docker (push) Failing after 1m49s
initial: svc-proxy — UDP valve for Simple Voice Chat
Standalone Go service that routes SVC client traffic to per-server
backend voice endpoints, configured via pg LISTEN/NOTIFY (same channel
mc-router subscribes to). Each pg `servers` row with both
`voice_address` and `voice_proxy_port` set spawns a Valve: a public
UDP listener that maintains per-client ephemeral bridges to the
backend's SVC port.

Pieces:
  cmd/svc-proxy/main.go     entry; wires config, log fan-out,
                            bridge.Manager, pgsync, httpsrv
  internal/config/          DATABASE_URL + BIND_HOST +
                            BRIDGE_IDLE_TTL (default 1m) +
                            HTTP_ADDR (default :8081)
  internal/pgsync/          LISTEN automc_routes_changed; diff
                            desired/actual routes; emit Apply()
  internal/bridge/          Valve per public port; per-client
                            bridge with atomic up/down byte counters;
                            idle eviction every 15s against TTL
  internal/httpsrv/         operator UI — embedded single-page HTML
                            with active-connections table polled
                            every 1s + SSE log stream
                            (last 500 lines backlog on connect)

Reverse-proxied behind server-manager at /infra/svc-proxy/* — bind
internal-only addresses for production; auth is the dashboard's
Basic gate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-10 18:01:04 +02:00

143 lines
3.5 KiB
Go

// Package pgsync mirrors the LISTEN/NOTIFY route-source pattern from
// Timemachine/mc-router's internal/automc. It watches the `servers` table for
// rows that have both voice columns set and, after every notification, hands
// the added and removed routes to an Applier.
package pgsync
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
)
const (
// NotifyChannel is the postgres NOTIFY channel the Syncer LISTENs on. Every
// notification received on it triggers a full refresh of the route set.
NotifyChannel = "automc_routes_changed"
// reconnectMin is the delay Run waits before its first reconnect attempt.
reconnectMin = 1 * time.Second
// reconnectMax is the upper bound for Run's doubling reconnect delay.
reconnectMax = 30 * time.Second
)
// Route is a single voice routing row from postgres. The refresh query only
// selects rows whose voice_proxy_port and voice_address are non-NULL and whose
// voice_address is non-empty, so both fields are populated when a Route
// reaches Apply.
// NOTE(review): the SQL query itself does not exclude a port of 0 — confirm
// the schema or the Applier guards against it.
type Route struct {
Name string // server name; human-readable; logged on open/close
Port int // public UDP port svc-proxy binds
Address string // backend voice host:port (e.g. "mc-gtnh:24454")
}
// Applier reconciles a desired route set against currently-bound valves.
// Apply receives two slices computed by diff: add holds routes that are new
// or whose port or backend address changed; del holds the previous version of
// routes that disappeared or whose port or backend address changed. A changed
// route therefore appears in both slices — its old value in del and its new
// value in add. Neither slice has a defined order.
type Applier interface {
Apply(add []Route, del []Route)
}
// Syncer keeps an Applier in step with the voice routes stored in postgres.
// It remembers the route set from the last successful refresh so that each
// refresh only hands the difference to the Applier.
type Syncer struct {
dsn string // connection string handed to pgx.Connect
applier Applier // receives the add/del slices computed by each refresh
current map[string]Route // key: server name
}
// New builds a Syncer that connects with dsn and reports route changes to a.
// The returned Syncer starts with an empty route set, so its first refresh
// reports every matching row as an addition. New itself does not open a
// connection; that happens once Run is called.
func New(dsn string, a Applier) *Syncer {
	s := new(Syncer)
	s.dsn = dsn
	s.applier = a
	s.current = make(map[string]Route)
	return s
}
// Run blocks until ctx is cancelled, keeping at most one postgres session
// alive at a time. Whenever a session ends it reconnects after a delay that
// starts at reconnectMin and doubles per consecutive failure, capped at
// reconnectMax. A session that lasted longer than reconnectMax is treated as
// having been healthy, and the delay starts over at reconnectMin; without
// that reset a long-running process would end up waiting reconnectMax after
// every disconnect.
func (s *Syncer) Run(ctx context.Context) {
	backoff := reconnectMin
	for {
		if ctx.Err() != nil {
			return
		}

		started := time.Now()
		// connectAndLoop only returns once the session has failed or ctx
		// was cancelled.
		err := s.connectAndLoop(ctx)

		// Quick failures (connect refused, LISTEN rejected, ...) keep
		// escalating the delay; a session that survived past the cap
		// earns a fresh start.
		if time.Since(started) > reconnectMax {
			backoff = reconnectMin
		}

		if err != nil && !errors.Is(err, context.Canceled) {
			slog.Warn("pgsync disconnected", "err", err, "retry_in", backoff)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}

		backoff *= 2
		if backoff > reconnectMax {
			backoff = reconnectMax
		}
	}
}
// connectAndLoop runs a single postgres session. It connects, subscribes to
// NotifyChannel, performs an initial refresh and then refreshes again after
// every notification received on the connection. LISTEN is issued before the
// first refresh. It never returns nil: it ends with a wrapped error when
// connecting, subscribing, refreshing or waiting fails (which includes ctx
// being cancelled). The connection is closed on return.
func (s *Syncer) connectAndLoop(ctx context.Context) error {
	conn, err := pgx.Connect(ctx, s.dsn)
	if err != nil {
		return fmt.Errorf("pgx connect: %w", err)
	}
	defer conn.Close(context.Background())

	_, err = conn.Exec(ctx, "LISTEN "+NotifyChannel)
	if err != nil {
		return fmt.Errorf("LISTEN: %w", err)
	}
	slog.Info("pgsync connected", "channel", NotifyChannel)

	// stage labels the refresh in the returned error: the first pass is the
	// initial load, every later pass follows a notification.
	stage := "initial refresh"
	for {
		if refreshErr := s.refresh(ctx, conn); refreshErr != nil {
			return fmt.Errorf("%s: %w", stage, refreshErr)
		}
		stage = "refresh"

		_, waitErr := conn.WaitForNotification(ctx)
		if waitErr != nil {
			return fmt.Errorf("wait notification: %w", waitErr)
		}
	}
}
// refresh loads the full desired route set from the `servers` table, hands
// the difference against the previously applied set to the Applier, and then
// records the new set as current.
//
// Only rows that are not explicitly disabled and have both voice columns set
// (with a non-empty address) are selected. Rows whose port is outside the
// valid range 1..65535 are skipped with a warning, so the Applier never
// receives a route it could not bind as configured (port 0 would bind a
// random port).
//
// On a query, scan or iteration error, the wrapped error is returned; the
// Applier is not called and the current set is left untouched.
func (s *Syncer) refresh(ctx context.Context, conn *pgx.Conn) error {
	const maxPort = 65535

	rows, err := conn.Query(ctx, `
SELECT name, voice_proxy_port, voice_address
FROM servers
WHERE enabled IS NOT FALSE
AND voice_proxy_port IS NOT NULL
AND voice_address IS NOT NULL
AND voice_address != ''`)
	if err != nil {
		return fmt.Errorf("query routes: %w", err)
	}
	defer rows.Close()

	desired := map[string]Route{}
	for rows.Next() {
		var r Route
		if err := rows.Scan(&r.Name, &r.Port, &r.Address); err != nil {
			return fmt.Errorf("scan route: %w", err)
		}
		if r.Port < 1 || r.Port > maxPort {
			// Dropping the row also removes a previously valid route
			// of the same name through the normal diff below.
			slog.Warn("pgsync skipping route with invalid port", "name", r.Name, "port", r.Port)
			continue
		}
		desired[r.Name] = r
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("read routes: %w", err)
	}

	add, del := diff(s.current, desired)
	s.applier.Apply(add, del)
	s.current = desired
	return nil
}
// diff compares the previously applied routes with the newly desired ones,
// both keyed by server name. add collects every route in next that is missing
// from prev or whose port or address differs; del collects every route in
// prev that is missing from next or whose port or address differs. A changed
// route thus shows up in both results (old value in del, new value in add).
// The Name field is not compared, and the order of both slices follows map
// iteration and is therefore unspecified.
func diff(prev, next map[string]Route) (add []Route, del []Route) {
	sameEndpoint := func(a, b Route) bool {
		return a.Port == b.Port && a.Address == b.Address
	}

	for name, want := range next {
		have, known := prev[name]
		if known && sameEndpoint(have, want) {
			continue
		}
		add = append(add, want)
	}

	for name, have := range prev {
		want, kept := next[name]
		if kept && sameEndpoint(want, have) {
			continue
		}
		del = append(del, have)
	}

	return add, del
}