From 5c7f5d7b3656a0237dfe6cbc5affa25fb9fd87a0 Mon Sep 17 00:00:00 2001 From: Bastian Wagner Date: Tue, 10 Mar 2026 00:22:53 +0100 Subject: [PATCH] feat: show asleep or loading motd for k8s servers (#535) --- README.md | 20 ++++++++ server/connector.go | 15 ++++++ server/connector_test.go | 84 ++++++++++++++++++++++++++++++++++ server/k8s.go | 99 ++++++++++++++++++++++++++++++++-------- server/k8s_test.go | 80 ++++++++++++++++++++++++++++++++ 5 files changed, 279 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 5758bc4..5e5dbc0 100644 --- a/README.md +++ b/README.md @@ -453,6 +453,26 @@ metadata: "mc-router.itzg.me/autoScaleDown": "false" ``` +To override the MOTD shown when the server is scaled down or scaling up, you can use the following annotations on the `Service object`: +- `mc-router.itzg.me/autoScaleAsleepMOTD` +- `mc-router.itzg.me/autoScaleLoadingMOTD` + +You can also customize how long the router will wait for a scaling backend to become reachable (default: 60s): +- `mc-router.itzg.me/autoScaleWaitTimeout` (e.g. `2m`, `30s`) + +Example server with custom MOTD and timeout: +```yaml +apiVersion: v1 +kind: Service +metadata: + name: mc-forge + annotations: + "mc-router.itzg.me/externalServerName": "external.host.name" + "mc-router.itzg.me/autoScaleAsleepMOTD": "Server is sleeping" + "mc-router.itzg.me/autoScaleLoadingMOTD": "Server is loading" + "mc-router.itzg.me/autoScaleWaitTimeout": "90s" +``` + #### Using with Velocity/BungeeCord proxies When using a proxy server like Velocity or BungeeCord, you can use the `mc-router.itzg.me/proxyServerName` annotation to route client connections to the proxy while still allowing mc-router to auto-scale the backend StatefulSet. This is useful when you want to: diff --git a/server/connector.go b/server/connector.go index 5e45a2d..63f462c 100644 --- a/server/connector.go +++ b/server/connector.go @@ -659,6 +659,21 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, } } + if waker != nil && nextState == mcproto.StateStatus { + logrus.WithFields(logrus.Fields{ + "client": clientAddr, + "server": serverAddress, + "isLegacy": isLegacy, + }).Debug("Scalable backend unreachable: serving predefined status response") + + br := bufio.NewReader(frontendConn) + if isLegacy { + c.serveLegacyStatus(frontendConn, resolvedHost) + } else { + c.serveStatus(frontendConn, br, resolvedHost, clientProtocol) + } + } + return } diff --git a/server/connector_test.go b/server/connector_test.go index f428f44..0ee0464 100644 --- a/server/connector_test.go +++ b/server/connector_test.go @@ -1,11 +1,18 @@ package server import ( + "bufio" + "bytes" + "context" + "io" "net" "testing" + "time" + "github.com/itzg/mc-router/mcproto" "github.com/pires/go-proxyproto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestTrustedProxyNetworkPolicy(t *testing.T) { @@ -110,3 +117,80 @@ func TestConnectorGetLoadingMOTD(t *testing.T) { Routes.SetDefaultRoute("default:25565", "", nil, nil, "", "default loading") assert.Equal(t, "default loading", c.getLoadingMOTD("")) } + +func writeTestPacket(w io.Writer, packetID int32, payload func(w io.Writer)) error { + var b bytes.Buffer + _ = mcproto.WriteVarInt(&b, packetID) + if payload != nil { + payload(&b) + } + + var framed bytes.Buffer + _ = mcproto.WriteVarInt(&framed, int32(b.Len())) + framed.Write(b.Bytes()) + _, err := w.Write(framed.Bytes()) + return err +} + +func TestConnectorMOTDFallback(t *testing.T) { + oldRoutes := Routes + defer func() { + Routes = oldRoutes + }() + + Routes = NewRoutes() + + backendAddress := "127.0.0.1:0" + + scaleUpCalled := false + waker := func(ctx context.Context) (string, error) { + scaleUpCalled = true + return backendAddress, nil + } + + Routes.CreateMapping("mc.example.com", backendAddress, "", waker, nil, "fallback asleep", "fallback loading") + + metricsBuilder := discardMetricsBuilder{} + c := NewConnector(context.Background(), metricsBuilder.BuildConnectorMetrics(), false, false, nil) + c.UseAsleepMOTD("global asleep") + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + + go c.acceptConnections(ln, 100, 0) + + clientConn, err := net.Dial("tcp", ln.Addr().String()) + require.NoError(t, err) + defer clientConn.Close() + + err = writeTestPacket(clientConn, 0x00, func(w io.Writer) { + _ = mcproto.WriteVarInt(w, 758) + _ = mcproto.WriteString(w, "mc.example.com") + w.Write([]byte{0x63, 0xdd}) + _ = mcproto.WriteVarInt(w, 1) + }) + require.NoError(t, err) + + // 2. Send Status Request + err = writeTestPacket(clientConn, 0x00, nil) + require.NoError(t, err) + + // 3. Read Status Response + _ = clientConn.SetReadDeadline(time.Now().Add(5 * time.Second)) + reader := bufio.NewReader(clientConn) + + frame, err := mcproto.ReadFrame(reader, clientConn.RemoteAddr()) + require.NoError(t, err) + + packetReader := bytes.NewReader(frame.Payload) + packetID, err := mcproto.ReadVarInt(packetReader) + require.NoError(t, err) + assert.Equal(t, 0x00, packetID) + + jsonStr, err := mcproto.ReadString(packetReader) + require.NoError(t, err) + + assert.Contains(t, jsonStr, "fallback asleep") + assert.False(t, scaleUpCalled, "Waker should NOT be called for a status request") +} diff --git a/server/k8s.go b/server/k8s.go index 39d64f6..22aeb19 100644 --- a/server/k8s.go +++ b/server/k8s.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -22,11 +23,14 @@ import ( ) const ( - AnnotationExternalServerName = "mc-router.itzg.me/externalServerName" - AnnotationDefaultServer = "mc-router.itzg.me/defaultServer" - AnnotationAutoScaleUp = "mc-router.itzg.me/autoScaleUp" - AnnotationAutoScaleDown = "mc-router.itzg.me/autoScaleDown" - AnnotationProxyServerName = "mc-router.itzg.me/proxyServerName" + AnnotationExternalServerName = "mc-router.itzg.me/externalServerName" + AnnotationDefaultServer = "mc-router.itzg.me/defaultServer" + AnnotationAutoScaleUp = "mc-router.itzg.me/autoScaleUp" + AnnotationAutoScaleDown = "mc-router.itzg.me/autoScaleDown" + AnnotationProxyServerName = "mc-router.itzg.me/proxyServerName" + AnnotationAutoScaleAsleepMOTD = "mc-router.itzg.me/autoScaleAsleepMOTD" + AnnotationAutoScaleLoadingMOTD = "mc-router.itzg.me/autoScaleLoadingMOTD" + AnnotationAutoScaleWaitTimeout = "mc-router.itzg.me/autoScaleWaitTimeout" ) // K8sWatcher is a RouteFinder that can find routes from kubernetes services. @@ -185,9 +189,9 @@ func (w *K8sWatcher) handleUpdate(oldObj interface{}, newObj interface{}) { "new": newRoutableService, }).Debug("UPDATE") if newRoutableService.externalServiceName != "" { - w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "", "") + w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, newRoutableService.autoScaleAsleepMOTD, newRoutableService.autoScaleLoadingMOTD) } else { - w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "", "") + w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, newRoutableService.autoScaleAsleepMOTD, newRoutableService.autoScaleLoadingMOTD) } } } @@ -216,20 +220,22 @@ func (w *K8sWatcher) handleAdd(obj interface{}) { logrus.WithField("routableService", routableService).Debug("ADD") if routableService.externalServiceName != "" { - w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "", "") + w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, routableService.autoScaleAsleepMOTD, routableService.autoScaleLoadingMOTD) } else { - w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "", "") + w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, routableService.autoScaleAsleepMOTD, routableService.autoScaleLoadingMOTD) } } } } type routableService struct { - externalServiceName string - containerEndpoint string - scalingTarget string - autoScaleUp WakerFunc - autoScaleDown SleeperFunc + externalServiceName string + containerEndpoint string + scalingTarget string + autoScaleUp WakerFunc + autoScaleDown SleeperFunc + autoScaleAsleepMOTD string + autoScaleLoadingMOTD string } // obj is expected to be a *v1.Service @@ -288,17 +294,72 @@ func (w *K8sWatcher) buildDetails(service *core.Service, externalServiceName str // scalingTarget remains the service endpoint (already set above) } + autoScaleAsleepMOTD := "" + autoScaleLoadingMOTD := "" + waitTimeout := 60 * time.Second + + if asleepMOTD, exists := service.Annotations[AnnotationAutoScaleAsleepMOTD]; exists && asleepMOTD != "" { + autoScaleAsleepMOTD = asleepMOTD + } + + if loadingMOTD, exists := service.Annotations[AnnotationAutoScaleLoadingMOTD]; exists && loadingMOTD != "" { + autoScaleLoadingMOTD = loadingMOTD + } + + if timeoutStr, exists := service.Annotations[AnnotationAutoScaleWaitTimeout]; exists && timeoutStr != "" { + if parsedTimeout, err := time.ParseDuration(timeoutStr); err == nil && parsedTimeout > 0 { + waitTimeout = parsedTimeout + } else { + logrus.WithError(err).WithField("annotation", AnnotationAutoScaleWaitTimeout).WithField("value", timeoutStr).Warn("Invalid wait timeout annotation, falling back to default 60s") + } + } + wakerFunc := w.buildScaleFunction(service, 0, 1) rs := &routableService{ - externalServiceName: externalServiceName, - containerEndpoint: routingEndpoint, - scalingTarget: scalingTarget, - autoScaleUp: buildWakerFromSleeper(routingEndpoint, wakerFunc), - autoScaleDown: w.buildScaleFunction(service, 1, 0), + externalServiceName: externalServiceName, + containerEndpoint: routingEndpoint, + scalingTarget: scalingTarget, + autoScaleUp: buildK8sWaker(routingEndpoint, wakerFunc, waitTimeout), + autoScaleDown: w.buildScaleFunction(service, 1, 0), + autoScaleAsleepMOTD: autoScaleAsleepMOTD, + autoScaleLoadingMOTD: autoScaleLoadingMOTD, } return rs } +func buildK8sWaker(endpoint string, scaleUp SleeperFunc, waitTimeout time.Duration) WakerFunc { + if scaleUp == nil { + return nil + } + return func(ctx context.Context) (string, error) { + if err := scaleUp(ctx); err != nil { + return "", err + } + + deadline := time.Now().Add(waitTimeout) + for { + conn, err := net.DialTimeout("tcp", endpoint, 1*time.Second) + if err == nil { + _ = conn.Close() + break + } + if ctx.Err() != nil { + return endpoint, ctx.Err() + } + if time.Now().After(deadline) { + return endpoint, fmt.Errorf("timeout waiting for K8s backend to become reachable at %s", endpoint) + } + select { + case <-ctx.Done(): + return endpoint, ctx.Err() + case <-time.After(500 * time.Millisecond): + } + } + + return endpoint, nil + } +} + // buildScaleFunction generates a SleeperFunc to scale StatefulSets based on specified criteria and service annotations. // Will return nil if the service should not be auto-scaled due config or annotation. func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to int32) SleeperFunc { diff --git a/server/k8s_test.go b/server/k8s_test.go index 42a6b38..489bbab 100644 --- a/server/k8s_test.go +++ b/server/k8s_test.go @@ -3,6 +3,8 @@ package server import ( "context" "encoding/json" + "fmt" + "net" "testing" "time" @@ -534,3 +536,81 @@ func TestK8s_autoScaleWithoutProxy(t *testing.T) { // This ensures auto-scaling targets the correct StatefulSet routesHandler.AssertCalled(t, "CreateMapping", "atm-10.example.com", "10.0.0.10:25565", "10.0.0.10:25565", mock.Anything, mock.Anything, mock.Anything, mock.Anything) } + +func TestBuildK8sWaker_NilScaleUp(t *testing.T) { + waker := buildK8sWaker("10.0.0.1:25565", nil, 60*time.Second) + assert.Nil(t, waker, "buildK8sWaker should return nil when scaleUp is nil") +} + +func TestBuildK8sWaker_WaitsForEndpoint(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + + endpoint := ln.Addr().String() + + scaleUpCalled := false + scaleUp := func(ctx context.Context) error { + scaleUpCalled = true + return nil + } + + waker := buildK8sWaker(endpoint, scaleUp, 60*time.Second) + require.NotNil(t, waker) + + result, err := waker(context.Background()) + assert.NoError(t, err) + assert.Equal(t, endpoint, result) + assert.True(t, scaleUpCalled, "scaleUp should have been called") +} + +func TestBuildK8sWaker_ScaleUpError(t *testing.T) { + scaleUp := func(ctx context.Context) error { + return fmt.Errorf("scale up failed") + } + + waker := buildK8sWaker("10.0.0.1:25565", scaleUp, 60*time.Second) + require.NotNil(t, waker) + + _, err := waker(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "scale up failed") +} + +func TestBuildK8sWaker_ContextCancellation(t *testing.T) { + scaleUp := func(ctx context.Context) error { + return nil + } + + waker := buildK8sWaker("192.0.2.1:65534", scaleUp, 60*time.Second) + require.NotNil(t, waker) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, err := waker(ctx) + assert.Error(t, err, "waker should return error when context is cancelled") +} + +func TestK8s_motdAnnotations(t *testing.T) { + DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) + + routesHandler := new(MockedRoutesHandler) + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") + routesHandler.On("DeleteMapping", mock.Anything).Return(true) + + watcher := &K8sWatcher{ + autoScaleUp: true, + routesHandler: routesHandler, + } + + svc := v1.Service{} + err := json.Unmarshal([]byte(`{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com", "mc-router.itzg.me/autoScaleUp": "true", "mc-router.itzg.me/autoScaleAsleepMOTD": "Server is sleeping", "mc-router.itzg.me/autoScaleLoadingMOTD": "Server is starting"}}, "spec":{"clusterIP": "10.0.0.5"}}`), &svc) + require.NoError(t, err) + + watcher.handleAdd(&svc) + + routesHandler.AssertCalled(t, "CreateMapping", "mc.example.com", "10.0.0.5:25565", "10.0.0.5:25565", mock.Anything, mock.Anything, "Server is sleeping", "Server is starting") +}