From 21f349c2da720e1ace198f8016d470c4a90d5e96 Mon Sep 17 00:00:00 2001 From: Chris Farhood Date: Fri, 13 Feb 2026 08:07:27 -0500 Subject: [PATCH] Allow mc-router to scale a backend StatefulSet while routing traffic to a proxy via proxyServerName (#512) --- .devcontainer/devcontainer.json | 37 ------ CLAUDE.md | 77 ++++++++++++ README.md | 90 +++++++++++++- docs/k8s-autoscale.yaml | 7 +- docs/k8s-deployment-cluster-role.yaml | 4 +- docs/k8s-deployment.yaml | 4 +- server/api_server.go | 4 +- server/connector.go | 21 ++-- server/docker.go | 14 +-- server/docker_swarm.go | 14 +-- server/k8s.go | 87 +++++++++++--- server/k8s_test.go | 161 ++++++++++++++++++++++++-- server/routes.go | 63 +++++----- server/routes_config_loader.go | 6 +- server/routes_test.go | 71 +++++++++++- server/server.go | 2 +- 16 files changed, 527 insertions(+), 135 deletions(-) delete mode 100644 .devcontainer/devcontainer.json create mode 100644 CLAUDE.md diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json deleted file mode 100644 index f1b25df..0000000 --- a/.devcontainer/devcontainer.json +++ /dev/null @@ -1,37 +0,0 @@ -// For format details, see https://aka.ms/devcontainer.json. For config options, see the -// README at: https://github.com/devcontainers/templates/tree/main/src/go -{ - "name": "Go", - // Use bookworm due to - // (!) The 'moby' option is not supported on Debian 'trixie' because 'moby-cli' and related system packages have been removed from that distribution. - // (!) To continue, either set the feature option '"moby": false' or use a different base image (for example: 'debian:bookworm' or 'ubuntu-24.04'). - "image": "golang:1.25-bookworm", - // Features to add to the dev container. More info: https://containers.dev/features. - "features": { - // For in-docker discovery testing - "ghcr.io/devcontainers/features/docker-outside-of-docker:1": {} - }, - // Use 'forwardPorts' to make a list of ports inside the container available locally. - "forwardPorts": [ - 25565 - ], - - containerEnv: { - "GOROOT": "/usr/local/go" - }, - - // Configure tool-specific properties. - "customizations": { - "jetbrains": { - "backend": "IntelliJ", - "plugins": [ - "org.jetbrains.plugins.go" - ] - }, - "vscode": { - "extensions": [ - "golang.go" - ] - } - } -} diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..e957db9 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,77 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +mc-router is a Minecraft Java Edition reverse proxy that routes client connections to backend servers based on the requested server address (hostname). It multiplexes multiple Minecraft servers onto a single public IP/port and supports auto-discovery via Kubernetes and Docker, auto-scaling (scale to zero/one), webhooks, rate limiting, IP filtering, PROXY protocol, and ngrok tunneling. + +## Build & Test Commands + +```bash +go build ./cmd/mc-router/ # Build the binary +make test # Run all tests (go test ./...) +go test ./server/... # Run only server package tests +go test ./mcproto/... # Run only protocol package tests +go test -run TestRouteLookup ./server/ # Run a single test +docker build -t mc-router . # Build Docker image +``` + +Go version: 1.25. Testing uses `testify` (assert/require). Tests are table-driven. + +## Architecture + +### Request Flow + +1. **Connector** (`server/connector.go`) accepts TCP connections on port 25565 +2. **mcproto** package (`mcproto/`) reads the Minecraft handshake packet to extract the target server address +3. **Routes** (`server/routes.go`) looks up the backend address for that hostname +4. If auto-scale is enabled and the backend is sleeping, a **waker** function starts it (Kubernetes StatefulSet replica 0→1 or Docker container start/unpause) +5. Traffic is proxied bidirectionally between client and backend +6. On disconnect, metrics are updated, webhooks fired, and the **DownScaler** (`server/down_scaler.go`) may schedule scale-down after idle timeout + +### Key Packages + +- **`cmd/mc-router/`** — Entry point. Parses CLI flags via `go-flagsfiller`, sets up signal handling (SIGINT for shutdown, SIGHUP for config reload). +- **`server/`** — Core router logic: + - `server.go` — Initializes all subsystems (metrics, routes, connector, API, service discovery) + - `connector.go` — Connection handler: accepts clients, reads handshake, proxies traffic, manages rate limiting and client filtering + - `routes.go` — In-memory route table mapping server addresses to backends; supports default route fallback + - `routes_config_loader.go` — Loads/watches JSON routes config file (with fsnotify) + - `k8s.go` — Kubernetes service discovery via annotation `mc-router.itzg.me/externalServerName` + - `docker.go` / `docker_swarm.go` — Docker/Swarm container discovery via label `mc-router.host` + - `down_scaler.go` — Auto-scale down after idle period + - `api_server.go` — REST API (`GET/POST /routes`, `POST /defaultRoute`, `DELETE /routes/{serverAddress}`) + - `metrics.go` — Pluggable metrics backends (Prometheus, InfluxDB, expvar, discard) + - `webhook_notifier.go` — POST notifications on connect/disconnect events + - `client_filter.go` / `allow_deny_list.go` — IP and player allow/deny lists + - `configs.go` — All configuration structs with CLI flag annotations +- **`mcproto/`** — Minecraft Java protocol implementation: + - `types.go` — Frame, Packet, Handshake, LoginStart types + - `read.go` / `write.go` — Network I/O for Minecraft protocol frames + - `decode.go` — Packet decoding (handshake, login, status) + +### Configuration + +CLI flags are the primary config mechanism, with environment variable support via `go-flagsfiller` (flag `--some-flag` maps to env `SOME_FLAG`). Routes can also be loaded from a JSON config file (`--routes-config`). The `Config` struct in `server/configs.go` defines all options. + +### Service Discovery + +Routes are populated from three sources that can be combined: +1. Static `--mapping` flags or JSON config file +2. Kubernetes: watches Services with `mc-router.itzg.me/externalServerName` annotation +3. Docker/Swarm: watches containers with `mc-router.host` label + +### Key Dependencies + +- `sirupsen/logrus` — Logging +- `k8s.io/client-go` — Kubernetes client +- `github.com/docker/docker` — Docker client +- `github.com/gorilla/mux` — HTTP routing (API server) +- `github.com/prometheus/client_golang` — Prometheus metrics +- `golang.ngrok.com/ngrok` — ngrok tunnel integration +- `github.com/stretchr/testify` — Test assertions + +### Protocol Notes + +The `mcproto` package handles Minecraft Java protocol quirks: Forge mod identifiers appended to server addresses (separated by `\x00`), DNS root zone trailing dots, legacy server list ping format, and VarInt encoding. Server address matching in routes strips these suffixes before lookup. diff --git a/README.md b/README.md index f41ee67..618a24d 100644 --- a/README.md +++ b/README.md @@ -272,6 +272,7 @@ For more information on the allow/deny list configuration, see the [json schema] When running `mc-router` as a Kubernetes Pod and you pass the `--in-kube-cluster` command-line argument, then it will automatically watch for any services annotated with - `mc-router.itzg.me/externalServerName` : The value of the annotation will be registered as the external hostname Minecraft clients would used to connect to the routed service. The service is used as the routed backend. You can use more hostnames by splitting them with comma or newline. Whitespace around commas is automatically trimmed. For example: `"host1.com,host2.com"`, `"host1.com, host2.com"`, or multi-line values. - `mc-router.itzg.me/defaultServer` : When set to "true", the service is used as the default if no other `externalServiceName` annotations applies. +- `mc-router.itzg.me/proxyServerName` : When using a proxy server like Velocity or BungeeCord, this annotation specifies the proxy's address to route traffic to. The Service endpoint is still used for auto-scaling operations, allowing mc-router to scale the backend StatefulSet while routing client connections to the proxy. See [Using with Velocity/BungeeCord proxies](#using-with-velocitybungeecord-proxies) for details. By default, the router will watch all namespaces for those services; however, a specific namespace can be specified using the `KUBE_NAMESPACE` environment variable. The pod's own namespace could be set using: @@ -338,12 +339,14 @@ and if using StatefulSet auto-scaling additionally ```yaml - apiGroups: ["apps"] resources: ["statefulsets"] - verbs: ["watch","list","get","update"] + verbs: ["watch","list","patch"] - apiGroups: ["apps"] resources: ["statefulsets/scale"] - verbs: ["get","update"] + verbs: ["get"] ``` +**Note:** The `patch` verb is preferred for scaling operations as it provides atomic updates and prevents concurrency conflicts. For backward compatibility, mc-router will automatically fall back to using `get` + `update` if `patch` is not permitted, but this may result in occasional scaling conflicts in high-traffic scenarios. + ### Service parsing To detrmine the endpoint mc-router will pick the host from `spec.clusterIP` by default, if the service is of type `ExtenalName` it will use `spec.externalName` instead. @@ -376,8 +379,8 @@ The `-auto-scale-up` flag argument makes the router "wake up" any stopped backen Both options require using `kind: StatefulSet` instead of `kind: Service` for the Minecraft backend servers. -They also require the `ClusterRole` to permit `get` + `update` for `statefulsets` & `statefulsets/scale`, -e.g. like this (or some equivalent more fine-grained one to only watch/list services+statefulsets, and only get+update scale): +They also require the `ClusterRole` to permit `patch` for `statefulsets`, +e.g. like this (or some equivalent more fine-grained one to only watch/list services+statefulsets, and patch statefulsets): ```yaml apiVersion: rbac.authorization.k8s.io/v1 @@ -389,10 +392,15 @@ rules: resources: ["services"] verbs: ["watch","list"] - apiGroups: ["apps"] - resources: ["statefulsets", "statefulsets/scale"] - verbs: ["watch","list","get","update"] + resources: ["statefulsets"] + verbs: ["watch","list","patch"] +- apiGroups: ["apps"] + resources: ["statefulsets/scale"] + verbs: ["get"] ``` +**Note:** The `patch` verb is preferred for scaling operations as it provides atomic updates and prevents concurrency conflicts. For backward compatibility, mc-router will automatically fall back to using `get` + `update` if `patch` is not permitted, but this may result in occasional scaling conflicts in high-traffic scenarios. + Make sure to set `StatefulSet.metadata.name` and `StatefulSet.spec.serviceName` to the same value; otherwise, autoscaling will not trigger: @@ -441,6 +449,76 @@ metadata: "mc-router.itzg.me/autoScaleDown": "false" ``` +#### Using with Velocity/BungeeCord proxies + +When using a proxy server like Velocity or BungeeCord, you can use the `mc-router.itzg.me/proxyServerName` annotation to route client connections to the proxy while still allowing mc-router to auto-scale the backend StatefulSet. This is useful when you want to: + +1. Route all client traffic through a proxy server (for cross-server features, permissions, etc.) +2. Maintain auto-scaling capabilities for individual backend servers +3. Separate routing (to proxy) from scaling (backend StatefulSet) + +Example configuration: + +```yaml +# Velocity/BungeeCord proxy service (always running) +apiVersion: v1 +kind: Service +metadata: + name: velocity-proxy +spec: + selector: + app: velocity + ports: + - name: minecraft + port: 25577 +--- +# Backend Minecraft server with auto-scaling +apiVersion: v1 +kind: Service +metadata: + name: mc-survival + annotations: + # External hostname that clients connect to + "mc-router.itzg.me/externalServerName": "survival.example.com" + # Route traffic to the proxy instead of directly to this service + "mc-router.itzg.me/proxyServerName": "velocity-proxy:25577" +spec: + selector: + app: mc-survival + ports: + - name: minecraft + port: 25565 +--- +# Backend StatefulSet that can be scaled to zero +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: mc-survival +spec: + serviceName: mc-survival + replicas: 0 # Can be scaled from 0 to 1 automatically + selector: + matchLabels: + app: mc-survival + template: + metadata: + labels: + app: mc-survival + spec: + containers: + - name: mc + image: itzg/minecraft-server + # ... container configuration +``` + +In this configuration: +- Clients connecting to `survival.example.com` are routed to `velocity-proxy:25577` +- When a client connects, mc-router scales the `mc-survival` StatefulSet from 0 to 1 replicas +- The proxy handles the actual game connections to the backend server +- When idle, mc-router scales the StatefulSet back to 0 replicas + +**Note:** The proxy server must be configured to connect to the backend server at `mc-survival:25565` (the Service endpoint) and handle the case where the backend may not be available immediately during scale-up. + ### Troubleshooting First and foremost, enable debug logs on mc-router by setting the `DEBUG` environment variable to "true". With that, the logs will be fairly verbose with information about incoming connections, handshake processing, backend service discovery, and backend connection establishment and teardown. diff --git a/docs/k8s-autoscale.yaml b/docs/k8s-autoscale.yaml index e2f3cdc..7f3dac9 100644 --- a/docs/k8s-autoscale.yaml +++ b/docs/k8s-autoscale.yaml @@ -23,8 +23,11 @@ rules: resources: ["services"] verbs: ["watch","list"] - apiGroups: ["apps"] - resources: ["statefulsets", "statefulsets/scale"] - verbs: ["watch","list","get","update"] + resources: ["statefulsets"] + verbs: ["watch","list","patch"] +- apiGroups: ["apps"] + resources: ["statefulsets/scale"] + verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/docs/k8s-deployment-cluster-role.yaml b/docs/k8s-deployment-cluster-role.yaml index 9c7d6b0..638be7b 100644 --- a/docs/k8s-deployment-cluster-role.yaml +++ b/docs/k8s-deployment-cluster-role.yaml @@ -15,10 +15,10 @@ rules: verbs: ["watch","list"] - apiGroups: ["apps"] resources: ["statefulsets"] - verbs: ["watch","list","get","update"] + verbs: ["watch","list","patch"] - apiGroups: ["apps"] resources: ["statefulsets/scale"] - verbs: ["get","update"] + verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/docs/k8s-deployment.yaml b/docs/k8s-deployment.yaml index 26873f3..18292de 100644 --- a/docs/k8s-deployment.yaml +++ b/docs/k8s-deployment.yaml @@ -15,10 +15,10 @@ rules: verbs: ["watch","list"] - apiGroups: ["apps"] resources: ["statefulsets"] - verbs: ["watch","list","get","update"] + verbs: ["watch","list","patch"] - apiGroups: ["apps"] resources: ["statefulsets/scale"] - verbs: ["get","update"] + verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding diff --git a/server/api_server.go b/server/api_server.go index 9769d1d..a57cb90 100644 --- a/server/api_server.go +++ b/server/api_server.go @@ -81,7 +81,7 @@ func routesCreateHandler(writer http.ResponseWriter, request *http.Request) { return } - Routes.CreateMapping(definition.ServerAddress, definition.Backend, nil, nil, "") + Routes.CreateMapping(definition.ServerAddress, definition.Backend, "", nil, nil, "") RoutesConfigLoader.SaveRoutes() writer.WriteHeader(http.StatusCreated) } @@ -102,7 +102,7 @@ func routesSetDefault(writer http.ResponseWriter, request *http.Request) { return } - Routes.SetDefaultRoute(body.Backend, nil, nil, "") + Routes.SetDefaultRoute(body.Backend, "", nil, nil, "") RoutesConfigLoader.SaveRoutes() writer.WriteHeader(http.StatusOK) } diff --git a/server/connector.go b/server/connector.go index 33f0bea..750cb51 100644 --- a/server/connector.go +++ b/server/connector.go @@ -77,6 +77,7 @@ func NewConnector(ctx context.Context, metrics *ConnectorMetrics, sendProxyProto recordLogins: recordLogins, autoScaleUpAllowDenyConfig: autoScaleUpAllowDenyConfig, activeConnections: NewActiveConnections(), + scaleActiveConnections: NewActiveConnections(), } } @@ -95,6 +96,7 @@ type Connector struct { trustedProxyNets []*net.IPNet totalActiveConnections int32 activeConnections *ActiveConnections + scaleActiveConnections *ActiveConnections connectionsCond *sync.Cond ngrok NgrokConnector clientFilter *ClientFilter @@ -484,7 +486,7 @@ func (c *Connector) readPlayerInfo(protocolVersion mcproto.ProtocolVersion, buff } } -func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) { +func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, scalingTarget string, cleanupMetrics bool, checkScaleDown bool) { if c.connectionNotifier != nil { err := c.connectionNotifier.NotifyDisconnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort) if err != nil { @@ -501,6 +503,8 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress With("server_address", serverAddress). Set(float64(c.activeConnections.GetCount(backendHostPort))) + c.scaleActiveConnections.Decrement(scalingTarget) + if c.recordLogins && playerInfo != nil { c.metrics.ServerActivePlayer. With("player_name", playerInfo.Name). @@ -514,8 +518,8 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress WithField("backendHostPort", backendHostPort). WithField("connectionCount", c.activeConnections.GetCount(backendHostPort)). Info("Closed connection to backend") - if checkScaleDown && c.activeConnections.GetCount(backendHostPort) <= 0 { - DownScaler.Begin(backendHostPort) + if checkScaleDown && c.scaleActiveConnections.GetCount(scalingTarget) <= 0 { + DownScaler.Begin(scalingTarget) } c.connectionsCond.Signal() } @@ -523,12 +527,12 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress func (c *Connector) findAndConnectBackend(frontendConn net.Conn, clientAddr net.Addr, preReadContent io.Reader, serverAddress string, playerInfo *PlayerInfo, nextState mcproto.State, isLegacy bool, clientProtocol int) { - backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(c.ctx, serverAddress) + backendHostPort, resolvedHost, scalingTarget, waker, _ := Routes.FindBackendForServerAddress(c.ctx, serverAddress) cleanupMetrics := false cleanupCheckScaleDown := false defer func() { - c.cleanupBackendConnection(clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown) + c.cleanupBackendConnection(clientAddr, serverAddress, playerInfo, backendHostPort, scalingTarget, cleanupMetrics, cleanupCheckScaleDown) }() if waker != nil && nextState > mcproto.StateStatus { @@ -541,8 +545,8 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, Debug("checked if player is allowed to wake up the server") if serverAllowsPlayer { // Cancel down scaler if active before scale up - if backendHostPort != "" { - DownScaler.Cancel(backendHostPort) + if scalingTarget != "" { + DownScaler.Cancel(scalingTarget) } cleanupCheckScaleDown = true logrus.WithField("serverAddress", serverAddress).Info("Waking up backend server") @@ -558,7 +562,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, return } // Cancel again in case any routes were changed during wake up - DownScaler.Cancel(newBackendHostPort) + DownScaler.Cancel(scalingTarget) backendHostPort = newBackendHostPort logrus.WithFields(logrus.Fields{ "serverAddress": serverAddress, @@ -642,6 +646,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, atomic.AddInt32(&c.totalActiveConnections, 1))) c.activeConnections.Increment(backendHostPort) + c.scaleActiveConnections.Increment(scalingTarget) c.metrics.ServerActiveConnections. With("server_address", serverAddress). Set(float64(c.activeConnections.GetCount(backendHostPort))) diff --git a/server/docker.go b/server/docker.go index 6d0a05a..8c144c0 100644 --- a/server/docker.go +++ b/server/docker.go @@ -186,9 +186,9 @@ func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error { wakerFunc := w.makeWakerFunc(rs) sleeperFunc := w.makeSleeperFunc(rs) if rs.externalContainerName != "" { - Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) + Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) } else { - Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) } } else if oldRs.containerEndpoint != rs.containerEndpoint || oldRs.containerID != rs.containerID || @@ -200,9 +200,9 @@ func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error { sleeperFunc := w.makeSleeperFunc(rs) if rs.externalContainerName != "" { Routes.DeleteMapping(rs.externalContainerName) - Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) + Routes.CreateMapping(rs.externalContainerName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) } else { - Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, rs.autoScaleAsleepMOTD) } logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE") } @@ -214,7 +214,7 @@ func (w *dockerWatcherImpl) monitorContainers(ctx context.Context) error { if rs.externalContainerName != "" { Routes.DeleteMapping(rs.externalContainerName) } else { - Routes.SetDefaultRoute("", nil, nil, "") + Routes.SetDefaultRoute("", "", nil, nil, "") } logrus.WithField("routableContainer", rs).Debug("DELETE") } @@ -256,9 +256,9 @@ func (w *dockerWatcherImpl) Start(ctx context.Context) error { wakerFunc := w.makeWakerFunc(c) sleeperFunc := w.makeSleeperFunc(c) if c.externalContainerName != "" { - Routes.CreateMapping(c.externalContainerName, c.containerEndpoint, wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD) + Routes.CreateMapping(c.externalContainerName, c.containerEndpoint, "", wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD) } else { - Routes.SetDefaultRoute(c.containerEndpoint, wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD) + Routes.SetDefaultRoute(c.containerEndpoint, "", wakerFunc, sleeperFunc, c.autoScaleAsleepMOTD) } } diff --git a/server/docker_swarm.go b/server/docker_swarm.go index 64ab55a..bbe1046 100644 --- a/server/docker_swarm.go +++ b/server/docker_swarm.go @@ -92,9 +92,9 @@ func (w *dockerSwarmWatcherImpl) Start(ctx context.Context) error { wakerFunc := w.makeWakerFunc(s) sleeperFunc := w.makeSleeperFunc(s) if s.externalServiceName != "" { - Routes.CreateMapping(s.externalServiceName, s.containerEndpoint, wakerFunc, sleeperFunc, "") + Routes.CreateMapping(s.externalServiceName, s.containerEndpoint, "", wakerFunc, sleeperFunc, "") } else { - Routes.SetDefaultRoute(s.containerEndpoint, wakerFunc, sleeperFunc, "") + Routes.SetDefaultRoute(s.containerEndpoint, "", wakerFunc, sleeperFunc, "") } } @@ -116,9 +116,9 @@ func (w *dockerSwarmWatcherImpl) Start(ctx context.Context) error { wakerFunc := w.makeWakerFunc(rs) sleeperFunc := w.makeSleeperFunc(rs) if rs.externalServiceName != "" { - Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, wakerFunc, sleeperFunc, "") + Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, "") } else { - Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, "") + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, "") } } else if oldRs.containerEndpoint != rs.containerEndpoint { serviceMap[rs.externalServiceName] = rs @@ -126,9 +126,9 @@ func (w *dockerSwarmWatcherImpl) Start(ctx context.Context) error { sleeperFunc := w.makeSleeperFunc(rs) if rs.externalServiceName != "" { Routes.DeleteMapping(rs.externalServiceName) - Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, wakerFunc, sleeperFunc, "") + Routes.CreateMapping(rs.externalServiceName, rs.containerEndpoint, "", wakerFunc, sleeperFunc, "") } else { - Routes.SetDefaultRoute(rs.containerEndpoint, wakerFunc, sleeperFunc, "") + Routes.SetDefaultRoute(rs.containerEndpoint, "", wakerFunc, sleeperFunc, "") } logrus.WithFields(logrus.Fields{"old": oldRs, "new": rs}).Debug("UPDATE") } @@ -140,7 +140,7 @@ func (w *dockerSwarmWatcherImpl) Start(ctx context.Context) error { if rs.externalServiceName != "" { Routes.DeleteMapping(rs.externalServiceName) } else { - Routes.SetDefaultRoute("", nil, nil, "") + Routes.SetDefaultRoute("", "", nil, nil, "") } logrus.WithField("routableService", rs).Debug("DELETE") } diff --git a/server/k8s.go b/server/k8s.go index 813c21f..4bbc2b4 100644 --- a/server/k8s.go +++ b/server/k8s.go @@ -11,10 +11,10 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" apps "k8s.io/api/apps/v1" - autoscaling "k8s.io/api/autoscaling/v1" core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -26,6 +26,7 @@ const ( AnnotationDefaultServer = "mc-router.itzg.me/defaultServer" AnnotationAutoScaleUp = "mc-router.itzg.me/autoScaleUp" AnnotationAutoScaleDown = "mc-router.itzg.me/autoScaleDown" + AnnotationProxyServerName = "mc-router.itzg.me/proxyServerName" ) // K8sWatcher is a RouteFinder that can find routes from kubernetes services. @@ -184,9 +185,9 @@ func (w *K8sWatcher) handleUpdate(oldObj interface{}, newObj interface{}) { "new": newRoutableService, }).Debug("UPDATE") if newRoutableService.externalServiceName != "" { - w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "") + w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "") } else { - w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "") + w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "") } } } @@ -201,7 +202,7 @@ func (w *K8sWatcher) handleDelete(obj interface{}) { if routableService.externalServiceName != "" { w.routesHandler.DeleteMapping(routableService.externalServiceName) } else { - w.routesHandler.SetDefaultRoute("", nil, nil, "") + w.routesHandler.SetDefaultRoute("", "", nil, nil, "") } } } @@ -215,9 +216,9 @@ func (w *K8sWatcher) handleAdd(obj interface{}) { logrus.WithField("routableService", routableService).Debug("ADD") if routableService.externalServiceName != "" { - w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.autoScaleUp, routableService.autoScaleDown, "") + w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "") } else { - w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.autoScaleUp, routableService.autoScaleDown, "") + w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "") } } } @@ -226,6 +227,7 @@ func (w *K8sWatcher) handleAdd(obj interface{}) { type routableService struct { externalServiceName string containerEndpoint string + scalingTarget string autoScaleUp WakerFunc autoScaleDown SleeperFunc } @@ -273,11 +275,25 @@ func (w *K8sWatcher) buildDetails(service *core.Service, externalServiceName str port = mcPort } endpoint := net.JoinHostPort(clusterIp, port) + + routingEndpoint := endpoint + scalingTarget := endpoint // Default to service endpoint for scaling + + if proxyServerName, exists := service.Annotations[AnnotationProxyServerName]; exists && proxyServerName != "" { + // Ensure the proxy address has a port + if _, _, err := net.SplitHostPort(proxyServerName); err != nil { + proxyServerName = net.JoinHostPort(proxyServerName, "25565") + } + routingEndpoint = proxyServerName + // scalingTarget remains the service endpoint (already set above) + } + wakerFunc := w.buildScaleFunction(service, 0, 1) rs := &routableService{ externalServiceName: externalServiceName, - containerEndpoint: endpoint, - autoScaleUp: buildWakerFromSleeper(endpoint, wakerFunc), + containerEndpoint: routingEndpoint, + scalingTarget: scalingTarget, + autoScaleUp: buildWakerFromSleeper(routingEndpoint, wakerFunc), autoScaleDown: w.buildScaleFunction(service, 1, 0), } return rs @@ -332,6 +348,7 @@ func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to in return func(ctx context.Context) error { serviceName := service.Name if statefulSetName, exists := w.mappings[serviceName]; exists { + // Get current replicas to check if scaling is needed if scale, err := w.clientset.AppsV1().StatefulSets(service.Namespace).GetScale(ctx, statefulSetName, meta.GetOptions{}); err == nil { replicas := scale.Status.Replicas logrus.WithFields(logrus.Fields{ @@ -339,25 +356,57 @@ func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to in "statefulSet": statefulSetName, "replicas": replicas, }).Debug("StatefulSet of Service Replicas") + if replicas == from { - if _, err := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale(ctx, statefulSetName, &autoscaling.Scale{ - ObjectMeta: meta.ObjectMeta{ - Name: scale.Name, - Namespace: scale.Namespace, - UID: scale.UID, - ResourceVersion: scale.ResourceVersion, - }, - Spec: autoscaling.ScaleSpec{Replicas: to}}, meta.UpdateOptions{}, - ); err == nil { + // Use Patch instead of Update to avoid optimistic concurrency errors + // This doesn't require resourceVersion and is atomic + patchData := fmt.Sprintf(`{"spec":{"replicas":%d}}`, to) + _, err := w.clientset.AppsV1().StatefulSets(service.Namespace).Patch( + ctx, + statefulSetName, + types.StrategicMergePatchType, + []byte(patchData), + meta.PatchOptions{}, + ) + if err == nil { logrus.WithFields(logrus.Fields{ "service": serviceName, "statefulSet": statefulSetName, "replicas": replicas, }).Infof("StatefulSet Replicas Autoscaled from %d to %d", from, to) - } else { - return errors.Wrapf(err, "UpdateScale for Replicas=%d failed for StatefulSet: %s", to, statefulSetName) + return nil } + + // Fallback to UpdateScale if Patch fails due to RBAC permissions + // This maintains backward compatibility with existing RBAC configurations + if strings.Contains(err.Error(), "forbidden") { + logrus.WithFields(logrus.Fields{ + "service": serviceName, + "statefulSet": statefulSetName, + }).Warn("Patch operation forbidden - falling back to UpdateScale. Consider updating RBAC to allow 'patch' verb for better concurrency handling") + + scale.Spec.Replicas = to + if _, updateErr := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale( + ctx, + statefulSetName, + scale, + meta.UpdateOptions{}, + ); updateErr == nil { + logrus.WithFields(logrus.Fields{ + "service": serviceName, + "statefulSet": statefulSetName, + "replicas": replicas, + }).Infof("StatefulSet Replicas Autoscaled from %d to %d (via UpdateScale fallback)", from, to) + return nil + } else { + return errors.Wrapf(updateErr, "UpdateScale fallback for Replicas=%d failed for StatefulSet: %s", to, statefulSetName) + } + } + + return errors.Wrapf(err, "Patch for Replicas=%d failed for StatefulSet: %s", to, statefulSetName) } + // Replicas already at desired state + return nil } else { return fmt.Errorf("GetScale failed for StatefulSet %s: %w", statefulSetName, err) } diff --git a/server/k8s_test.go b/server/k8s_test.go index 34f1a63..f9a56f0 100644 --- a/server/k8s_test.go +++ b/server/k8s_test.go @@ -28,16 +28,16 @@ func (m *MockedRoutesHandler) GetBackendForServer(server string) string { } } -func (m *MockedRoutesHandler) CreateMapping(serverAddress string, backend string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { - m.MethodCalled("CreateMapping", serverAddress, backend, waker, sleeper, asleepMOTD) +func (m *MockedRoutesHandler) CreateMapping(serverAddress string, backend string, scaleKey string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { + m.MethodCalled("CreateMapping", serverAddress, backend, scaleKey, waker, sleeper, asleepMOTD) if m.routes == nil { m.routes = make(map[string]string) } m.routes[serverAddress] = backend } -func (m *MockedRoutesHandler) SetDefaultRoute(backend string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { - m.MethodCalled("SetDefaultRoute", backend, waker, sleeper, asleepMOTD) +func (m *MockedRoutesHandler) SetDefaultRoute(backend string, scaleKey string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { + m.MethodCalled("SetDefaultRoute", backend, scaleKey, waker, sleeper, asleepMOTD) if m.routes == nil { m.routes = make(map[string]string) } @@ -183,8 +183,8 @@ func TestK8sWatcherImpl_handleAddThenUpdate(t *testing.T) { DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) routesHandler := new(MockedRoutesHandler) - routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() - routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") routesHandler.On("DeleteMapping", mock.Anything).Return(true) @@ -264,8 +264,8 @@ func TestK8sWatcherImpl_handleAddThenDelete(t *testing.T) { DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) routesHandler := new(MockedRoutesHandler) - routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() - routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") routesHandler.On("DeleteMapping", mock.Anything).Return(true) @@ -363,8 +363,8 @@ func TestK8s_externalName(t *testing.T) { DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) routesHandler := new(MockedRoutesHandler) - routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() - routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") routesHandler.On("DeleteMapping", mock.Anything).Return(true) @@ -393,3 +393,144 @@ func TestK8s_externalName(t *testing.T) { }) } } + +func TestK8s_proxyServerName(t *testing.T) { + type scenario struct { + server string + backend string + } + tests := []struct { + name string + svc string + scenarios []scenario + }{ + { + name: "proxy routes to proxy address", + svc: `{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com", "mc-router.itzg.me/proxyServerName": "velocity-proxy:25577"}}, "spec":{"clusterIP": "10.0.0.5"}}`, + scenarios: []scenario{ + {server: "mc.example.com", backend: "velocity-proxy:25577"}, + }, + }, + { + name: "proxy without port gets default 25565", + svc: `{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com", "mc-router.itzg.me/proxyServerName": "velocity-proxy"}}, "spec":{"clusterIP": "10.0.0.5"}}`, + scenarios: []scenario{ + {server: "mc.example.com", backend: "velocity-proxy:25565"}, + }, + }, + { + name: "no proxy annotation routes to ClusterIP", + svc: `{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com"}}, "spec":{"clusterIP": "10.0.0.5"}}`, + scenarios: []scenario{ + {server: "mc.example.com", backend: "10.0.0.5:25565"}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) + + routesHandler := new(MockedRoutesHandler) + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") + routesHandler.On("DeleteMapping", mock.Anything).Return(true) + + watcher := &K8sWatcher{ + routesHandler: routesHandler, + } + svc := v1.Service{} + err := json.Unmarshal([]byte(test.svc), &svc) + require.NoError(t, err) + + watcher.handleAdd(&svc) + for _, s := range test.scenarios { + backend := routesHandler.GetBackendForServer(s.server) + assert.Equal(t, s.backend, backend, "given=%s", s.server) + } + }) + } +} + +func TestK8s_proxyServerNameScaleEndpoint(t *testing.T) { + DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) + + routesHandler := new(MockedRoutesHandler) + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") + routesHandler.On("DeleteMapping", mock.Anything).Return(true) + + watcher := &K8sWatcher{ + routesHandler: routesHandler, + } + + svc := v1.Service{} + err := json.Unmarshal([]byte(`{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com", "mc-router.itzg.me/proxyServerName": "velocity:25577"}}, "spec":{"clusterIP": "10.0.0.5"}}`), &svc) + require.NoError(t, err) + + watcher.handleAdd(&svc) + + // Verify CreateMapping was called with the correct scaleKey (original endpoint) + routesHandler.AssertCalled(t, "CreateMapping", "mc.example.com", "velocity:25577", "10.0.0.5:25565", mock.Anything, mock.Anything, mock.Anything) +} + +func TestK8s_proxyServerNameUpdate(t *testing.T) { + DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) + + routesHandler := new(MockedRoutesHandler) + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") + routesHandler.On("DeleteMapping", mock.Anything).Return(true) + + watcher := &K8sWatcher{ + routesHandler: routesHandler, + } + + // Start with proxy + initialSvc := v1.Service{} + err := json.Unmarshal([]byte(`{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com", "mc-router.itzg.me/proxyServerName": "velocity:25577"}}, "spec":{"clusterIP": "10.0.0.5"}}`), &initialSvc) + require.NoError(t, err) + + watcher.handleAdd(&initialSvc) + assert.Equal(t, "velocity:25577", routesHandler.GetBackendForServer("mc.example.com")) + + // Update to remove proxy + updatedSvc := v1.Service{} + err = json.Unmarshal([]byte(`{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "mc.example.com"}}, "spec":{"clusterIP": "10.0.0.5"}}`), &updatedSvc) + require.NoError(t, err) + + watcher.handleUpdate(&initialSvc, &updatedSvc) + assert.Equal(t, "10.0.0.5:25565", routesHandler.GetBackendForServer("mc.example.com")) +} + +func TestK8s_autoScaleWithoutProxy(t *testing.T) { + DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) + + routesHandler := new(MockedRoutesHandler) + routesHandler.On("CreateMapping", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("SetDefaultRoute", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return() + routesHandler.On("GetAsleepMOTD", mock.Anything).Return("") + routesHandler.On("DeleteMapping", mock.Anything).Return(true) + + watcher := &K8sWatcher{ + autoScaleUp: true, + autoScaleDown: true, + routesHandler: routesHandler, + } + + // Service WITHOUT proxyServerName but WITH autoScaleUp/Down annotations + svc := v1.Service{} + err := json.Unmarshal([]byte(`{"metadata": {"annotations": {"mc-router.itzg.me/externalServerName": "atm-10.example.com", "mc-router.itzg.me/autoScaleUp": "true", "mc-router.itzg.me/autoScaleDown": "true"}}, "spec":{"clusterIP": "10.0.0.10"}}`), &svc) + require.NoError(t, err) + + watcher.handleAdd(&svc) + + // Verify routes to ClusterIP (not proxy) + assert.Equal(t, "10.0.0.10:25565", routesHandler.GetBackendForServer("atm-10.example.com")) + + // CRITICAL: Verify scaleKey is set to the service endpoint (not empty) + // This ensures auto-scaling targets the correct StatefulSet + routesHandler.AssertCalled(t, "CreateMapping", "atm-10.example.com", "10.0.0.10:25565", "10.0.0.10:25565", mock.Anything, mock.Anything, mock.Anything) +} diff --git a/server/routes.go b/server/routes.go index 7169a5c..824bc43 100644 --- a/server/routes.go +++ b/server/routes.go @@ -36,8 +36,8 @@ type RouteFinder interface { } type RoutesHandler interface { - CreateMapping(serverAddress string, backend string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) - SetDefaultRoute(backend string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) + CreateMapping(serverAddress string, backend string, scalingTarget string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) + SetDefaultRoute(backend string, scalingTarget string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) // DeleteMapping requests that the serverAddress be removed from routes. // Returns true if the route existed. DeleteMapping(serverAddress string) bool @@ -50,13 +50,14 @@ type IRoutes interface { RegisterAll(mappings map[string]string) // FindBackendForServerAddress returns the host:port for the external server address, if registered. // Otherwise, an empty string is returned. Also returns the normalized version of the given serverAddress. - // The 3rd value returned is an (optional) "waker" function which a caller must invoke to wake up serverAddress. - // The 4th value returned is an (optional) "sleeper" function which a caller must invoke to shut down serverAddress. + // The 3rd value returned is the scalingTarget which indicates what endpoint to scale (may differ from backend when using proxy). + // The 4th value returned is an (optional) "waker" function which a caller must invoke to wake up serverAddress. + // The 5th value returned is an (optional) "sleeper" function which a caller must invoke to shut down serverAddress. HasRoute(serverAddress string) bool - FindBackendForServerAddress(ctx context.Context, serverAddress string) (string, string, WakerFunc, SleeperFunc) - GetSleepers(backend string) []SleeperFunc + FindBackendForServerAddress(ctx context.Context, serverAddress string) (string, string, string, WakerFunc, SleeperFunc) + GetSleepers(scalingTarget string) []SleeperFunc GetMappings() map[string]string - GetDefaultRoute() (string, WakerFunc, SleeperFunc) + GetDefaultRoute() (string, string, WakerFunc, SleeperFunc) GetAsleepMOTD(serverAddress string) string SimplifySRV(srvEnabled bool) } @@ -73,15 +74,16 @@ func NewRoutes() IRoutes { func (r *routesImpl) RegisterAll(mappings map[string]string) { for k, v := range mappings { - r.CreateMapping(k, v, nil, nil, "") + r.CreateMapping(k, v, "", nil, nil, "") } } type mapping struct { - backend string - waker WakerFunc - sleeper SleeperFunc - asleepMOTD string + backend string + waker WakerFunc + sleeper SleeperFunc + asleepMOTD string + scalingTarget string // The endpoint to scale (may differ from backend when using proxy) } type routesImpl struct { @@ -96,16 +98,19 @@ func (r *routesImpl) Reset() { DownScaler.Reset() } -func (r *routesImpl) SetDefaultRoute(backend string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { - r.defaultRoute = mapping{backend: backend, waker: waker, sleeper: sleeper, asleepMOTD: asleepMOTD} +func (r *routesImpl) SetDefaultRoute(backend string, scalingTarget string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { + if scalingTarget == "" { + scalingTarget = backend + } + r.defaultRoute = mapping{backend: backend, scalingTarget: scalingTarget, waker: waker, sleeper: sleeper, asleepMOTD: asleepMOTD} logrus.WithFields(logrus.Fields{ "backend": backend, }).Info("Using default route") } -func (r *routesImpl) GetDefaultRoute() (string, WakerFunc, SleeperFunc) { - return r.defaultRoute.backend, r.defaultRoute.waker, r.defaultRoute.sleeper +func (r *routesImpl) GetDefaultRoute() (string, string, WakerFunc, SleeperFunc) { + return r.defaultRoute.backend, r.defaultRoute.scalingTarget, r.defaultRoute.waker, r.defaultRoute.sleeper } func (r *routesImpl) GetAsleepMOTD(serverAddress string) string { @@ -134,7 +139,7 @@ func (r *routesImpl) HasRoute(serverAddress string) bool { return exists } -func (r *routesImpl) FindBackendForServerAddress(_ context.Context, serverAddress string) (string, string, WakerFunc, SleeperFunc) { +func (r *routesImpl) FindBackendForServerAddress(_ context.Context, serverAddress string) (string, string, string, WakerFunc, SleeperFunc) { r.RLock() defer r.RUnlock() @@ -173,23 +178,23 @@ func (r *routesImpl) FindBackendForServerAddress(_ context.Context, serverAddres if r.mappings != nil { if mapping, exists := r.mappings[serverAddress]; exists { - return mapping.backend, serverAddress, mapping.waker, mapping.sleeper + return mapping.backend, serverAddress, mapping.scalingTarget, mapping.waker, mapping.sleeper } } - return r.defaultRoute.backend, serverAddress, r.defaultRoute.waker, r.defaultRoute.sleeper + return r.defaultRoute.backend, serverAddress, r.defaultRoute.scalingTarget, r.defaultRoute.waker, r.defaultRoute.sleeper } -func (r *routesImpl) GetSleepers(backend string) []SleeperFunc { +func (r *routesImpl) GetSleepers(scalingTarget string) []SleeperFunc { r.RLock() defer r.RUnlock() var sleepers []SleeperFunc for _, m := range r.mappings { - if m.backend == backend && m.sleeper != nil { + if m.scalingTarget == scalingTarget && m.sleeper != nil { sleepers = append(sleepers, m.sleeper) } } - if r.defaultRoute.backend == backend && r.defaultRoute.sleeper != nil { + if r.defaultRoute.scalingTarget == scalingTarget && r.defaultRoute.sleeper != nil { sleepers = append(sleepers, r.defaultRoute.sleeper) } return sleepers @@ -212,7 +217,7 @@ func (r *routesImpl) DeleteMapping(serverAddress string) bool { logrus.WithField("serverAddress", serverAddress).Info("Deleting route") if m, ok := r.mappings[serverAddress]; ok { - DownScaler.Cancel(m.backend) + DownScaler.Cancel(m.scalingTarget) delete(r.mappings, serverAddress) return true } else { @@ -220,20 +225,24 @@ func (r *routesImpl) DeleteMapping(serverAddress string) bool { } } -func (r *routesImpl) CreateMapping(serverAddress string, backend string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { +func (r *routesImpl) CreateMapping(serverAddress string, backend string, scalingTarget string, waker WakerFunc, sleeper SleeperFunc, asleepMOTD string) { r.Lock() defer r.Unlock() serverAddress = strings.ToLower(serverAddress) + if scalingTarget == "" { + scalingTarget = backend + } + logrus.WithFields(logrus.Fields{ "serverAddress": serverAddress, "backend": backend, }).Info("Created route mapping") - r.mappings[serverAddress] = mapping{backend: backend, waker: waker, sleeper: sleeper, asleepMOTD: asleepMOTD} + r.mappings[serverAddress] = mapping{backend: backend, scalingTarget: scalingTarget, waker: waker, sleeper: sleeper, asleepMOTD: asleepMOTD} // Trigger auto scale down when mapping is created to ensure servers are shut down if router restarts - if DownScaler != nil && backend != "" { - DownScaler.Begin(backend) + if DownScaler != nil && scalingTarget != "" { + DownScaler.Begin(scalingTarget) } } diff --git a/server/routes_config_loader.go b/server/routes_config_loader.go index 9134600..44c51e2 100644 --- a/server/routes_config_loader.go +++ b/server/routes_config_loader.go @@ -44,7 +44,7 @@ func (r *routesConfigLoader) Load(routesConfigFileName string) error { } Routes.RegisterAll(config.Mappings) - Routes.SetDefaultRoute(config.DefaultServer, nil, nil, "") + Routes.SetDefaultRoute(config.DefaultServer, "", nil, nil, "") return nil } @@ -62,7 +62,7 @@ func (r *routesConfigLoader) Reload() error { logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file") Routes.Reset() Routes.RegisterAll(config.Mappings) - Routes.SetDefaultRoute(config.DefaultServer, nil, nil, "") + Routes.SetDefaultRoute(config.DefaultServer, "", nil, nil, "") return nil } @@ -135,7 +135,7 @@ func (r *routesConfigLoader) SaveRoutes() { return } - server, _, _ := Routes.GetDefaultRoute() + server, _, _, _ := Routes.GetDefaultRoute() err := r.writeFile(&RoutesConfigSchema{ DefaultServer: server, Mappings: Routes.GetMappings(), diff --git a/server/routes_test.go b/server/routes_test.go index 2e09f70..9c36194 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3,8 +3,10 @@ package server import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_routesImpl_FindBackendForServerAddress(t *testing.T) { @@ -66,9 +68,9 @@ func Test_routesImpl_FindBackendForServerAddress(t *testing.T) { t.Run(tt.name, func(t *testing.T) { r := NewRoutes() - r.CreateMapping(tt.mapping.serverAddress, tt.mapping.backend, nil, nil, "") + r.CreateMapping(tt.mapping.serverAddress, tt.mapping.backend, "", nil, nil, "") - if got, server, _, _ := r.FindBackendForServerAddress(context.Background(), tt.args.serverAddress); got != tt.want { + if got, server, _, _, _ := r.FindBackendForServerAddress(context.Background(), tt.args.serverAddress); got != tt.want { t.Errorf("routesImpl.FindBackendForServerAddress() = %v, want %v", got, tt.want) } else { assert.Equal(t, tt.mapping.serverAddress, server) @@ -76,3 +78,68 @@ func Test_routesImpl_FindBackendForServerAddress(t *testing.T) { }) } } + +func Test_routesImpl_ScaleKey(t *testing.T) { + DownScaler = NewDownScaler(context.Background(), false, 1*time.Second) + + t.Run("scaleKey defaults to backend when empty", func(t *testing.T) { + r := NewRoutes() + r.CreateMapping("mc.example.com", "backend:25565", "", nil, nil, "") + + _, _, scaleKey, _, _ := r.FindBackendForServerAddress(context.Background(), "mc.example.com") + assert.Equal(t, "backend:25565", scaleKey) + }) + + t.Run("scaleKey is set when provided", func(t *testing.T) { + r := NewRoutes() + r.CreateMapping("mc.example.com", "proxy:25577", "10.0.0.5:25565", nil, nil, "") + + backend, _, scaleKey, _, _ := r.FindBackendForServerAddress(context.Background(), "mc.example.com") + assert.Equal(t, "proxy:25577", backend) + assert.Equal(t, "10.0.0.5:25565", scaleKey) + }) + + t.Run("GetSleepers matches on scaleKey not backend", func(t *testing.T) { + r := NewRoutes() + called := false + sleeper := func(ctx context.Context) error { + called = true + return nil + } + + // Two routes with same proxy backend but different scaleKeys + r.CreateMapping("mc1.example.com", "proxy:25577", "10.0.0.1:25565", nil, sleeper, "") + r.CreateMapping("mc2.example.com", "proxy:25577", "10.0.0.2:25565", nil, nil, "") + + sleepers := r.GetSleepers("10.0.0.1:25565") + require.Len(t, sleepers, 1) + _ = sleepers[0](context.Background()) + assert.True(t, called) + + // No sleeper for the second scaleKey since it has nil sleeper + sleepers = r.GetSleepers("10.0.0.2:25565") + assert.Empty(t, sleepers) + + // No sleeper when querying by proxy backend address + sleepers = r.GetSleepers("proxy:25577") + assert.Empty(t, sleepers) + }) + + t.Run("default route scaleKey", func(t *testing.T) { + r := NewRoutes() + r.SetDefaultRoute("proxy:25577", "10.0.0.5:25565", nil, nil, "") + + backend, scaleKey, _, _ := r.GetDefaultRoute() + assert.Equal(t, "proxy:25577", backend) + assert.Equal(t, "10.0.0.5:25565", scaleKey) + }) + + t.Run("default route scaleKey defaults to backend", func(t *testing.T) { + r := NewRoutes() + r.SetDefaultRoute("backend:25565", "", nil, nil, "") + + backend, scaleKey, _, _ := r.GetDefaultRoute() + assert.Equal(t, "backend:25565", backend) + assert.Equal(t, "backend:25565", scaleKey) + }) +} diff --git a/server/server.go b/server/server.go index f46eade..757c1ab 100644 --- a/server/server.go +++ b/server/server.go @@ -73,7 +73,7 @@ func NewServer(ctx context.Context, config *Config) (*Server, error) { Routes.RegisterAll(config.Mapping) if config.Default != "" { - Routes.SetDefaultRoute(config.Default, nil, nil, "") + Routes.SetDefaultRoute(config.Default, "", nil, nil, "") } if config.ConnectionRateLimit < 1 {