Fix race condition during first connection (#509)
This commit is contained in:
+70
-50
@@ -62,8 +62,10 @@ func NewDockerWatcher(socket string, timeoutSeconds int, refreshIntervalSeconds
|
|||||||
|
|
||||||
type dockerWatcherImpl struct {
|
type dockerWatcherImpl struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
config dockerWatcherConfig
|
config dockerWatcherConfig
|
||||||
client *client.Client
|
client *client.Client
|
||||||
|
containerMap map[string]*routableContainer
|
||||||
|
monitorLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *dockerWatcherImpl) makeWakerFunc(rc *routableContainer) WakerFunc {
|
func (w *dockerWatcherImpl) makeWakerFunc(rc *routableContainer) WakerFunc {
|
||||||
@@ -108,6 +110,13 @@ func (w *dockerWatcherImpl) makeWakerFunc(rc *routableContainer) WakerFunc {
|
|||||||
}
|
}
|
||||||
endpoint := net.JoinHostPort(data.ip, strconv.Itoa(int(data.port)))
|
endpoint := net.JoinHostPort(data.ip, strconv.Itoa(int(data.port)))
|
||||||
|
|
||||||
|
// Update the route mappings
|
||||||
|
err = w.monitorContainers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("Docker monitoring failed")
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
// Wait until the container is reachable
|
// Wait until the container is reachable
|
||||||
deadline := time.Now().Add(60 * time.Second)
|
deadline := time.Now().Add(60 * time.Second)
|
||||||
for {
|
for {
|
||||||
@@ -158,6 +167,61 @@ func (w *dockerWatcherImpl) makeSleeperFunc(rc *routableContainer) SleeperFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error {
|
||||||
|
w.monitorLock.Lock()
|
||||||
|
defer w.monitorLock.Unlock()
|
||||||
|
|
||||||
|
logrus.Trace("Listing Docker containers")
|
||||||
|
containers, err := w.listContainers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("Docker failed to list containers")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
visited := map[string]struct{}{}
|
||||||
|
for _, rs := range containers {
|
||||||
|
if oldRs, ok := w.containerMap[rs.externalContainerName]; !ok {
|
||||||
|
w.containerMap[rs.externalContainerName] = rs
|
||||||
|
logrus.WithField("routableContainer", rs).Debug("ADD")
|
||||||
|
wakerFunc := w.makeWakerFunc(rs)
|
||||||
|
sleeperFunc := w.makeSleeperFunc(rs)
|
||||||
|
if rs.externalContainerName != "" {
|
||||||
|
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
||||||
|
} else {
|
||||||
|
Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
||||||
|
}
|
||||||
|
} else if oldRs.containerEndpoint != rs.containerEndpoint ||
|
||||||
|
oldRs.containerID != rs.containerID ||
|
||||||
|
oldRs.autoScaleUp != rs.autoScaleUp ||
|
||||||
|
oldRs.autoScaleDown != rs.autoScaleDown ||
|
||||||
|
oldRs.autoScaleAsleepMOTD != rs.autoScaleAsleepMOTD {
|
||||||
|
w.containerMap[rs.externalContainerName] = rs
|
||||||
|
wakerFunc := w.makeWakerFunc(rs)
|
||||||
|
sleeperFunc := w.makeSleeperFunc(rs)
|
||||||
|
if rs.externalContainerName != "" {
|
||||||
|
Routes.DeleteMapping(rs.externalContainerName)
|
||||||
|
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
||||||
|
} else {
|
||||||
|
Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
||||||
|
}
|
||||||
|
logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE")
|
||||||
|
}
|
||||||
|
visited[rs.externalContainerName] = struct{}{}
|
||||||
|
}
|
||||||
|
for _, rs := range w.containerMap {
|
||||||
|
if _, ok := visited[rs.externalContainerName]; !ok {
|
||||||
|
delete(w.containerMap, rs.externalContainerName)
|
||||||
|
if rs.externalContainerName != "" {
|
||||||
|
Routes.DeleteMapping(rs.externalContainerName)
|
||||||
|
} else {
|
||||||
|
Routes.SetDefaultRoute("", nil, nil, "")
|
||||||
|
}
|
||||||
|
logrus.WithField("routableContainer", rs).Debug("DELETE")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (w *dockerWatcherImpl) Start(ctx context.Context) error {
|
func (w *dockerWatcherImpl) Start(ctx context.Context) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@@ -186,9 +250,9 @@ func (w *dockerWatcherImpl) Start(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
containerMap := map[string]*routableContainer{}
|
w.containerMap = map[string]*routableContainer{}
|
||||||
for _, c := range initialContainers {
|
for _, c := range initialContainers {
|
||||||
containerMap[c.externalContainerName] = c
|
w.containerMap[c.externalContainerName] = c
|
||||||
wakerFunc := w.makeWakerFunc(c)
|
wakerFunc := w.makeWakerFunc(c)
|
||||||
sleeperFunc := w.makeSleeperFunc(c)
|
sleeperFunc := w.makeSleeperFunc(c)
|
||||||
if c.externalContainerName != "" {
|
if c.externalContainerName != "" {
|
||||||
@@ -202,55 +266,11 @@ func (w *dockerWatcherImpl) Start(ctx context.Context) error {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
logrus.Trace("Listing Docker containers")
|
err := w.monitorContainers(ctx)
|
||||||
containers, err := w.listContainers(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("Docker failed to list containers")
|
logrus.WithError(err).Error("Docker monitoring failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
visited := map[string]struct{}{}
|
|
||||||
for _, rs := range containers {
|
|
||||||
if oldRs, ok := containerMap[rs.externalContainerName]; !ok {
|
|
||||||
containerMap[rs.externalContainerName] = rs
|
|
||||||
logrus.WithField("routableContainer", rs).Debug("ADD")
|
|
||||||
wakerFunc := w.makeWakerFunc(rs)
|
|
||||||
sleeperFunc := w.makeSleeperFunc(rs)
|
|
||||||
if rs.externalContainerName != "" {
|
|
||||||
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
|
||||||
} else {
|
|
||||||
Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
|
||||||
}
|
|
||||||
} else if oldRs.containerEndpoint != rs.containerEndpoint ||
|
|
||||||
oldRs.containerID != rs.containerID ||
|
|
||||||
oldRs.autoScaleUp != rs.autoScaleUp ||
|
|
||||||
oldRs.autoScaleDown != rs.autoScaleDown ||
|
|
||||||
oldRs.autoScaleAsleepMOTD != rs.autoScaleAsleepMOTD {
|
|
||||||
containerMap[rs.externalContainerName] = rs
|
|
||||||
wakerFunc := w.makeWakerFunc(rs)
|
|
||||||
sleeperFunc := w.makeSleeperFunc(rs)
|
|
||||||
if rs.externalContainerName != "" {
|
|
||||||
Routes.DeleteMapping(rs.externalContainerName)
|
|
||||||
Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
|
||||||
} else {
|
|
||||||
Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD)
|
|
||||||
}
|
|
||||||
logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE")
|
|
||||||
}
|
|
||||||
visited[rs.externalContainerName] = struct{}{}
|
|
||||||
}
|
|
||||||
for _, rs := range containerMap {
|
|
||||||
if _, ok := visited[rs.externalContainerName]; !ok {
|
|
||||||
delete(containerMap, rs.externalContainerName)
|
|
||||||
if rs.externalContainerName != "" {
|
|
||||||
Routes.DeleteMapping(rs.externalContainerName)
|
|
||||||
} else {
|
|
||||||
Routes.SetDefaultRoute("", nil, nil, "")
|
|
||||||
}
|
|
||||||
logrus.WithField("routableContainer", rs).Debug("DELETE")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logrus.Debug("Stopping Docker monitoring")
|
logrus.Debug("Stopping Docker monitoring")
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
|
|||||||
Reference in New Issue
Block a user