This commit is contained in:
+265
-96
@@ -9,7 +9,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cerrdefs "github.com/containerd/errdefs"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/events"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -30,12 +33,11 @@ const (
|
||||
)
|
||||
|
||||
type dockerWatcherConfig struct {
|
||||
autoScaleUp bool
|
||||
autoScaleDown bool
|
||||
socket string
|
||||
timeout time.Duration
|
||||
refreshInterval time.Duration
|
||||
apiVersion string
|
||||
autoScaleUp bool
|
||||
autoScaleDown bool
|
||||
socket string
|
||||
timeout time.Duration
|
||||
apiVersion string
|
||||
}
|
||||
|
||||
func (c *dockerWatcherConfig) apiVersionOpt() client.Opt {
|
||||
@@ -48,15 +50,14 @@ func (c *dockerWatcherConfig) apiVersionOpt() client.Opt {
|
||||
}
|
||||
}
|
||||
|
||||
func NewDockerWatcher(socket string, timeout time.Duration, refreshInterval time.Duration, autoScaleUp bool, autoScaleDown bool, dockerApiVersion string) IDockerWatcher {
|
||||
func NewDockerWatcher(socket string, timeout time.Duration, autoScaleUp bool, autoScaleDown bool, dockerApiVersion string) IDockerWatcher {
|
||||
return &dockerWatcherImpl{
|
||||
config: dockerWatcherConfig{
|
||||
socket: socket,
|
||||
timeout: timeout,
|
||||
refreshInterval: refreshInterval,
|
||||
autoScaleUp: autoScaleUp,
|
||||
autoScaleDown: autoScaleDown,
|
||||
apiVersion: dockerApiVersion,
|
||||
socket: socket,
|
||||
timeout: timeout,
|
||||
autoScaleUp: autoScaleUp,
|
||||
autoScaleDown: autoScaleDown,
|
||||
apiVersion: dockerApiVersion,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -111,12 +112,7 @@ func (w *dockerWatcherImpl) makeWakerFunc(rc *routableContainer) WakerFunc {
|
||||
}
|
||||
endpoint := net.JoinHostPort(data.ip, strconv.Itoa(int(data.port)))
|
||||
|
||||
// Update the route mappings
|
||||
err = w.monitorContainers(ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Docker monitoring failed")
|
||||
return "", err
|
||||
}
|
||||
// Route table updates via Docker `start`/`network connect` events.
|
||||
|
||||
// Wait until the container is reachable
|
||||
deadline := time.Now().Add(60 * time.Second)
|
||||
@@ -164,15 +160,15 @@ func (w *dockerWatcherImpl) makeSleeperFunc(rc *routableContainer) SleeperFunc {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = w.monitorContainers(ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Docker monitoring failed")
|
||||
return err
|
||||
}
|
||||
// Route table updates via Docker `die`/`stop`/`network disconnect` events.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// monitorContainers does a full re-list of Docker containers and reconciles
|
||||
// the route table against it. Used for initial sync at startup and for
|
||||
// resync after the event stream reconnects (to catch any events missed
|
||||
// during disconnect).
|
||||
func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error {
|
||||
w.monitorLock.Lock()
|
||||
defer w.monitorLock.Unlock()
|
||||
@@ -184,51 +180,178 @@ func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
visited := map[string]struct{}{}
|
||||
for _, rs := range containers {
|
||||
if oldRs, ok := w.containerMap[rs.externalContainerName]; !ok {
|
||||
w.containerMap[rs.externalContainerName] = rs
|
||||
logrus.WithField("routableContainer", rs).Debug("ADD")
|
||||
wakerFunc := w.makeWakerFunc(rs)
|
||||
sleeperFunc := w.makeSleeperFunc(rs)
|
||||
if rs.externalContainerName != "" {
|
||||
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
}
|
||||
} else if oldRs.containerEndpoint != rs.containerEndpoint ||
|
||||
oldRs.containerID != rs.containerID ||
|
||||
oldRs.autoScaleUp != rs.autoScaleUp ||
|
||||
oldRs.autoScaleDown != rs.autoScaleDown ||
|
||||
oldRs.autoScaleAsleepMOTD != rs.autoScaleAsleepMOTD ||
|
||||
oldRs.autoScaleLoadingMOTD != rs.autoScaleLoadingMOTD {
|
||||
w.containerMap[rs.externalContainerName] = rs
|
||||
wakerFunc := w.makeWakerFunc(rs)
|
||||
sleeperFunc := w.makeSleeperFunc(rs)
|
||||
if rs.externalContainerName != "" {
|
||||
Routes.DeleteMapping(rs.externalContainerName)
|
||||
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE")
|
||||
}
|
||||
visited[rs.externalContainerName] = struct{}{}
|
||||
byID := map[string][]*routableContainer{}
|
||||
for _, rc := range containers {
|
||||
byID[rc.containerID] = append(byID[rc.containerID], rc)
|
||||
}
|
||||
for _, rs := range w.containerMap {
|
||||
if _, ok := visited[rs.externalContainerName]; !ok {
|
||||
delete(w.containerMap, rs.externalContainerName)
|
||||
if rs.externalContainerName != "" {
|
||||
Routes.DeleteMapping(rs.externalContainerName)
|
||||
} else {
|
||||
Routes.SetDefaultRoute("", "", nil, nil, "", "")
|
||||
}
|
||||
logrus.WithField("routableContainer", rs).Debug("DELETE")
|
||||
|
||||
for id, desired := range byID {
|
||||
w.applyContainerRoutesLocked(id, desired)
|
||||
}
|
||||
|
||||
// Remove entries whose container is no longer present at all
|
||||
for name, rc := range w.containerMap {
|
||||
if _, present := byID[rc.containerID]; present {
|
||||
continue
|
||||
}
|
||||
delete(w.containerMap, name)
|
||||
if name != "" {
|
||||
Routes.DeleteMapping(name)
|
||||
} else {
|
||||
Routes.SetDefaultRoute("", "", nil, nil, "", "")
|
||||
}
|
||||
logrus.WithField("routableContainer", rc).Debug("DELETE")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyEvent reacts to a single Docker event by reconciling only the routes
|
||||
// belonging to the affected container — no full re-list.
|
||||
func (w *dockerWatcherImpl) applyEvent(ctx context.Context, ev events.Message) error {
|
||||
containerID := ev.Actor.ID
|
||||
if ev.Type == events.NetworkEventType {
|
||||
containerID = ev.Actor.Attributes["container"]
|
||||
}
|
||||
if containerID == "" {
|
||||
logrus.WithField("event", ev).Warn("network event missing container attribute, skipping")
|
||||
return nil
|
||||
}
|
||||
|
||||
var desired []*routableContainer
|
||||
if !(ev.Type == events.ContainerEventType && ev.Action == events.ActionDestroy) {
|
||||
got, err := w.containersForID(ctx, containerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
desired = got
|
||||
}
|
||||
|
||||
w.monitorLock.Lock()
|
||||
defer w.monitorLock.Unlock()
|
||||
|
||||
// Only trace events that affect a routed container — either one we already
|
||||
// track or one becoming routable now. Filters out unrelated daemon noise.
|
||||
relevant := len(desired) > 0
|
||||
if !relevant {
|
||||
for _, rc := range w.containerMap {
|
||||
if rc.containerID == containerID {
|
||||
relevant = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if relevant {
|
||||
logrus.WithFields(logrus.Fields{"type": ev.Type, "action": ev.Action, "id": containerID}).Trace("Docker event")
|
||||
}
|
||||
|
||||
w.applyContainerRoutesLocked(containerID, desired)
|
||||
return nil
|
||||
}
|
||||
|
||||
// containersForID inspects a single container and returns the routableContainers
|
||||
// it should produce. Returns nil if the container is gone or not routable.
|
||||
func (w *dockerWatcherImpl) containersForID(ctx context.Context, containerID string) ([]*routableContainer, error) {
|
||||
inspect, err := w.client.ContainerInspect(ctx, containerID)
|
||||
if err != nil {
|
||||
if cerrdefs.IsNotFound(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
data, ok := w.parseContainerData(&inspect)
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
endpoint := ""
|
||||
if !data.notRunning {
|
||||
endpoint = fmt.Sprintf("%s:%d", data.ip, data.port)
|
||||
}
|
||||
var result []*routableContainer
|
||||
for _, host := range data.hosts {
|
||||
result = append(result, &routableContainer{
|
||||
containerEndpoint: endpoint,
|
||||
externalContainerName: host,
|
||||
containerID: containerID,
|
||||
autoScaleUp: data.autoScaleUp,
|
||||
autoScaleDown: data.autoScaleDown,
|
||||
autoScaleAsleepMOTD: data.autoScaleAsleepMOTD,
|
||||
autoScaleLoadingMOTD: data.autoScaleLoadingMOTD,
|
||||
})
|
||||
}
|
||||
if data.def != nil && *data.def {
|
||||
result = append(result, &routableContainer{
|
||||
containerEndpoint: endpoint,
|
||||
externalContainerName: "",
|
||||
containerID: containerID,
|
||||
autoScaleUp: data.autoScaleUp,
|
||||
autoScaleDown: data.autoScaleDown,
|
||||
autoScaleAsleepMOTD: data.autoScaleAsleepMOTD,
|
||||
autoScaleLoadingMOTD: data.autoScaleLoadingMOTD,
|
||||
})
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// applyContainerRoutesLocked reconciles the routes for a single containerID
|
||||
// against the desired set. Caller must hold monitorLock.
|
||||
func (w *dockerWatcherImpl) applyContainerRoutesLocked(containerID string, desired []*routableContainer) {
|
||||
desiredByName := map[string]*routableContainer{}
|
||||
for _, rc := range desired {
|
||||
desiredByName[rc.externalContainerName] = rc
|
||||
}
|
||||
|
||||
// Drop entries previously owned by this container that are no longer desired
|
||||
for name, rc := range w.containerMap {
|
||||
if rc.containerID != containerID {
|
||||
continue
|
||||
}
|
||||
if _, keep := desiredByName[name]; keep {
|
||||
continue
|
||||
}
|
||||
delete(w.containerMap, name)
|
||||
if name != "" {
|
||||
Routes.DeleteMapping(name)
|
||||
} else {
|
||||
Routes.SetDefaultRoute("", "", nil, nil, "", "")
|
||||
}
|
||||
logrus.WithField("routableContainer", rc).Debug("DELETE")
|
||||
}
|
||||
|
||||
for _, rs := range desired {
|
||||
oldRs, exists := w.containerMap[rs.externalContainerName]
|
||||
if !exists {
|
||||
w.containerMap[rs.externalContainerName] = rs
|
||||
wakerFunc := w.makeWakerFunc(rs)
|
||||
sleeperFunc := w.makeSleeperFunc(rs)
|
||||
if rs.externalContainerName != "" {
|
||||
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
}
|
||||
logrus.WithField("routableContainer", rs).Debug("ADD")
|
||||
continue
|
||||
}
|
||||
if oldRs.containerEndpoint == rs.containerEndpoint &&
|
||||
oldRs.containerID == rs.containerID &&
|
||||
oldRs.autoScaleUp == rs.autoScaleUp &&
|
||||
oldRs.autoScaleDown == rs.autoScaleDown &&
|
||||
oldRs.autoScaleAsleepMOTD == rs.autoScaleAsleepMOTD &&
|
||||
oldRs.autoScaleLoadingMOTD == rs.autoScaleLoadingMOTD {
|
||||
continue
|
||||
}
|
||||
w.containerMap[rs.externalContainerName] = rs
|
||||
wakerFunc := w.makeWakerFunc(rs)
|
||||
sleeperFunc := w.makeSleeperFunc(rs)
|
||||
if rs.externalContainerName != "" {
|
||||
Routes.DeleteMapping(rs.externalContainerName)
|
||||
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD, rs.autoScaleLoadingMOTD)
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE")
|
||||
}
|
||||
}
|
||||
|
||||
func (w *dockerWatcherImpl) Start(ctx context.Context) error {
|
||||
var err error
|
||||
|
||||
@@ -248,49 +371,95 @@ func (w *dockerWatcherImpl) Start(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: replace all this with events listening
|
||||
ticker := time.NewTicker(w.config.refreshInterval)
|
||||
w.containerMap = map[string]*routableContainer{}
|
||||
|
||||
logrus.Trace("Performing initial listing of Docker containers")
|
||||
initialContainers, err := w.listContainers(ctx)
|
||||
if err != nil {
|
||||
if err := w.monitorContainers(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.containerMap = map[string]*routableContainer{}
|
||||
for _, c := range initialContainers {
|
||||
w.containerMap[c.externalContainerName] = c
|
||||
wakerFunc := w.makeWakerFunc(c)
|
||||
sleeperFunc := w.makeSleeperFunc(c)
|
||||
if c.externalContainerName != "" {
|
||||
Routes.CreateMapping(c.externalContainerName, c.containerEndpoint, "", wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD, c.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
Routes.SetDefaultRoute(c.containerEndpoint, "", wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD, c.autoScaleLoadingMOTD)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err := w.monitorContainers(ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Docker monitoring failed")
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
logrus.Debug("Stopping Docker monitoring")
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
// streamEvents will resync on (re)connect and otherwise apply incremental
|
||||
// updates from the Docker event stream — no periodic polling.
|
||||
go w.streamEvents(ctx)
|
||||
|
||||
logrus.Info("Monitoring Docker for Minecraft containers")
|
||||
return nil
|
||||
}
|
||||
|
||||
// streamEvents subscribes to the Docker event stream and triggers reconciliation
|
||||
// of routes whenever container or network events relevant to routing occur.
|
||||
// Reconnects with backoff on stream errors (e.g. daemon restart).
|
||||
func (w *dockerWatcherImpl) streamEvents(ctx context.Context) {
|
||||
backoff := time.Second
|
||||
const maxBackoff = 30 * time.Second
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
logrus.Debug("Stopping Docker monitoring")
|
||||
return
|
||||
}
|
||||
|
||||
eventFilters := filters.NewArgs(
|
||||
filters.Arg("type", string(events.ContainerEventType)),
|
||||
filters.Arg("type", string(events.NetworkEventType)),
|
||||
filters.Arg("event", string(events.ActionStart)),
|
||||
filters.Arg("event", string(events.ActionUnPause)),
|
||||
filters.Arg("event", string(events.ActionStop)),
|
||||
filters.Arg("event", string(events.ActionDie)),
|
||||
filters.Arg("event", string(events.ActionPause)),
|
||||
filters.Arg("event", string(events.ActionDestroy)),
|
||||
filters.Arg("event", string(events.ActionRename)),
|
||||
filters.Arg("event", string(events.ActionConnect)),
|
||||
filters.Arg("event", string(events.ActionDisconnect)),
|
||||
)
|
||||
|
||||
eventCh, errCh := w.client.Events(ctx, events.ListOptions{Filters: eventFilters})
|
||||
|
||||
// Resync after (re)connecting in case we missed events while disconnected
|
||||
if err := w.monitorContainers(ctx); err != nil {
|
||||
logrus.WithError(err).Error("Docker resync failed")
|
||||
} else {
|
||||
backoff = time.Second
|
||||
}
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ev, ok := <-eventCh:
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
if err := w.applyEvent(ctx, ev); err != nil {
|
||||
logrus.WithError(err).Error("Docker event handling failed")
|
||||
}
|
||||
case err, ok := <-errCh:
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
logrus.WithError(err).Warn("Docker event stream error, reconnecting")
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *dockerWatcherImpl) listContainers(ctx context.Context) ([]*routableContainer, error) {
|
||||
containers, err := w.client.ContainerList(ctx, container.ListOptions{All: true})
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user