automc: pg LISTEN/NOTIFY route source + HTTP waker
Adds opt-in extension package internal/automc/ that: - Subscribes to Postgres notifications on a 'servers' table and pushes route changes into server.Routes (no file I/O, no fsnotify). - Provides a WakerFunc that POSTs to a configurable HTTP control plane (server-manager) and polls until state=running. When AUTOMC_DSN is unset, Wire() is a no-op and the binary behaves exactly like upstream itzg/mc-router. Single patch site in main.go (import + 4-line call) keeps upstream rebases trivial. See docs/AUTOMC.md for env vars and the expected DB schema/trigger.
This commit is contained in:
@@ -0,0 +1,133 @@
|
||||
package automc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/itzg/mc-router/server"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
notifyChannel = "automc_routes_changed"
|
||||
reconnectMin = 1 * time.Second
|
||||
reconnectMax = 30 * time.Second
|
||||
)
|
||||
|
||||
type route struct {
|
||||
name string
|
||||
domain string
|
||||
address string
|
||||
}
|
||||
|
||||
type syncer struct {
|
||||
dsn string
|
||||
waker *wakerConfig
|
||||
current map[string]route
|
||||
}
|
||||
|
||||
func newSyncer(dsn string, w *wakerConfig) *syncer {
|
||||
return &syncer{dsn: dsn, waker: w, current: map[string]route{}}
|
||||
}
|
||||
|
||||
func (s *syncer) run(ctx context.Context) {
|
||||
backoff := reconnectMin
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
err := s.connectAndLoop(ctx)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
logrus.WithError(err).Warnf("automc pgsync disconnected; reconnecting in %s", backoff)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
backoff *= 2
|
||||
if backoff > reconnectMax {
|
||||
backoff = reconnectMax
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncer) connectAndLoop(ctx context.Context) error {
|
||||
conn, err := pgx.Connect(ctx, s.dsn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pgx connect: %w", err)
|
||||
}
|
||||
defer conn.Close(context.Background())
|
||||
|
||||
if _, err := conn.Exec(ctx, "LISTEN "+notifyChannel); err != nil {
|
||||
return fmt.Errorf("LISTEN: %w", err)
|
||||
}
|
||||
logrus.Infof("automc pgsync connected; LISTEN %s", notifyChannel)
|
||||
|
||||
if err := s.refresh(ctx, conn); err != nil {
|
||||
return fmt.Errorf("initial refresh: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
if _, err := conn.WaitForNotification(ctx); err != nil {
|
||||
return fmt.Errorf("wait notification: %w", err)
|
||||
}
|
||||
if err := s.refresh(ctx, conn); err != nil {
|
||||
return fmt.Errorf("refresh: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncer) refresh(ctx context.Context, conn *pgx.Conn) error {
|
||||
rows, err := conn.Query(ctx, `SELECT name, domain, address FROM servers WHERE domain != '' AND address != ''`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
desired := map[string]route{}
|
||||
for rows.Next() {
|
||||
var r route
|
||||
if err := rows.Scan(&r.name, &r.domain, &r.address); err != nil {
|
||||
return err
|
||||
}
|
||||
desired[r.domain] = r
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
add, del := diff(s.current, desired)
|
||||
s.apply(add, del)
|
||||
s.current = desired
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *syncer) apply(add []route, del []string) {
|
||||
for _, host := range del {
|
||||
if server.Routes.DeleteMapping(host) {
|
||||
logrus.Infof("automc route -: %s", host)
|
||||
}
|
||||
}
|
||||
for _, r := range add {
|
||||
server.Routes.CreateMapping(r.domain, r.address, "", s.waker.wakerFor(r.name), nil, "", "")
|
||||
logrus.Infof("automc route +: %s → %s (%s)", r.domain, r.address, r.name)
|
||||
}
|
||||
}
|
||||
|
||||
func diff(prev, next map[string]route) (add []route, del []string) {
|
||||
for host, r := range next {
|
||||
if p, ok := prev[host]; !ok || p.address != r.address || p.name != r.name {
|
||||
add = append(add, r)
|
||||
}
|
||||
}
|
||||
for host := range prev {
|
||||
if _, ok := next[host]; !ok {
|
||||
del = append(del, host)
|
||||
}
|
||||
}
|
||||
return add, del
|
||||
}
|
||||
Reference in New Issue
Block a user