Corrected docker/swarm discovery from previous refactoring (#446)
This commit is contained in:
+5
-13
@@ -15,8 +15,7 @@ import (
|
||||
)
|
||||
|
||||
type IDockerWatcher interface {
|
||||
Start(socket string, timeoutSeconds int, refreshIntervalSeconds int, autoScaleUp bool, autoScaleDown bool) error
|
||||
Stop()
|
||||
Start(ctx context.Context, socket string, timeoutSeconds int, refreshIntervalSeconds int, autoScaleUp bool, autoScaleDown bool) error
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -34,7 +33,6 @@ type dockerWatcherImpl struct {
|
||||
autoScaleUp bool
|
||||
autoScaleDown bool
|
||||
client *client.Client
|
||||
contextCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (w *dockerWatcherImpl) makeWakerFunc(_ *routableContainer) ScalerFunc {
|
||||
@@ -57,7 +55,7 @@ func (w *dockerWatcherImpl) makeSleeperFunc(_ *routableContainer) ScalerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *dockerWatcherImpl) Start(socket string, timeoutSeconds int, refreshIntervalSeconds int, autoScaleUp bool, autoScaleDown bool) error {
|
||||
func (w *dockerWatcherImpl) Start(ctx context.Context, socket string, timeoutSeconds int, refreshIntervalSeconds int, autoScaleUp bool, autoScaleDown bool) error {
|
||||
var err error
|
||||
|
||||
w.autoScaleUp = autoScaleUp
|
||||
@@ -83,9 +81,7 @@ func (w *dockerWatcherImpl) Start(socket string, timeoutSeconds int, refreshInte
|
||||
ticker := time.NewTicker(refreshInterval)
|
||||
containerMap := map[string]*routableContainer{}
|
||||
|
||||
var ctx context.Context
|
||||
ctx, w.contextCancel = context.WithCancel(context.Background())
|
||||
|
||||
logrus.Trace("Performing initial listing of Docker containers")
|
||||
initialContainers, err := w.listContainers(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -104,6 +100,7 @@ func (w *dockerWatcherImpl) Start(socket string, timeoutSeconds int, refreshInte
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
logrus.Trace("Listing Docker containers")
|
||||
containers, err := w.listContainers(ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Docker failed to list containers")
|
||||
@@ -145,6 +142,7 @@ func (w *dockerWatcherImpl) Start(socket string, timeoutSeconds int, refreshInte
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
logrus.Debug("Stopping Docker monitoring")
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
@@ -303,12 +301,6 @@ func (w *dockerWatcherImpl) parseContainerData(container *dockertypes.Container)
|
||||
return
|
||||
}
|
||||
|
||||
func (w *dockerWatcherImpl) Stop() {
|
||||
if w.contextCancel != nil {
|
||||
w.contextCancel()
|
||||
}
|
||||
}
|
||||
|
||||
type routableContainer struct {
|
||||
externalContainerName string
|
||||
containerEndpoint string
|
||||
|
||||
+3
-12
@@ -26,7 +26,6 @@ type dockerSwarmWatcherImpl struct {
|
||||
autoScaleUp bool
|
||||
autoScaleDown bool
|
||||
client *client.Client
|
||||
contextCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (w *dockerSwarmWatcherImpl) makeWakerFunc(_ *routableService) ScalerFunc {
|
||||
@@ -49,7 +48,7 @@ func (w *dockerSwarmWatcherImpl) makeSleeperFunc(_ *routableService) ScalerFunc
|
||||
}
|
||||
}
|
||||
|
||||
func (w *dockerSwarmWatcherImpl) Start(socket string, timeoutSeconds int, refreshIntervalSeconds int, autoScaleUp bool, autoScaleDown bool) error {
|
||||
func (w *dockerSwarmWatcherImpl) Start(ctx context.Context, socket string, timeoutSeconds int, refreshIntervalSeconds int, autoScaleUp bool, autoScaleDown bool) error {
|
||||
var err error
|
||||
|
||||
w.autoScaleUp = autoScaleUp
|
||||
@@ -75,9 +74,7 @@ func (w *dockerSwarmWatcherImpl) Start(socket string, timeoutSeconds int, refres
|
||||
ticker := time.NewTicker(refreshInterval)
|
||||
serviceMap := map[string]*routableService{}
|
||||
|
||||
var ctx context.Context
|
||||
ctx, w.contextCancel = context.WithCancel(context.Background())
|
||||
|
||||
logrus.Trace("Performing initial listing of Docker containers")
|
||||
initialServices, err := w.listServices(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -99,7 +96,7 @@ func (w *dockerSwarmWatcherImpl) Start(socket string, timeoutSeconds int, refres
|
||||
services, err := w.listServices(ctx)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Docker failed to list services")
|
||||
return
|
||||
continue
|
||||
}
|
||||
|
||||
visited := map[string]struct{}{}
|
||||
@@ -332,9 +329,3 @@ func (w *dockerSwarmWatcherImpl) parseServiceData(service *swarm.Service, networ
|
||||
ok = true
|
||||
return
|
||||
}
|
||||
|
||||
func (w *dockerSwarmWatcherImpl) Stop() {
|
||||
if w.contextCancel != nil {
|
||||
w.contextCancel()
|
||||
}
|
||||
}
|
||||
|
||||
+2
-6
@@ -142,21 +142,17 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) {
|
||||
|
||||
// TODO convert to RouteFinder
|
||||
if config.InDocker {
|
||||
err = DockerWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down)
|
||||
err = DockerWatcher.Start(ctx, config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not start docker integration: %w", err)
|
||||
} else {
|
||||
defer DockerWatcher.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO convert to RouteFinder
|
||||
if config.InDockerSwarm {
|
||||
err = DockerSwarmWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down)
|
||||
err = DockerSwarmWatcher.Start(ctx, config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not start docker swarm integration: %w", err)
|
||||
} else {
|
||||
defer DockerSwarmWatcher.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user