feat: show asleep or loading motd for k8s servers (#535)
This commit is contained in:
@@ -453,6 +453,26 @@ metadata:
|
||||
"mc-router.itzg.me/autoScaleDown": "false"
|
||||
```
|
||||
|
||||
To override the MOTD shown when the server is scaled down or scaling up, you can use the following annotations on the `Service object`:
|
||||
- `mc-router.itzg.me/autoScaleAsleepMOTD`
|
||||
- `mc-router.itzg.me/autoScaleLoadingMOTD`
|
||||
|
||||
You can also customize how long the router will wait for a scaling backend to become reachable (default: 60s):
|
||||
- `mc-router.itzg.me/autoScaleWaitTimeout` (e.g. `2m`, `30s`)
|
||||
|
||||
Example server with custom MOTD and timeout:
|
||||
```yaml
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: mc-forge
|
||||
annotations:
|
||||
"mc-router.itzg.me/externalServerName": "external.host.name"
|
||||
"mc-router.itzg.me/autoScaleAsleepMOTD": "Server is sleeping"
|
||||
"mc-router.itzg.me/autoScaleLoadingMOTD": "Server is loading"
|
||||
"mc-router.itzg.me/autoScaleWaitTimeout": "90s"
|
||||
```
|
||||
|
||||
#### Using with Velocity/BungeeCord proxies
|
||||
|
||||
When using a proxy server like Velocity or BungeeCord, you can use the `mc-router.itzg.me/proxyServerName` annotation to route client connections to the proxy while still allowing mc-router to auto-scale the backend StatefulSet. This is useful when you want to:
|
||||
|
||||
@@ -659,6 +659,21 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
|
||||
}
|
||||
}
|
||||
|
||||
if waker != nil && nextState == mcproto.StateStatus {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"client": clientAddr,
|
||||
"server": serverAddress,
|
||||
"isLegacy": isLegacy,
|
||||
}).Debug("Scalable backend unreachable: serving predefined status response")
|
||||
|
||||
br := bufio.NewReader(frontendConn)
|
||||
if isLegacy {
|
||||
c.serveLegacyStatus(frontendConn, resolvedHost)
|
||||
} else {
|
||||
c.serveStatus(frontendConn, br, resolvedHost, clientProtocol)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/itzg/mc-router/mcproto"
|
||||
"github.com/pires/go-proxyproto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTrustedProxyNetworkPolicy(t *testing.T) {
|
||||
@@ -110,3 +117,80 @@ func TestConnectorGetLoadingMOTD(t *testing.T) {
|
||||
Routes.SetDefaultRoute("default:25565", "", nil, nil, "", "default loading")
|
||||
assert.Equal(t, "default loading", c.getLoadingMOTD(""))
|
||||
}
|
||||
|
||||
func writeTestPacket(w io.Writer, packetID int32, payload func(w io.Writer)) error {
|
||||
var b bytes.Buffer
|
||||
_ = mcproto.WriteVarInt(&b, packetID)
|
||||
if payload != nil {
|
||||
payload(&b)
|
||||
}
|
||||
|
||||
var framed bytes.Buffer
|
||||
_ = mcproto.WriteVarInt(&framed, int32(b.Len()))
|
||||
framed.Write(b.Bytes())
|
||||
_, err := w.Write(framed.Bytes())
|
||||
return err
|
||||
}
|
||||
|
||||
func TestConnectorMOTDFallback(t *testing.T) {
|
||||
oldRoutes := Routes
|
||||
defer func() {
|
||||
Routes = oldRoutes
|
||||
}()
|
||||
|
||||
Routes = NewRoutes()
|
||||
|
||||
backendAddress := "127.0.0.1:0"
|
||||
|
||||
scaleUpCalled := false
|
||||
waker := func(ctx context.Context) (string, error) {
|
||||
scaleUpCalled = true
|
||||
return backendAddress, nil
|
||||
}
|
||||
|
||||
Routes.CreateMapping("mc.example.com", backendAddress, "", waker, nil, "fallback asleep", "fallback loading")
|
||||
|
||||
metricsBuilder := discardMetricsBuilder{}
|
||||
c := NewConnector(context.Background(), metricsBuilder.BuildConnectorMetrics(), false, false, nil)
|
||||
c.UseAsleepMOTD("global asleep")
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
defer ln.Close()
|
||||
|
||||
go c.acceptConnections(ln, 100, 0)
|
||||
|
||||
clientConn, err := net.Dial("tcp", ln.Addr().String())
|
||||
require.NoError(t, err)
|
||||
defer clientConn.Close()
|
||||
|
||||
err = writeTestPacket(clientConn, 0x00, func(w io.Writer) {
|
||||
_ = mcproto.WriteVarInt(w, 758)
|
||||
_ = mcproto.WriteString(w, "mc.example.com")
|
||||
w.Write([]byte{0x63, 0xdd})
|
||||
_ = mcproto.WriteVarInt(w, 1)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// 2. Send Status Request
|
||||
err = writeTestPacket(clientConn, 0x00, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// 3. Read Status Response
|
||||
_ = clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
|
||||
reader := bufio.NewReader(clientConn)
|
||||
|
||||
frame, err := mcproto.ReadFrame(reader, clientConn.RemoteAddr())
|
||||
require.NoError(t, err)
|
||||
|
||||
packetReader := bytes.NewReader(frame.Payload)
|
||||
packetID, err := mcproto.ReadVarInt(packetReader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0x00, packetID)
|
||||
|
||||
jsonStr, err := mcproto.ReadString(packetReader)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Contains(t, jsonStr, "fallback asleep")
|
||||
assert.False(t, scaleUpCalled, "Waker should NOT be called for a status request")
|
||||
}
|
||||
|
||||
+66
-5
@@ -7,6 +7,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -27,6 +28,9 @@ const (
|
||||
AnnotationAutoScaleUp = "mc-router.itzg.me/autoScaleUp"
|
||||
AnnotationAutoScaleDown = "mc-router.itzg.me/autoScaleDown"
|
||||
AnnotationProxyServerName = "mc-router.itzg.me/proxyServerName"
|
||||
AnnotationAutoScaleAsleepMOTD = "mc-router.itzg.me/autoScaleAsleepMOTD"
|
||||
AnnotationAutoScaleLoadingMOTD = "mc-router.itzg.me/autoScaleLoadingMOTD"
|
||||
AnnotationAutoScaleWaitTimeout = "mc-router.itzg.me/autoScaleWaitTimeout"
|
||||
)
|
||||
|
||||
// K8sWatcher is a RouteFinder that can find routes from kubernetes services.
|
||||
@@ -185,9 +189,9 @@ func (w *K8sWatcher) handleUpdate(oldObj interface{}, newObj interface{}) {
|
||||
"new": newRoutableService,
|
||||
}).Debug("UPDATE")
|
||||
if newRoutableService.externalServiceName != "" {
|
||||
w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "", "")
|
||||
w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, newRoutableService.autoScaleAsleepMOTD, newRoutableService.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "", "")
|
||||
w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, newRoutableService.autoScaleAsleepMOTD, newRoutableService.autoScaleLoadingMOTD)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,9 +220,9 @@ func (w *K8sWatcher) handleAdd(obj interface{}) {
|
||||
logrus.WithField("routableService", routableService).Debug("ADD")
|
||||
|
||||
if routableService.externalServiceName != "" {
|
||||
w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "", "")
|
||||
w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, routableService.autoScaleAsleepMOTD, routableService.autoScaleLoadingMOTD)
|
||||
} else {
|
||||
w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "", "")
|
||||
w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, routableService.autoScaleAsleepMOTD, routableService.autoScaleLoadingMOTD)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -230,6 +234,8 @@ type routableService struct {
|
||||
scalingTarget string
|
||||
autoScaleUp WakerFunc
|
||||
autoScaleDown SleeperFunc
|
||||
autoScaleAsleepMOTD string
|
||||
autoScaleLoadingMOTD string
|
||||
}
|
||||
|
||||
// obj is expected to be a *v1.Service
|
||||
@@ -288,17 +294,72 @@ func (w *K8sWatcher) buildDetails(service *core.Service, externalServiceName str
|
||||
// scalingTarget remains the service endpoint (already set above)
|
||||
}
|
||||
|
||||
autoScaleAsleepMOTD := ""
|
||||
autoScaleLoadingMOTD := ""
|
||||
waitTimeout := 60 * time.Second
|
||||
|
||||
if asleepMOTD, exists := service.Annotations[AnnotationAutoScaleAsleepMOTD]; exists && asleepMOTD != "" {
|
||||
autoScaleAsleepMOTD = asleepMOTD
|
||||
}
|
||||
|
||||
if loadingMOTD, exists := service.Annotations[AnnotationAutoScaleLoadingMOTD]; exists && loadingMOTD != "" {
|
||||
autoScaleLoadingMOTD = loadingMOTD
|
||||
}
|
||||
|
||||
if timeoutStr, exists := service.Annotations[AnnotationAutoScaleWaitTimeout]; exists && timeoutStr != "" {
|
||||
if parsedTimeout, err := time.ParseDuration(timeoutStr); err == nil && parsedTimeout > 0 {
|
||||
waitTimeout = parsedTimeout
|
||||
} else {
|
||||
logrus.WithError(err).WithField("annotation", AnnotationAutoScaleWaitTimeout).WithField("value", timeoutStr).Warn("Invalid wait timeout annotation, falling back to default 60s")
|
||||
}
|
||||
}
|
||||
|
||||
wakerFunc := w.buildScaleFunction(service, 0, 1)
|
||||
rs := &routableService{
|
||||
externalServiceName: externalServiceName,
|
||||
containerEndpoint: routingEndpoint,
|
||||
scalingTarget: scalingTarget,
|
||||
autoScaleUp: buildWakerFromSleeper(routingEndpoint, wakerFunc),
|
||||
autoScaleUp: buildK8sWaker(routingEndpoint, wakerFunc, waitTimeout),
|
||||
autoScaleDown: w.buildScaleFunction(service, 1, 0),
|
||||
autoScaleAsleepMOTD: autoScaleAsleepMOTD,
|
||||
autoScaleLoadingMOTD: autoScaleLoadingMOTD,
|
||||
}
|
||||
return rs
|
||||
}
|
||||
|
||||
func buildK8sWaker(endpoint string, scaleUp SleeperFunc, waitTimeout time.Duration) WakerFunc {
|
||||
if scaleUp == nil {
|
||||
return nil
|
||||
}
|
||||
return func(ctx context.Context) (string, error) {
|
||||
if err := scaleUp(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(waitTimeout)
|
||||
for {
|
||||
conn, err := net.DialTimeout("tcp", endpoint, 1*time.Second)
|
||||
if err == nil {
|
||||
_ = conn.Close()
|
||||
break
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return endpoint, ctx.Err()
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return endpoint, fmt.Errorf("timeout waiting for K8s backend to become reachable at %s", endpoint)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return endpoint, ctx.Err()
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
return endpoint, nil
|
||||
}
|
||||
}
|
||||
|
||||
// buildScaleFunction generates a SleeperFunc to scale StatefulSets based on specified criteria and service annotations.
|
||||
// Will return nil if the service should not be auto-scaled due config or annotation.
|
||||
func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to int32) SleeperFunc {
|
||||
|
||||
@@ -3,6 +3,8 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -534,3 +536,81 @@ func TestK8s_autoScaleWithoutProxy(t *testing.T) {
|
||||
// This ensures auto-scaling targets the correct StatefulSet
|
||||
routesHandler.AssertCalled(t, "CreateMapping", "atm-10.example.com", "10.0.0.10:25565", "10.0.0.10:25565", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
|
||||
}
|
||||
|
||||
func TestBuildK8sWaker_NilScaleUp(t *testing.T) {
|
||||
waker := buildK8sWaker("10.0.0.1:25565", nil, 60*time.Second)
|
||||
assert.Nil(t, waker, "buildK8sWaker should return nil when scaleUp is nil")
|
||||
}
|
||||
|
||||
func TestBuildK8sWaker_WaitsForEndpoint(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
defer ln.Close()
|
||||
|
||||
endpoint := ln.Addr().String()
|
||||
|
||||
scaleUpCalled := false
|
||||
scaleUp := func(ctx context.Context) error {
|
||||
scaleUpCalled = true
|
||||
return nil
|
||||
}
|
||||
|
||||
waker := buildK8sWaker(endpoint, scaleUp, 60*time.Second)
|
||||
require.NotNil(t, waker)
|
||||
|
||||
result, err := waker(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, endpoint, result)
|
||||
assert.True(t, scaleUpCalled, "scaleUp should have been called")
|
||||
}
|
||||
|
||||
func TestBuildK8sWaker_ScaleUpError(t *testing.T) {
|
||||
scaleUp := func(ctx context.Context) error {
|
||||
return fmt.Errorf("scale up failed")
|
||||
}
|
||||
|
||||
waker := buildK8sWaker("10.0.0.1:25565", scaleUp, 60*time.Second)
|
||||
require.NotNil(t, waker)
|
||||
|
||||
_, err := waker(context.Background())
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "scale up failed")
|
||||
}
|
||||
|
||||
func TestBuildK8sWaker_ContextCancellation(t *testing.T) {
|
||||
scaleUp := func(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
waker := buildK8sWaker("192.0.2.1:65534", scaleUp, 60*time.Second)
|
||||
require.NotNil(t, waker)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err := waker(ctx)
|
||||
assert.Error(t, err, "waker should return error when context is cancelled")
|
||||
}
|
||||
|
||||
func TestK8s_motdAnnotations(t *testing.T) {
|
||||
DownScaler = NewDownScaler(context.Background(), false, 1*time.Second)
|
||||
|
||||
routesHandler := new(MockedRoutesHandler)
|
||||
routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
|
||||
routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return()
|
||||
routesHandler.On("GetAsleepMOTD", mock.Anything).Return("")
|
||||
routesHandler.On("DeleteMapping", mock.Anything).Return(true)
|
||||
|
||||
watcher := &K8sWatcher{
|
||||
autoScaleUp: true,
|
||||
routesHandler: routesHandler,
|
||||
}
|
||||
|
||||
svc := v1.Service{}
|
||||
err := json.Unmarshal([]byte(`{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com", "mc-router.itzg.me/autoScaleUp": "true", "mc-router.itzg.me/autoScaleAsleepMOTD": "Server is sleeping", "mc-router.itzg.me/autoScaleLoadingMOTD": "Server is starting"}}, "spec":{"clusterIP": "10.0.0.5"}}`), &svc)
|
||||
require.NoError(t, err)
|
||||
|
||||
watcher.handleAdd(&svc)
|
||||
|
||||
routesHandler.AssertCalled(t, "CreateMapping", "mc.example.com", "10.0.0.5:25565", "10.0.0.5:25565", mock.Anything, mock.Anything, "Server is sleeping", "Server is starting")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user