From 74d0c4002288d96591cc3bed2f15259f10af5a77 Mon Sep 17 00:00:00 2001 From: Caedis Date: Sat, 9 May 2026 12:33:04 -0500 Subject: [PATCH] Convert docker polling to event listening (#548) --- CLAUDE.md | 24 ++- README.md | 4 +- server/configs.go | 2 +- server/docker.go | 361 ++++++++++++++++++++++++++++++----------- server/docker_swarm.go | 208 +++++++++++++++--------- server/server.go | 9 +- 6 files changed, 423 insertions(+), 185 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index e957db9..716a3e0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -17,7 +17,7 @@ go test -run TestRouteLookup ./server/ # Run a single test docker build -t mc-router . # Build Docker image ``` -Go version: 1.25. Testing uses `testify` (assert/require). Tests are table-driven. +Go version: 1.26.2. Testing uses `testify` (assert/require). Tests are table-driven with subtests. Mock pattern: embed `mock.Mock` and call `m.MethodCalled()` (see `k8s_test.go`). Protocol packet tests use hex fixture files in `testdata/` (e.g., `handshake-status.hex`). Test setup for route tests calls `NewRoutes()` and restores the global `Routes` singleton with defer. ## Architecture @@ -39,7 +39,8 @@ Go version: 1.25. Testing uses `testify` (assert/require). Tests are table-drive - `routes.go` — In-memory route table mapping server addresses to backends; supports default route fallback - `routes_config_loader.go` — Loads/watches JSON routes config file (with fsnotify) - `k8s.go` — Kubernetes service discovery via annotation `mc-router.itzg.me/externalServerName` - - `docker.go` / `docker_swarm.go` — Docker/Swarm container discovery via label `mc-router.host` + - `docker.go` — Docker container discovery via label `mc-router.host`; event-driven via Docker Events API (`client.Events`). Each event handled incrementally by `applyEvent` → `containersForID` (single `ContainerInspect`) → `applyContainerRoutesLocked` (touches only that container's routes). Full `monitorContainers` re-list runs at startup and on event-stream reconnect (exponential backoff) + - `docker_swarm.go` — Docker Swarm service discovery via label `mc-router.host`; event-driven, but each service event triggers a full `reconcileServices` re-list (services churn rarely, swarm has no autoscaling) - `down_scaler.go` — Auto-scale down after idle period - `api_server.go` — REST API (`GET/POST /routes`, `POST /defaultRoute`, `DELETE /routes/{serverAddress}`) - `metrics.go` — Pluggable metrics backends (Prometheus, InfluxDB, expvar, discard) @@ -60,7 +61,7 @@ CLI flags are the primary config mechanism, with environment variable support vi Routes are populated from three sources that can be combined: 1. Static `--mapping` flags or JSON config file 2. Kubernetes: watches Services with `mc-router.itzg.me/externalServerName` annotation -3. Docker/Swarm: watches containers with `mc-router.host` label +3. Docker/Swarm: watches containers/services via the Docker Events API, filtered to lifecycle events (label `mc-router.host`) ### Key Dependencies @@ -72,6 +73,21 @@ Routes are populated from three sources that can be combined: - `golang.ngrok.com/ngrok` — ngrok tunnel integration - `github.com/stretchr/testify` — Test assertions +### Concurrency Model + +- **`routes.go`**: global singleton `var Routes = NewRoutes()`. `sync.RWMutex` protects `mappings` and `defaultRoute` — `RLock` for all reads, `Lock` for mutations. +- **`connector.go`**: `ActiveConnections` map guarded by `sync.RWMutex`; `totalActiveConnections` counter uses `atomic.AddInt32`. Shutdown drain uses `sync.Cond` in `WaitForConnections()`. +- **Bidirectional proxy**: two goroutines per connection (client→backend, backend→client) communicate via a buffered `chan error` (size 2) — first error triggers mutual close. +- All goroutines respect context cancellation via `select { case <-ctx.Done() }`. + +### Error Handling + +- Wrap with context: `fmt.Errorf("message: %w", err)` +- Check specific sentinels: `errors.Is(err, io.EOF)` +- Log with fields: `logrus.WithError(err).WithField("key", val).Error("msg")` + ### Protocol Notes -The `mcproto` package handles Minecraft Java protocol quirks: Forge mod identifiers appended to server addresses (separated by `\x00`), DNS root zone trailing dots, legacy server list ping format, and VarInt encoding. Server address matching in routes strips these suffixes before lookup. +The `mcproto` package handles Minecraft Java protocol quirks: Forge mod identifiers appended to server addresses (separated by `\x00`), DNS root zone trailing dots, legacy server list ping format, and VarInt encoding. Server address matching in routes also strips TCP Shield patterns (`///...`) and lowercases before lookup. + +Auto-scale MOTD fallback: waking servers display `LoadingMOTD`; sleeping servers display `AsleepMOTD`. Per-route MOTDs take precedence over global ones. diff --git a/README.md b/README.md index 34bc08c..6bed349 100644 --- a/README.md +++ b/README.md @@ -50,8 +50,6 @@ Some other features included: host:port of a default Minecraft server to use when mapping not found (env DEFAULT) -docker-api-version string Instead of auto-negotiating, use specific Docker API version (env DOCKER_API_VERSION) - -docker-refresh-interval int - Refresh interval in seconds for the Docker integrations (env DOCKER_REFRESH_INTERVAL) (default 15) -docker-socket string Path to Docker socket to use (env DOCKER_SOCKET) (default "unix:///var/run/docker.sock") -docker-timeout int @@ -171,7 +169,7 @@ To test out this example, add these two entries to my "hosts" file: ### Using Docker auto-discovery -When running `mc-router` in a Docker environment you can pass the `--in-docker` or `--in-docker-swarm` command-line argument or set the environment variables `IN_DOCKER` or `IN_DOCKER_SWARM` to "true". With that, it will poll the Docker API periodically to find all the running containers/services for Minecraft instances. To enable discovery, you have to set the `mc-router.host` label on the container. +When running `mc-router` in a Docker environment you can pass the `--in-docker` or `--in-docker-swarm` command-line argument or set the environment variables `IN_DOCKER` or `IN_DOCKER_SWARM` to "true". With that, it will subscribe to the Docker event stream to react to container/service lifecycle changes (start, stop, pause, unpause, rename, network connect/disconnect for containers; create, update, remove for swarm services) and update routes immediately. An initial listing is performed on startup, and the stream is reconnected with exponential backoff on errors (e.g. daemon restart). To enable discovery, you have to set the `mc-router.host` label on the container. When using in Docker, make sure to volume mount the Docker socket into the container, such as diff --git a/server/configs.go b/server/configs.go index c22ee72..a2bf71e 100644 --- a/server/configs.go +++ b/server/configs.go @@ -40,7 +40,7 @@ type Config struct { InDockerSwarm bool `usage:"Use Docker Swarm service discovery"` DockerSocket string `usage:"Path to Docker socket to use"` DockerTimeout time.Duration `usage:"Timeout (as duration) for the Docker integrations"` - DockerRefreshInterval time.Duration `default:"15s" usage:"Refresh interval (as duration) for the Docker integrations"` + DockerRefreshInterval time.Duration `usage:"Deprecated and ignored: Docker discovery is now event-driven"` DockerApiVersion string `usage:"Instead of auto-negotiating, use specific Docker API version"` MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb,prometheus"` MetricsBackendConfig MetricsBackendConfig diff --git a/server/docker.go b/server/docker.go index 16cba62..7cae656 100644 --- a/server/docker.go +++ b/server/docker.go @@ -9,7 +9,10 @@ import ( "sync" "time" + cerrdefs "github.com/containerd/errdefs" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/sirupsen/logrus" ) @@ -30,12 +33,11 @@ const ( ) type dockerWatcherConfig struct { - autoScaleUp bool - autoScaleDown bool - socket string - timeout time.Duration - refreshInterval time.Duration - apiVersion string + autoScaleUp bool + autoScaleDown bool + socket string + timeout time.Duration + apiVersion string } func (c *dockerWatcherConfig) apiVersionOpt() client.Opt { @@ -48,15 +50,14 @@ func (c *dockerWatcherConfig) apiVersionOpt() client.Opt { } } -func NewDockerWatcher(socket string, timeout time.Duration, refreshInterval time.Duration, autoScaleUp bool, autoScaleDown bool, dockerApiVersion string) IDockerWatcher { +func NewDockerWatcher(socket string, timeout time.Duration, autoScaleUp bool, autoScaleDown bool, dockerApiVersion string) IDockerWatcher { return &dockerWatcherImpl{ config: dockerWatcherConfig{ - socket: socket, - timeout: timeout, - refreshInterval: refreshInterval, - autoScaleUp: autoScaleUp, - autoScaleDown: autoScaleDown, - apiVersion: dockerApiVersion, + socket: socket, + timeout: timeout, + autoScaleUp: autoScaleUp, + autoScaleDown: autoScaleDown, + apiVersion: dockerApiVersion, }, } } @@ -111,12 +112,7 @@ func (w *dockerWatcherImpl) makeWakerFunc(rc *routableContainer) WakerFunc { } endpoint := net.JoinHostPort(data.ip, strconv.Itoa(int(data.port))) - // Update the route mappings - err = w.monitorContainers(ctx) - if err != nil { - logrus.WithError(err).Error("Docker monitoring failed") - return "", err - } + // Route table updates via Docker `start`/`network connect` events. // Wait until the container is reachable deadline := time.Now().Add(60 * time.Second) @@ -164,15 +160,15 @@ func (w *dockerWatcherImpl) makeSleeperFunc(rc *routableContainer) SleeperFunc { return err } } - err = w.monitorContainers(ctx) - if err != nil { - logrus.WithError(err).Error("Docker monitoring failed") - return err - } + // Route table updates via Docker `die`/`stop`/`network disconnect` events. return nil } } +// monitorContainers does a full re-list of Docker containers and reconciles +// the route table against it. Used for initial sync at startup and for +// resync after the event stream reconnects (to catch any events missed +// during disconnect). func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error { w.monitorLock.Lock() defer w.monitorLock.Unlock() @@ -184,51 +180,178 @@ func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error { return err } - visited := map[string]struct{}{} - for _, rs := range containers { - if oldRs, ok := w.containerMap[rs.externalContainerName]; !ok { - w.containerMap[rs.externalContainerName] = rs - logrus.WithField("routableContainer", rs).Debug("ADD") - wakerFunc := w.makeWakerFunc(rs) - sleeperFunc := w.makeSleeperFunc(rs) - if rs.externalContainerName != "" { - Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) - } else { - Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) - } - } else if oldRs.containerEndpoint != rs.containerEndpoint || - oldRs.containerID != rs.containerID || - oldRs.autoScaleUp != rs.autoScaleUp || - oldRs.autoScaleDown != rs.autoScaleDown || - oldRs.autoScaleAsleepMOTD != rs.autoScaleAsleepMOTD || - oldRs.autoScaleLoadingMOTD != rs.autoScaleLoadingMOTD { - w.containerMap[rs.externalContainerName] = rs - wakerFunc := w.makeWakerFunc(rs) - sleeperFunc := w.makeSleeperFunc(rs) - if rs.externalContainerName != "" { - Routes.DeleteMapping(rs.externalContainerName) - Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) - } else { - Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) - } - logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE") - } - visited[rs.externalContainerName] = struct{}{} + byID := map[string][]*routableContainer{} + for _, rc := range containers { + byID[rc.containerID] = append(byID[rc.containerID], rc) } - for _, rs := range w.containerMap { - if _, ok := visited[rs.externalContainerName]; !ok { - delete(w.containerMap, rs.externalContainerName) - if rs.externalContainerName != "" { - Routes.DeleteMapping(rs.externalContainerName) - } else { - Routes.SetDefaultRoute("", "", nil, nil, "", "") - } - logrus.WithField("routableContainer", rs).Debug("DELETE") + + for id, desired := range byID { + w.applyContainerRoutesLocked(id, desired) + } + + // Remove entries whose container is no longer present at all + for name, rc := range w.containerMap { + if _, present := byID[rc.containerID]; present { + continue } + delete(w.containerMap, name) + if name != "" { + Routes.DeleteMapping(name) + } else { + Routes.SetDefaultRoute("", "", nil, nil, "", "") + } + logrus.WithField("routableContainer", rc).Debug("DELETE") } return nil } +// applyEvent reacts to a single Docker event by reconciling only the routes +// belonging to the affected container — no full re-list. +func (w *dockerWatcherImpl) applyEvent(ctx context.Context, ev events.Message) error { + containerID := ev.Actor.ID + if ev.Type == events.NetworkEventType { + containerID = ev.Actor.Attributes["container"] + } + if containerID == "" { + logrus.WithField("event", ev).Warn("network event missing container attribute, skipping") + return nil + } + + var desired []*routableContainer + if !(ev.Type == events.ContainerEventType && ev.Action == events.ActionDestroy) { + got, err := w.containersForID(ctx, containerID) + if err != nil { + return err + } + desired = got + } + + w.monitorLock.Lock() + defer w.monitorLock.Unlock() + + // Only trace events that affect a routed container — either one we already + // track or one becoming routable now. Filters out unrelated daemon noise. + relevant := len(desired) > 0 + if !relevant { + for _, rc := range w.containerMap { + if rc.containerID == containerID { + relevant = true + break + } + } + } + if relevant { + logrus.WithFields(logrus.Fields{"type": ev.Type, "action": ev.Action, "id": containerID}).Trace("Docker event") + } + + w.applyContainerRoutesLocked(containerID, desired) + return nil +} + +// containersForID inspects a single container and returns the routableContainers +// it should produce. Returns nil if the container is gone or not routable. +func (w *dockerWatcherImpl) containersForID(ctx context.Context, containerID string) ([]*routableContainer, error) { + inspect, err := w.client.ContainerInspect(ctx, containerID) + if err != nil { + if cerrdefs.IsNotFound(err) { + return nil, nil + } + return nil, err + } + data, ok := w.parseContainerData(&inspect) + if !ok { + return nil, nil + } + endpoint := "" + if !data.notRunning { + endpoint = fmt.Sprintf("%s:%d", data.ip, data.port) + } + var result []*routableContainer + for _, host := range data.hosts { + result = append(result, &routableContainer{ + containerEndpoint: endpoint, + externalContainerName: host, + containerID: containerID, + autoScaleUp: data.autoScaleUp, + autoScaleDown: data.autoScaleDown, + autoScaleAsleepMOTD: data.autoScaleAsleepMOTD, + autoScaleLoadingMOTD: data.autoScaleLoadingMOTD, + }) + } + if data.def != nil && *data.def { + result = append(result, &routableContainer{ + containerEndpoint: endpoint, + externalContainerName: "", + containerID: containerID, + autoScaleUp: data.autoScaleUp, + autoScaleDown: data.autoScaleDown, + autoScaleAsleepMOTD: data.autoScaleAsleepMOTD, + autoScaleLoadingMOTD: data.autoScaleLoadingMOTD, + }) + } + return result, nil +} + +// applyContainerRoutesLocked reconciles the routes for a single containerID +// against the desired set. Caller must hold monitorLock. +func (w *dockerWatcherImpl) applyContainerRoutesLocked(containerID string, desired []*routableContainer) { + desiredByName := map[string]*routableContainer{} + for _, rc := range desired { + desiredByName[rc.externalContainerName] = rc + } + + // Drop entries previously owned by this container that are no longer desired + for name, rc := range w.containerMap { + if rc.containerID != containerID { + continue + } + if _, keep := desiredByName[name]; keep { + continue + } + delete(w.containerMap, name) + if name != "" { + Routes.DeleteMapping(name) + } else { + Routes.SetDefaultRoute("", "", nil, nil, "", "") + } + logrus.WithField("routableContainer", rc).Debug("DELETE") + } + + for _, rs := range desired { + oldRs, exists := w.containerMap[rs.externalContainerName] + if !exists { + w.containerMap[rs.externalContainerName] = rs + wakerFunc := w.makeWakerFunc(rs) + sleeperFunc := w.makeSleeperFunc(rs) + if rs.externalContainerName != "" { + Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) + } else { + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) + } + logrus.WithField("routableContainer", rs).Debug("ADD") + continue + } + if oldRs.containerEndpoint == rs.containerEndpoint && + oldRs.containerID == rs.containerID && + oldRs.autoScaleUp == rs.autoScaleUp && + oldRs.autoScaleDown == rs.autoScaleDown && + oldRs.autoScaleAsleepMOTD == rs.autoScaleAsleepMOTD && + oldRs.autoScaleLoadingMOTD == rs.autoScaleLoadingMOTD { + continue + } + w.containerMap[rs.externalContainerName] = rs + wakerFunc := w.makeWakerFunc(rs) + sleeperFunc := w.makeSleeperFunc(rs) + if rs.externalContainerName != "" { + Routes.DeleteMapping(rs.externalContainerName) + Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) + } else { + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD) + } + logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE") + } +} + func (w *dockerWatcherImpl) Start(ctx context.Context) error { var err error @@ -248,49 +371,95 @@ func (w *dockerWatcherImpl) Start(ctx context.Context) error { if err != nil { return err } - - // TODO: replace all this with events listening - ticker := time.NewTicker(w.config.refreshInterval) + w.containerMap = map[string]*routableContainer{} logrus.Trace("Performing initial listing of Docker containers") - initialContainers, err := w.listContainers(ctx) - if err != nil { + if err := w.monitorContainers(ctx); err != nil { return err } - w.containerMap = map[string]*routableContainer{} - for _, c := range initialContainers { - w.containerMap[c.externalContainerName] = c - wakerFunc := w.makeWakerFunc(c) - sleeperFunc := w.makeSleeperFunc(c) - if c.externalContainerName != "" { - Routes.CreateMapping(c.externalContainerName, c.containerEndpoint, "", wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD, c.autoScaleLoadingMOTD) - } else { - Routes.SetDefaultRoute(c.containerEndpoint, "", wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD, c.autoScaleLoadingMOTD) - } - } - - go func() { - for { - select { - case <-ticker.C: - err := w.monitorContainers(ctx) - if err != nil { - logrus.WithError(err).Error("Docker monitoring failed") - return - } - case <-ctx.Done(): - logrus.Debug("Stopping Docker monitoring") - ticker.Stop() - return - } - } - }() + // streamEvents will resync on (re)connect and otherwise apply incremental + // updates from the Docker event stream — no periodic polling. + go w.streamEvents(ctx) logrus.Info("Monitoring Docker for Minecraft containers") return nil } +// streamEvents subscribes to the Docker event stream and triggers reconciliation +// of routes whenever container or network events relevant to routing occur. +// Reconnects with backoff on stream errors (e.g. daemon restart). +func (w *dockerWatcherImpl) streamEvents(ctx context.Context) { + backoff := time.Second + const maxBackoff = 30 * time.Second + + for { + if ctx.Err() != nil { + logrus.Debug("Stopping Docker monitoring") + return + } + + eventFilters := filters.NewArgs( + filters.Arg("type", string(events.ContainerEventType)), + filters.Arg("type", string(events.NetworkEventType)), + filters.Arg("event", string(events.ActionStart)), + filters.Arg("event", string(events.ActionUnPause)), + filters.Arg("event", string(events.ActionStop)), + filters.Arg("event", string(events.ActionDie)), + filters.Arg("event", string(events.ActionPause)), + filters.Arg("event", string(events.ActionDestroy)), + filters.Arg("event", string(events.ActionRename)), + filters.Arg("event", string(events.ActionConnect)), + filters.Arg("event", string(events.ActionDisconnect)), + ) + + eventCh, errCh := w.client.Events(ctx, events.ListOptions{Filters: eventFilters}) + + // Resync after (re)connecting in case we missed events while disconnected + if err := w.monitorContainers(ctx); err != nil { + logrus.WithError(err).Error("Docker resync failed") + } else { + backoff = time.Second + } + + loop: + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-eventCh: + if !ok { + break loop + } + if err := w.applyEvent(ctx, ev); err != nil { + logrus.WithError(err).Error("Docker event handling failed") + } + case err, ok := <-errCh: + if !ok { + break loop + } + if ctx.Err() != nil { + return + } + logrus.WithError(err).Warn("Docker event stream error, reconnecting") + break loop + } + } + + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + if backoff < maxBackoff { + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + func (w *dockerWatcherImpl) listContainers(ctx context.Context) ([]*routableContainer, error) { containers, err := w.client.ContainerList(ctx, container.ListOptions{All: true}) if err != nil { diff --git a/server/docker_swarm.go b/server/docker_swarm.go index 3e246f0..be611fd 100644 --- a/server/docker_swarm.go +++ b/server/docker_swarm.go @@ -10,6 +10,7 @@ import ( "time" dockertypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/swarm" @@ -19,23 +20,24 @@ import ( "github.com/sirupsen/logrus" ) -func NewDockerSwarmWatcher(socket string, timeout time.Duration, refreshInterval time.Duration, autoScaleUp bool, autoScaleDown bool, dockerApiVersion string) IDockerWatcher { +func NewDockerSwarmWatcher(socket string, timeout time.Duration, autoScaleUp bool, autoScaleDown bool, dockerApiVersion string) IDockerWatcher { return &dockerSwarmWatcherImpl{ config: dockerWatcherConfig{ - socket: socket, - timeout: timeout, - refreshInterval: refreshInterval, - autoScaleUp: autoScaleUp, - autoScaleDown: autoScaleDown, - apiVersion: dockerApiVersion, + socket: socket, + timeout: timeout, + autoScaleUp: autoScaleUp, + autoScaleDown: autoScaleDown, + apiVersion: dockerApiVersion, }, } } type dockerSwarmWatcherImpl struct { sync.RWMutex - config dockerWatcherConfig - client *client.Client + config dockerWatcherConfig + client *client.Client + serviceMap map[string]*routableService + monitorLock sync.Mutex } func (w *dockerSwarmWatcherImpl) makeWakerFunc(_ *routableService) WakerFunc { @@ -75,85 +77,133 @@ func (w *dockerSwarmWatcherImpl) Start(ctx context.Context) error { return err } - ticker := time.NewTicker(w.config.refreshInterval) - serviceMap := map[string]*routableService{} + w.serviceMap = map[string]*routableService{} - logrus.Trace("Performing initial listing of Docker containers") - initialServices, err := w.listServices(ctx) - if err != nil { + logrus.Trace("Performing initial listing of Docker swarm services") + if err := w.reconcileServices(ctx); err != nil { return err } - for _, s := range initialServices { - serviceMap[s.externalServiceName] = s - wakerFunc := w.makeWakerFunc(s) - sleeperFunc := w.makeSleeperFunc(s) - if s.externalServiceName != "" { - Routes.CreateMapping(s.externalServiceName, s.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") - } else { - Routes.SetDefaultRoute(s.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") - } - } - - go func() { - for { - select { - case <-ticker.C: - services, err := w.listServices(ctx) - if err != nil { - logrus.WithError(err).Error("Docker failed to list services") - continue - } - - visited := map[string]struct{}{} - for _, rs := range services { - if oldRs, ok := serviceMap[rs.externalServiceName]; !ok { - serviceMap[rs.externalServiceName] = rs - logrus.WithField("routableService", rs).Debug("ADD") - wakerFunc := w.makeWakerFunc(rs) - sleeperFunc := w.makeSleeperFunc(rs) - if rs.externalServiceName != "" { - Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") - } else { - Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") - } - } else if oldRs.containerEndpoint != rs.containerEndpoint { - serviceMap[rs.externalServiceName] = rs - wakerFunc := w.makeWakerFunc(rs) - sleeperFunc := w.makeSleeperFunc(rs) - if rs.externalServiceName != "" { - Routes.DeleteMapping(rs.externalServiceName) - Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") - } else { - Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") - } - logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE") - } - visited[rs.externalServiceName] = struct{}{} - } - for _, rs := range serviceMap { - if _, ok := visited[rs.externalServiceName]; !ok { - delete(serviceMap, rs.externalServiceName) - if rs.externalServiceName != "" { - Routes.DeleteMapping(rs.externalServiceName) - } else { - Routes.SetDefaultRoute("", "", nil, nil, "", "") - } - logrus.WithField("routableService", rs).Debug("DELETE") - } - } - - case <-ctx.Done(): - ticker.Stop() - return - } - } - }() + go w.streamEvents(ctx) logrus.Info("Monitoring Docker Swarm for Minecraft services") return nil } +func (w *dockerSwarmWatcherImpl) reconcileServices(ctx context.Context) error { + w.monitorLock.Lock() + defer w.monitorLock.Unlock() + + services, err := w.listServices(ctx) + if err != nil { + logrus.WithError(err).Error("Docker failed to list services") + return err + } + + visited := map[string]struct{}{} + for _, rs := range services { + if oldRs, ok := w.serviceMap[rs.externalServiceName]; !ok { + w.serviceMap[rs.externalServiceName] = rs + logrus.WithField("routableService", rs).Debug("ADD") + wakerFunc := w.makeWakerFunc(rs) + sleeperFunc := w.makeSleeperFunc(rs) + if rs.externalServiceName != "" { + Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") + } else { + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") + } + } else if oldRs.containerEndpoint != rs.containerEndpoint { + w.serviceMap[rs.externalServiceName] = rs + wakerFunc := w.makeWakerFunc(rs) + sleeperFunc := w.makeSleeperFunc(rs) + if rs.externalServiceName != "" { + Routes.DeleteMapping(rs.externalServiceName) + Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") + } else { + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, "", "") + } + logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE") + } + visited[rs.externalServiceName] = struct{}{} + } + for _, rs := range w.serviceMap { + if _, ok := visited[rs.externalServiceName]; !ok { + delete(w.serviceMap, rs.externalServiceName) + if rs.externalServiceName != "" { + Routes.DeleteMapping(rs.externalServiceName) + } else { + Routes.SetDefaultRoute("", "", nil, nil, "", "") + } + logrus.WithField("routableService", rs).Debug("DELETE") + } + } + return nil +} + +func (w *dockerSwarmWatcherImpl) streamEvents(ctx context.Context) { + backoff := time.Second + const maxBackoff = 30 * time.Second + + for { + if ctx.Err() != nil { + logrus.Debug("Stopping Docker Swarm monitoring") + return + } + + eventFilters := filters.NewArgs( + filters.Arg("type", string(events.ServiceEventType)), + filters.Arg("event", string(events.ActionCreate)), + filters.Arg("event", string(events.ActionUpdate)), + filters.Arg("event", string(events.ActionRemove)), + ) + + eventCh, errCh := w.client.Events(ctx, events.ListOptions{Filters: eventFilters}) + + if err := w.reconcileServices(ctx); err != nil { + logrus.WithError(err).Error("Docker Swarm resync failed") + } else { + backoff = time.Second + } + + loop: + for { + select { + case <-ctx.Done(): + return + case ev, ok := <-eventCh: + if !ok { + break loop + } + logrus.WithFields(logrus.Fields{"type": ev.Type, "action": ev.Action, "id": ev.Actor.ID}).Trace("Docker Swarm event") + if err := w.reconcileServices(ctx); err != nil { + logrus.WithError(err).Error("Docker Swarm reconciliation failed") + } + case err, ok := <-errCh: + if !ok { + break loop + } + if ctx.Err() != nil { + return + } + logrus.WithError(err).Warn("Docker Swarm event stream error, reconnecting") + break loop + } + } + + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + if backoff < maxBackoff { + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} + func (w *dockerSwarmWatcherImpl) listServices(ctx context.Context) ([]*routableService, error) { services, err := w.client.ServiceList(ctx, dockertypes.ServiceListOptions{}) if err != nil { diff --git a/server/server.go b/server/server.go index 5902514..29fbb52 100644 --- a/server/server.go +++ b/server/server.go @@ -140,9 +140,14 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) { routeWatchers = append(routeWatchers, k8sWatcher) } + if config.DockerRefreshInterval != 0 { + logrus.WithField("value", config.DockerRefreshInterval). + Warn("--docker-refresh-interval is deprecated and ignored; Docker discovery is now event-driven") + } + // TODO convert to RouteFinder if config.InDocker { - watcher := NewDockerWatcher(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down, config.DockerApiVersion) + watcher := NewDockerWatcher(config.DockerSocket, config.DockerTimeout, config.AutoScale.Up, config.AutoScale.Down, config.DockerApiVersion) err = watcher.Start(ctx) if err != nil { return nil, fmt.Errorf("could not start docker integration: %w", err) @@ -151,7 +156,7 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) { // TODO convert to RouteFinder if config.InDockerSwarm { - watcher := NewDockerSwarmWatcher(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down, config.DockerApiVersion) + watcher := NewDockerSwarmWatcher(config.DockerSocket, config.DockerTimeout, config.AutoScale.Up, config.AutoScale.Down, config.DockerApiVersion) err = watcher.Start(ctx) if err != nil { return nil, fmt.Errorf("could not start docker swarm integration: %w", err)