// Package bridge owns the UDP data plane: one Listener per backend route, // each Listener owns the public listener socket plus a pool of per-client // tunnels that copy datagrams to the backend's voice address and back. package bridge import ( "context" "errors" "fmt" "log/slog" "net" "sync" "time" "git.timemachine.center/timemachine/svc-proxy/internal/pgsync" ) // Manager is the top-level coordinator. Implements pgsync.Applier so the // pgsync goroutine can hand it desired/undesired routes; Manager turns those // into open/close calls on a Listener registry keyed by public port. type Manager struct { ctx context.Context bindHost string tunnelIdleTTL time.Duration mu sync.Mutex listeners map[int]*Listener // key: public UDP port } func NewManager(ctx context.Context, bindHost string, idleTTL time.Duration) *Manager { return &Manager{ ctx: ctx, bindHost: bindHost, tunnelIdleTTL: idleTTL, listeners: map[int]*Listener{}, } } // Apply satisfies pgsync.Applier. Open first (so a backend-address change // can flip-cleanly while the new listener takes over the new port), then // close. func (m *Manager) Apply(add []pgsync.Route, del []pgsync.Route) { m.mu.Lock() defer m.mu.Unlock() for _, r := range add { if existing, ok := m.listeners[r.Port]; ok { // Same port, different backend — close, then re-open. existing.Close() delete(m.listeners, r.Port) } l, err := openListener(m.ctx, m.bindHost, r, m.tunnelIdleTTL) if err != nil { slog.Error("listener open failed", "port", r.Port, "addr", r.Address, "name", r.Name, "err", err) continue } m.listeners[r.Port] = l slog.Info("listener open", "port", r.Port, "addr", r.Address, "name", r.Name) } for _, r := range del { l, ok := m.listeners[r.Port] if !ok { continue } l.Close() delete(m.listeners, r.Port) slog.Info("listener close", "port", r.Port, "name", r.Name) } } // Shutdown closes every active listener. Safe to call once; idempotent for // per-listener Close. func (m *Manager) Shutdown() { m.mu.Lock() defer m.mu.Unlock() for port, l := range m.listeners { l.Close() delete(m.listeners, port) } } // Listener owns one public UDP socket and the per-client tunnels hanging // off it. Each tunnel is a goroutine that copies datagrams from one // ephemeral upstream socket back to the original client. The public socket // itself is the egress for backend → client. type Listener struct { route pgsync.Route backend *net.UDPAddr pub *net.UDPConn // 0.0.0.0: idleTTL time.Duration ctx context.Context cancel context.CancelFunc mu sync.Mutex tunnels map[string]*tunnel // key: client.RemoteAddr().String() } func openListener(parent context.Context, bindHost string, r pgsync.Route, idleTTL time.Duration) (*Listener, error) { backend, err := net.ResolveUDPAddr("udp", r.Address) if err != nil { return nil, fmt.Errorf("resolve backend %q: %w", r.Address, err) } pubAddr := &net.UDPAddr{IP: net.ParseIP(bindHost), Port: r.Port} if pubAddr.IP == nil { return nil, fmt.Errorf("bind host %q not an IP", bindHost) } pub, err := net.ListenUDP("udp", pubAddr) if err != nil { return nil, fmt.Errorf("bind %s: %w", pubAddr, err) } ctx, cancel := context.WithCancel(parent) l := &Listener{ route: r, backend: backend, pub: pub, idleTTL: idleTTL, ctx: ctx, cancel: cancel, tunnels: map[string]*tunnel{}, } go l.readLoop() go l.evictIdle() return l, nil } // readLoop runs forever copying packets from the public socket to per-client // upstream sockets. The reverse direction (backend → client) is per-tunnel // goroutines on the upstream sockets writing back to l.pub. func (l *Listener) readLoop() { buf := make([]byte, 2048) // SVC max datagram body for { n, src, err := l.pub.ReadFromUDP(buf) if err != nil { if l.ctx.Err() != nil || errors.Is(err, net.ErrClosed) { return } slog.Warn("listener read error", "port", l.route.Port, "err", err) continue } l.mu.Lock() t, ok := l.tunnels[src.String()] if !ok { t, err = l.openTunnel(src) if err != nil { l.mu.Unlock() slog.Warn("tunnel open failed", "port", l.route.Port, "src", src, "err", err) continue } l.tunnels[src.String()] = t slog.Debug("tunnel open", "port", l.route.Port, "client", src.String()) } l.mu.Unlock() t.touch() if _, err := t.upstream.Write(buf[:n]); err != nil { if l.ctx.Err() == nil { slog.Warn("tunnel forward failed", "port", l.route.Port, "err", err) } continue } t.counters.bytesUp.Add(uint64(n)) } } func (l *Listener) openTunnel(src *net.UDPAddr) (*tunnel, error) { up, err := net.DialUDP("udp", nil, l.backend) if err != nil { return nil, fmt.Errorf("dial backend: %w", err) } now := time.Now() t := &tunnel{ client: src, upstream: up, listener: l, openedAt: now, } t.lastSeen = now go t.readBackend() return t, nil } func (l *Listener) evictIdle() { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() for { select { case <-l.ctx.Done(): return case <-ticker.C: cutoff := time.Now().Add(-l.idleTTL) l.mu.Lock() for k, t := range l.tunnels { if t.lastUseBefore(cutoff) { slog.Debug("tunnel idle evict", "port", l.route.Port, "client", k) t.close() delete(l.tunnels, k) } } l.mu.Unlock() } } } func (l *Listener) Close() { l.cancel() l.pub.Close() l.mu.Lock() for k, t := range l.tunnels { t.close() delete(l.tunnels, k) } l.mu.Unlock() } // tunnel is one client's UDP relay: a dedicated upstream socket to the // backend + a touch-tracked last-seen timestamp for idle eviction. type tunnel struct { client *net.UDPAddr upstream *net.UDPConn listener *Listener counters counters // atomic — hot path mu sync.Mutex lastSeen time.Time openedAt time.Time } func (t *tunnel) touch() { t.mu.Lock() t.lastSeen = time.Now() t.mu.Unlock() } func (t *tunnel) lastUseBefore(cutoff time.Time) bool { t.mu.Lock() defer t.mu.Unlock() return t.lastSeen.Before(cutoff) } func (t *tunnel) close() { _ = t.upstream.Close() } // readBackend pumps datagrams from the backend back to the client via the // public socket. Exits when the upstream socket is closed. func (t *tunnel) readBackend() { buf := make([]byte, 2048) for { n, err := t.upstream.Read(buf) if err != nil { return } t.touch() if _, err := t.listener.pub.WriteToUDP(buf[:n], t.client); err != nil { if t.listener.ctx.Err() == nil { slog.Warn("tunnel reverse failed", "port", t.listener.route.Port, "err", err) } return } t.counters.bytesDown.Add(uint64(n)) } }