// Package pgsync mirrors the LISTEN/NOTIFY route-source pattern from // Timemachine/mc-router's internal/automc. Watches the `servers` table for // rows that have both voice columns set and emits Route events whenever the // desired set changes. package pgsync import ( "context" "errors" "fmt" "log/slog" "time" "github.com/jackc/pgx/v5" ) const ( NotifyChannel = "automc_routes_changed" reconnectMin = 1 * time.Second reconnectMax = 30 * time.Second ) // Route is a single voice routing row from postgres. Both Port and Address // are guaranteed non-zero when emitted via Apply. type Route struct { Name string // human-readable; logged on open/close Port int // public UDP port svc-proxy binds Address string // backend voice host:port (e.g. "mc-gtnh:24454") } // Applier reconciles a desired route set against currently-bound valves. // Add is called for routes that are new or whose backend address changed; // Del is called for routes that disappeared or whose backend address changed // (in pair with the new Add for the same port). type Applier interface { Apply(add []Route, del []Route) } type Syncer struct { dsn string applier Applier current map[string]Route // key: server name } func New(dsn string, a Applier) *Syncer { return &Syncer{dsn: dsn, applier: a, current: map[string]Route{}} } // Run blocks until ctx is cancelled. Reconnects on error with exponential // backoff capped at reconnectMax. func (s *Syncer) Run(ctx context.Context) { backoff := reconnectMin for { if ctx.Err() != nil { return } err := s.connectAndLoop(ctx) if err != nil && !errors.Is(err, context.Canceled) { slog.Warn("pgsync disconnected", "err", err, "retry_in", backoff) } select { case <-ctx.Done(): return case <-time.After(backoff): } backoff *= 2 if backoff > reconnectMax { backoff = reconnectMax } } } func (s *Syncer) connectAndLoop(ctx context.Context) error { conn, err := pgx.Connect(ctx, s.dsn) if err != nil { return fmt.Errorf("pgx connect: %w", err) } defer conn.Close(context.Background()) if _, err := conn.Exec(ctx, "LISTEN "+NotifyChannel); err != nil { return fmt.Errorf("LISTEN: %w", err) } slog.Info("pgsync connected", "channel", NotifyChannel) if err := s.refresh(ctx, conn); err != nil { return fmt.Errorf("initial refresh: %w", err) } for { if _, err := conn.WaitForNotification(ctx); err != nil { return fmt.Errorf("wait notification: %w", err) } if err := s.refresh(ctx, conn); err != nil { return fmt.Errorf("refresh: %w", err) } } } func (s *Syncer) refresh(ctx context.Context, conn *pgx.Conn) error { rows, err := conn.Query(ctx, ` SELECT name, voice_proxy_port, voice_address FROM servers WHERE enabled IS NOT FALSE AND voice_proxy_port IS NOT NULL AND voice_address IS NOT NULL AND voice_address != ''`) if err != nil { return err } defer rows.Close() desired := map[string]Route{} for rows.Next() { var r Route if err := rows.Scan(&r.Name, &r.Port, &r.Address); err != nil { return err } desired[r.Name] = r } if err := rows.Err(); err != nil { return err } add, del := diff(s.current, desired) s.applier.Apply(add, del) s.current = desired return nil } func diff(prev, next map[string]Route) (add []Route, del []Route) { for name, r := range next { if p, ok := prev[name]; !ok || p.Port != r.Port || p.Address != r.Address { add = append(add, r) } } for name, r := range prev { if n, ok := next[name]; !ok || n.Port != r.Port || n.Address != r.Address { del = append(del, r) } } return add, del }