From af1e193958acc6df355f27fbf5f130c7773ae77e Mon Sep 17 00:00:00 2001 From: Geoff Bourne Date: Sun, 13 Jul 2025 10:00:40 -0500 Subject: [PATCH] Redo code cleanup (#429) * Code cleanup of routes config loader and API server (#424) (cherry picked from commit 1ee3eb4de381bb2b03c3468f167a7c681ea7d54c) * Refactored server setup and run out of main (#425) (cherry picked from commit 05c57c3b8517351d3d434184e3a0697c3bf2f330) * Code cleanup in and around connector (#427) (cherry picked from commit b3e88db48c875c2aa8d382d768c9c8208e3c2a22) * Update away from deprecated k8s NewInformer * Tidy up couple of k8s docs examples --- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 2 +- README.md | 2 +- cmd/mc-router/main.go | 266 ++++----------------------- docs/k8s-deployment.yaml | 23 ++- docs/k8s-mc-with-default.yaml | 31 ++-- go.mod | 10 +- go.sum | 7 + server/api_server.go | 87 ++++++++- server/client_filter.go | 2 +- server/configs.go | 50 +++++ server/connector.go | 181 ++++++++---------- server/connector_test.go | 4 +- server/k8s.go | 47 ++--- {cmd/mc-router => server}/metrics.go | 45 +++-- server/notifier.go | 31 ++++ server/routes.go | 90 +-------- server/routes_config.go | 260 -------------------------- server/routes_config_loader.go | 181 ++++++++++++++++++ server/server.go | 220 ++++++++++++++++++++++ skaffold.yaml | 8 +- 21 files changed, 789 insertions(+), 760 deletions(-) create mode 100644 server/configs.go rename {cmd/mc-router => server}/metrics.go (84%) delete mode 100644 server/routes_config.go create mode 100644 server/routes_config_loader.go create mode 100644 server/server.go diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f2aa455..016a8a1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,7 +10,7 @@ jobs: release: uses: itzg/github-workflows/.github/workflows/go-with-releaser-image.yml@main with: - go-version: "1.24.4" + go-version-file: 'go.mod' enable-ghcr: true secrets: image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ca25f32..3606806 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,4 +12,4 @@ jobs: build: uses: itzg/github-workflows/.github/workflows/go-test.yml@main with: - go-version: "1.24.4" + go-version-file: 'go.mod' diff --git a/README.md b/README.md index c991943..ef15ff2 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ The following shows a JSON file for routes config, where `default-server` can al } ``` -Sending a SIGHUP signal will cause mc-router to reload the routes config from disk. +Sending a SIGHUP signal will cause mc-router to reload the routes config from disk. The file can also be watched for changes by setting `-routes-config-watch` or the env variable `ROUTES_CONFIG_WATCH` to "true". ## Auto Scale Allow/Deny List diff --git a/cmd/mc-router/main.go b/cmd/mc-router/main.go index 3624e62..10c6fb9 100644 --- a/cmd/mc-router/main.go +++ b/cmd/mc-router/main.go @@ -3,82 +3,14 @@ package main import ( "context" "fmt" - "net" - "os" - "os/signal" - "runtime/pprof" - "strconv" - "syscall" - "time" - "github.com/itzg/go-flagsfiller" "github.com/itzg/mc-router/server" "github.com/sirupsen/logrus" + "os" + "os/signal" + "syscall" ) -type MetricsBackendConfig struct { - Influxdb struct { - Interval time.Duration `default:"1m"` - Tags map[string]string `usage:"any extra tags to be included with all reported metrics"` - Addr string - Username string - Password string - Database string - RetentionPolicy string - } -} - -type WebhookConfig struct { - Url string `usage:"If set, a POST request that contains connection status notifications will be sent to this HTTP address"` - RequireUser bool `default:"false" usage:"Indicates if the webhook will only be called if a user is connecting rather than just server list/ping"` -} - -type AutoScale struct { - Up bool `usage:"Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed"` - Down bool `default:"false" usage:"Decrease Kubernetes StatefulSet Replicas (only) from 1 to 0 on respective backend servers after there are no connections"` - DownAfter string `default:"10m" usage:"Server scale down delay after there are no connections"` - AllowDeny string `usage:"Path to config for server allowlists and denylists. If a global/server entry is specified, only players allowed to connect to the server will be able to trigger a scale up when -auto-scale-up is enabled or cancel active down scalers when -auto-scale-down is enabled"` -} - -type RoutesConfig struct { - Config string `usage:"Name or full [path] to routes config file"` - ConfigWatch bool `usage:"Watch for config file changes"` -} - -type Config struct { - Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"` - Default string `usage:"host:port of a default Minecraft server to use when mapping not found"` - Mapping map[string]string `usage:"Comma or newline delimited or repeated mappings of externalHostname=host:port"` - ApiBinding string `usage:"The [host:port] bound for servicing API requests"` - Version bool `usage:"Output version and exit"` - CpuProfile string `usage:"Enables CPU profiling and writes to given path"` - Debug bool `usage:"Enable debug logs"` - ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"` - InKubeCluster bool `usage:"Use in-cluster Kubernetes config"` - KubeConfig string `usage:"The path to a Kubernetes configuration file"` - InDocker bool `usage:"Use Docker service discovery"` - InDockerSwarm bool `usage:"Use Docker Swarm service discovery"` - DockerSocket string `default:"unix:///var/run/docker.sock" usage:"Path to Docker socket to use"` - DockerTimeout int `default:"0" usage:"Timeout configuration in seconds for the Docker integrations"` - DockerRefreshInterval int `default:"15" usage:"Refresh interval in seconds for the Docker integrations"` - MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb,prometheus"` - UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"` - ReceiveProxyProtocol bool `default:"false" usage:"Receive PROXY protocol from backend servers, by default trusts every proxy header that it receives, combine with -trusted-proxies to specify a list of trusted proxies"` - TrustedProxies []string `usage:"Comma delimited list of CIDR notation IP blocks to trust when receiving PROXY protocol"` - RecordLogins bool `default:"false" usage:"Log and generate metrics on player logins. Metrics only supported with influxdb or prometheus backend"` - MetricsBackendConfig MetricsBackendConfig - Routes RoutesConfig - NgrokToken string `usage:"If set, an ngrok tunnel will be established. It is HIGHLY recommended to pass as an environment variable."` - AutoScale AutoScale - - ClientsToAllow []string `usage:"Zero or more client IP addresses or CIDRs to allow. Takes precedence over deny."` - ClientsToDeny []string `usage:"Zero or more client IP addresses or CIDRs to deny. Ignored if any configured to allow"` - - SimplifySRV bool `default:"false" usage:"Simplify fully qualified SRV records for mapping"` - - Webhook WebhookConfig `usage:"Webhook configuration"` -} - var ( version = "dev" commit = "none" @@ -89,190 +21,60 @@ func showVersion() { fmt.Printf("%v, commit %v, built at %v", version, commit, date) } +type CliConfig struct { + Version bool `usage:"Output version and exit"` + Debug bool `usage:"Enable debug logs"` + + ServerConfig server.Config `flatten:"true"` +} + func main() { - var config Config - err := flagsfiller.Parse(&config, flagsfiller.WithEnv("")) + var cliConfig CliConfig + err := flagsfiller.Parse(&cliConfig, flagsfiller.WithEnv("")) if err != nil { logrus.Fatal(err) } - if config.Version { + if cliConfig.Version { showVersion() os.Exit(0) } - if config.Debug { + if cliConfig.Debug { logrus.SetLevel(logrus.DebugLevel) logrus.Debug("Debug logs enabled") } - if config.CpuProfile != "" { - cpuProfileFile, err := os.Create(config.CpuProfile) - if err != nil { - logrus.WithError(err).Fatal("trying to create cpu profile file") - } - //goland:noinspection GoUnhandledErrorResult - defer cpuProfileFile.Close() - - logrus.WithField("file", config.CpuProfile).Info("Starting cpu profiling") - err = pprof.StartCPUProfile(cpuProfileFile) - if err != nil { - logrus.WithError(err).Fatal("trying to start cpu profile") - } - defer pprof.StopCPUProfile() - } - - var autoScaleAllowDenyConfig *server.AllowDenyConfig = nil - if config.AutoScale.AllowDeny != "" { - autoScaleAllowDenyConfig, err = server.ParseAllowDenyConfig(config.AutoScale.AllowDeny) - if err != nil { - logrus.WithError(err).Fatal("trying to parse autoscale up allow-deny-list file") - } - } - ctx, cancel := context.WithCancel(context.Background()) defer cancel() - metricsBuilder := NewMetricsBuilder(config.MetricsBackend, &config.MetricsBackendConfig) + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - downScalerEnabled := config.AutoScale.Down && (config.InKubeCluster || config.KubeConfig != "") - downScalerDelay, err := time.ParseDuration(config.AutoScale.DownAfter) + s, err := server.NewServer(ctx, &cliConfig.ServerConfig) if err != nil { - logrus.WithError(err).Fatal("Unable to parse auto scale down after duration") - } - // Only one instance should be created - server.DownScaler = server.NewDownScaler(ctx, downScalerEnabled, downScalerDelay) - - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - - if config.Routes.Config != "" { - err := server.RoutesConfig.ReadRoutesConfig(config.Routes.Config) - if err != nil { - logrus.WithError(err).Fatal("Unable to load routes from config file") - } - - if config.Routes.ConfigWatch { - err := server.RoutesConfig.WatchForChanges(ctx) - if err != nil { - logrus.WithError(err).Fatal("Unable to watch for changes") - } - } + logrus.WithError(err).Fatal("Could not setup server") } - server.Routes.RegisterAll(config.Mapping) - if config.Default != "" { - server.Routes.SetDefaultRoute(config.Default) - } + go s.Run() - if config.ConnectionRateLimit < 1 { - config.ConnectionRateLimit = 1 - } - - trustedIpNets := make([]*net.IPNet, 0) - for _, ip := range config.TrustedProxies { - _, ipNet, err := net.ParseCIDR(ip) - if err != nil { - logrus.WithError(err).Fatal("Unable to parse trusted proxy CIDR block") - } - trustedIpNets = append(trustedIpNets, ipNet) - } - - connector := server.NewConnector(metricsBuilder.BuildConnectorMetrics(), config.UseProxyProtocol, config.ReceiveProxyProtocol, trustedIpNets, config.RecordLogins, autoScaleAllowDenyConfig) - - clientFilter, err := server.NewClientFilter(config.ClientsToAllow, config.ClientsToDeny) - if err != nil { - logrus.WithError(err).Fatal("Unable to create client filter") - } - connector.SetClientFilter(clientFilter) - - if config.Webhook.Url != "" { - logrus. - WithField("url", config.Webhook.Url). - WithField("require-user", config.Webhook.RequireUser). - Info("Using webhook for connection status notifications") - connector.SetConnectionNotifier( - server.NewWebhookNotifier(config.Webhook.Url, config.Webhook.RequireUser)) - } - - if config.NgrokToken != "" { - connector.UseNgrok(config.NgrokToken) - } - err = connector.StartAcceptingConnections(ctx, - net.JoinHostPort("", strconv.Itoa(config.Port)), - config.ConnectionRateLimit, - ) - if err != nil { - logrus.Fatal(err) - } - - if config.ApiBinding != "" { - server.StartApiServer(config.ApiBinding) - } - - if config.InKubeCluster { - err = server.K8sWatcher.StartInCluster(config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - logrus.WithError(err).Fatal("Unable to start k8s integration") - } else { - defer server.K8sWatcher.Stop() - } - } else if config.KubeConfig != "" { - err := server.K8sWatcher.StartWithConfig(config.KubeConfig, config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - logrus.WithError(err).Fatal("Unable to start k8s integration") - } else { - defer server.K8sWatcher.Stop() - } - } - - if config.InDocker { - err = server.DockerWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - logrus.WithError(err).Fatal("Unable to start docker integration") - } else { - defer server.DockerWatcher.Stop() - } - } - - if config.InDockerSwarm { - err = server.DockerSwarmWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - logrus.WithError(err).Fatal("Unable to start docker swarm integration") - } else { - defer server.DockerSwarmWatcher.Stop() - } - } - - server.Routes.SimplifySRV(config.SimplifySRV) - - err = metricsBuilder.Start(ctx) - if err != nil { - logrus.WithError(err).Fatal("Unable to start metrics reporter") - } - - // handle signals for { - sig := <-c - switch sig { - case syscall.SIGHUP: - if config.Routes.Config != "" { - logrus.Info("Received SIGHUP, reloading routes config...") - if err := server.RoutesConfig.ReloadRoutesConfig(); err != nil { - logrus. - WithError(err). - WithField("routesConfig", config.Routes.Config). - Error("Could not re-read the routes config file") - } - } - case syscall.SIGINT, syscall.SIGTERM: - logrus.WithField("signal", sig).Info("Stopping. Waiting for connections to complete...") - signal.Stop(c) - connector.WaitForConnections() - logrus.Info("Stopped") + select { + case <-s.Done(): return - default: - logrus.WithField("signal", sig).Warn("Received unexpected signal") + + case sig := <-signals: + switch sig { + case syscall.SIGHUP: + s.ReloadConfig() + + case syscall.SIGINT, syscall.SIGTERM: + cancel() + // but wait for the server to be done + + default: + logrus.WithField("signal", sig).Warn("Received unexpected signal") + } } } } diff --git a/docs/k8s-deployment.yaml b/docs/k8s-deployment.yaml index 19910b7..8a77534 100644 --- a/docs/k8s-deployment.yaml +++ b/docs/k8s-deployment.yaml @@ -34,22 +34,22 @@ apiVersion: apps/v1 kind: Deployment metadata: labels: - run: mc-router - name: mc-router + app: mc-router + name: mc-router-deployment spec: selector: matchLabels: - run: mc-router + app: mc-router strategy: type: Recreate template: metadata: labels: - run: mc-router + app: mc-router spec: serviceAccountName: mc-router containers: - - image: itzg/mc-router:latest + - image: itzg/mc-router name: mc-router # Add "--auto-scale-up" here for https://github.com/itzg/mc-router/#auto-scale-up args: @@ -68,3 +68,16 @@ spec: limits: memory: 100Mi cpu: "250m" +--- +apiVersion: v1 +kind: Service +metadata: + name: mc-router +spec: + selector: + app: mc-router + ports: + - protocol: TCP + port: 25565 + targetPort: proxy + type: NodePort diff --git a/docs/k8s-mc-with-default.yaml b/docs/k8s-mc-with-default.yaml index cbcc271..d616448 100644 --- a/docs/k8s-mc-with-default.yaml +++ b/docs/k8s-mc-with-default.yaml @@ -2,39 +2,38 @@ apiVersion: v1 kind: Service metadata: - name: mc-stable + name: mc-latest annotations: "mc-router.itzg.me/defaultServer": "true" spec: - type: ClusterIP + type: NodePort ports: - - port: 25566 - name: mc-router - targetPort: 25565 + - port: 25565 + name: minecraft selector: - run: mc-stable + app: mc-latest --- apiVersion: apps/v1 kind: Deployment metadata: labels: - run: mc-stable - name: mc-stable + app: mc-latest + name: mc-latest spec: selector: matchLabels: - run: mc-stable + app: mc-latest template: metadata: labels: - run: mc-stable + app: mc-latest spec: securityContext: runAsUser: 1000 fsGroup: 1000 containers: - image: itzg/minecraft-server - name: mc-stable + name: mc-latest env: - name: EULA value: "TRUE" @@ -48,25 +47,27 @@ metadata: annotations: "mc-router.itzg.me/externalServerName": "snapshot.your.domain" spec: + type: NodePort ports: - port: 25565 + name: minecraft selector: - run: mc-snapshot + app: mc-snapshot --- apiVersion: apps/v1 kind: Deployment metadata: labels: - run: mc-snapshot + app: mc-snapshot name: mc-snapshot spec: selector: matchLabels: - run: mc-snapshot + app: mc-snapshot template: metadata: labels: - run: mc-snapshot + app: mc-snapshot spec: securityContext: runAsUser: 1000 diff --git a/go.mod b/go.mod index 7c23f1b..2ee3be4 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,13 @@ module github.com/itzg/mc-router -go 1.24.0 - -toolchain go1.24.4 +go 1.24.4 require ( github.com/fsnotify/fsnotify v1.9.0 github.com/go-kit/kit v0.13.0 github.com/gorilla/mux v1.8.1 github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c - github.com/itzg/go-flagsfiller v1.15.2 + github.com/itzg/go-flagsfiller v1.16.0 github.com/juju/ratelimit v1.0.2 github.com/pires/go-proxyproto v0.8.1 github.com/pkg/errors v0.9.1 @@ -82,7 +80,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/VividCortex/gohistogram v1.0.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/docker/docker v28.3.0+incompatible + github.com/docker/docker v28.3.1+incompatible github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -100,7 +98,7 @@ require ( github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/spf13/pflag v1.0.6 // indirect - golang.org/x/net v0.40.0 // indirect + golang.org/x/net v0.41.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/term v0.32.0 // indirect diff --git a/go.sum b/go.sum index 7e9031c..8b58941 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v28.3.0+incompatible h1:ffS62aKWupCWdvcee7nBU9fhnmknOqDPaJAMtfK0ImQ= github.com/docker/docker v28.3.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v28.3.1+incompatible h1:20+BmuA9FXlCX4ByQ0vYJcUEnOmRM6XljDnFWR+jCyY= +github.com/docker/docker v28.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -89,6 +91,8 @@ github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSH github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/itzg/go-flagsfiller v1.15.2 h1:DvhhOKuqzawoa6C/3Q/y8pFbdO5PBdgnIvz84ZGkY0g= github.com/itzg/go-flagsfiller v1.15.2/go.mod h1:XmllPPi99O7vXTG9wa/Hzmhnkv6BXBF1W57ifbQTVs4= +github.com/itzg/go-flagsfiller v1.16.0 h1:YNwjLzFIeFzZpctT2RiN8T5qxiGrCX33bGSwtN6OSAA= +github.com/itzg/go-flagsfiller v1.16.0/go.mod h1:XmllPPi99O7vXTG9wa/Hzmhnkv6BXBF1W57ifbQTVs4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= @@ -205,6 +209,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -214,6 +219,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/server/api_server.go b/server/api_server.go index afd8ef5..a0f55b1 100644 --- a/server/api_server.go +++ b/server/api_server.go @@ -1,6 +1,7 @@ package server import ( + "encoding/json" "expvar" "net/http" @@ -9,11 +10,12 @@ import ( "github.com/sirupsen/logrus" ) -var apiRoutes = mux.NewRouter() - func StartApiServer(apiBinding string) { logrus.WithField("binding", apiBinding).Info("Serving API requests") + var apiRoutes = mux.NewRouter() + registerApiRoutes(apiRoutes) + apiRoutes.Path("/vars").Handler(expvar.Handler()) apiRoutes.Path("/metrics").Handler(promhttp.Handler()) @@ -23,3 +25,84 @@ func StartApiServer(apiBinding string) { http.ListenAndServe(apiBinding, apiRoutes)).Error("API server failed") }() } + +func registerApiRoutes(apiRoutes *mux.Router) { + apiRoutes.Path("/routes").Methods("GET"). + HandlerFunc(routesListHandler) + apiRoutes.Path("/routes").Methods("POST"). + HandlerFunc(routesCreateHandler) + apiRoutes.Path("/defaultRoute").Methods("POST"). + HandlerFunc(routesSetDefault) + apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler) +} + +func routesListHandler(writer http.ResponseWriter, _ *http.Request) { + mappings := Routes.GetMappings() + bytes, err := json.Marshal(mappings) + if err != nil { + logrus.WithError(err).Error("Failed to marshal mappings") + writer.WriteHeader(http.StatusInternalServerError) + return + } + + writer.Header().Set("Content-Type", "application/json") + _, err = writer.Write(bytes) + if err != nil { + logrus.WithError(err).Error("Failed to write response") + } +} + +func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) { + serverAddress := mux.Vars(request)["serverAddress"] + if serverAddress != "" { + if Routes.DeleteMapping(serverAddress) { + writer.WriteHeader(http.StatusOK) + } else { + writer.WriteHeader(http.StatusNotFound) + } + RoutesConfigLoader.SaveRoutes() + } +} + +func routesCreateHandler(writer http.ResponseWriter, request *http.Request) { + var definition = struct { + ServerAddress string + Backend string + }{} + + //goland:noinspection GoUnhandledErrorResult + defer request.Body.Close() + + decoder := json.NewDecoder(request.Body) + err := decoder.Decode(&definition) + if err != nil { + logrus.WithError(err).Error("Unable to get request body") + writer.WriteHeader(http.StatusBadRequest) + return + } + + Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc) + RoutesConfigLoader.SaveRoutes() + writer.WriteHeader(http.StatusCreated) +} + +func routesSetDefault(writer http.ResponseWriter, request *http.Request) { + var body = struct { + Backend string + }{} + + //goland:noinspection GoUnhandledErrorResult + defer request.Body.Close() + + decoder := json.NewDecoder(request.Body) + err := decoder.Decode(&body) + if err != nil { + logrus.WithError(err).Error("Unable to parse request") + writer.WriteHeader(http.StatusBadRequest) + return + } + + Routes.SetDefaultRoute(body.Backend) + RoutesConfigLoader.SaveRoutes() + writer.WriteHeader(http.StatusOK) +} diff --git a/server/client_filter.go b/server/client_filter.go index 33b7bf4..fdee7c0 100644 --- a/server/client_filter.go +++ b/server/client_filter.go @@ -89,7 +89,7 @@ func NewClientFilter(allows []string, denies []string) (*ClientFilter, error) { }, nil } -// Allow determines if the given address is allowed by this filter +// Allow determines if this filter allows the given address // where addrStr is a netip.ParseAddr allowed address func (f *ClientFilter) Allow(addrPort netip.AddrPort) bool { if !f.allow.Empty() { diff --git a/server/configs.go b/server/configs.go new file mode 100644 index 0000000..d34c467 --- /dev/null +++ b/server/configs.go @@ -0,0 +1,50 @@ +package server + +type WebhookConfig struct { + Url string `usage:"If set, a POST request that contains connection status notifications will be sent to this HTTP address"` + RequireUser bool `default:"false" usage:"Indicates if the webhook will only be called if a user is connecting rather than just server list/ping"` +} + +type AutoScale struct { + Up bool `usage:"Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed"` + Down bool `default:"false" usage:"Decrease Kubernetes StatefulSet Replicas (only) from 1 to 0 on respective backend servers after there are no connections"` + DownAfter string `default:"10m" usage:"Server scale down delay after there are no connections"` + AllowDeny string `usage:"Path to config for server allowlists and denylists. If a global/server entry is specified, only players allowed to connect to the server will be able to trigger a scale up when -auto-scale-up is enabled or cancel active down scalers when -auto-scale-down is enabled"` +} + +type RoutesConfig struct { + Config string `usage:"Name or full [path] to routes config file"` + ConfigWatch bool `usage:"Watch for config file changes"` +} + +type Config struct { + Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"` + Default string `usage:"host:port of a default Minecraft server to use when mapping not found"` + Mapping map[string]string `usage:"Comma or newline delimited or repeated mappings of externalHostname=host:port"` + ApiBinding string `usage:"The [host:port] bound for servicing API requests"` + CpuProfile string `usage:"Enables CPU profiling and writes to given path"` + ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"` + InKubeCluster bool `usage:"Use in-cluster Kubernetes config"` + KubeConfig string `usage:"The path to a Kubernetes configuration file"` + InDocker bool `usage:"Use Docker service discovery"` + InDockerSwarm bool `usage:"Use Docker Swarm service discovery"` + DockerSocket string `default:"unix:///var/run/docker.sock" usage:"Path to Docker socket to use"` + DockerTimeout int `default:"0" usage:"Timeout configuration in seconds for the Docker integrations"` + DockerRefreshInterval int `default:"15" usage:"Refresh interval in seconds for the Docker integrations"` + MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb,prometheus"` + MetricsBackendConfig MetricsBackendConfig + UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"` + ReceiveProxyProtocol bool `default:"false" usage:"Receive PROXY protocol from backend servers, by default trusts every proxy header that it receives, combine with -trusted-proxies to specify a list of trusted proxies"` + TrustedProxies []string `usage:"Comma delimited list of CIDR notation IP blocks to trust when receiving PROXY protocol"` + RecordLogins bool `default:"false" usage:"Log and generate metrics on player logins. Metrics only supported with influxdb or prometheus backend"` + Routes RoutesConfig + NgrokToken string `usage:"If set, an ngrok tunnel will be established. It is HIGHLY recommended to pass as an environment variable."` + AutoScale AutoScale + + ClientsToAllow []string `usage:"Zero or more client IP addresses or CIDRs to allow. Takes precedence over deny."` + ClientsToDeny []string `usage:"Zero or more client IP addresses or CIDRs to deny. Ignored if any configured to allow"` + + SimplifySRV bool `default:"false" usage:"Simplify fully qualified SRV records for mapping"` + + Webhook WebhookConfig `usage:"Webhook configuration"` +} diff --git a/server/connector.go b/server/connector.go index 6af8d9f..3f8e4bc 100644 --- a/server/connector.go +++ b/server/connector.go @@ -12,12 +12,9 @@ import ( "sync/atomic" "time" - "github.com/google/uuid" - "golang.ngrok.com/ngrok" "golang.ngrok.com/ngrok/config" - "github.com/go-kit/kit/metrics" "github.com/itzg/mc-router/mcproto" "github.com/juju/ratelimit" "github.com/pires/go-proxyproto" @@ -30,58 +27,18 @@ const ( var noDeadline time.Time -type ConnectorMetrics struct { - Errors metrics.Counter - BytesTransmitted metrics.Counter - ConnectionsFrontend metrics.Counter - ConnectionsBackend metrics.Counter - ActiveConnections metrics.Gauge - ServerActivePlayer metrics.Gauge - ServerLogins metrics.Counter - ServerActiveConnections metrics.Gauge -} - -type ClientInfo struct { - Host string `json:"host"` - Port int `json:"port"` -} - -func ClientInfoFromAddr(addr net.Addr) *ClientInfo { - if addr == nil { - return nil - } - - return &ClientInfo{ - Host: addr.(*net.TCPAddr).IP.String(), - Port: addr.(*net.TCPAddr).Port, - } -} - -type PlayerInfo struct { - Name string `json:"name"` - Uuid uuid.UUID `json:"uuid"` -} - -func (p *PlayerInfo) String() string { - if p == nil { - return "" - } - - return fmt.Sprintf("%s/%s", p.Name, p.Uuid) -} - -type ServerMetrics struct { +type ActiveConnections struct { sync.RWMutex activeConnections map[string]int } -func NewServerMetrics() *ServerMetrics { - return &ServerMetrics{ +func NewActiveConnections() *ActiveConnections { + return &ActiveConnections{ activeConnections: make(map[string]int), } } -func (sm *ServerMetrics) IncrementActiveConnections(serverAddress string) { +func (sm *ActiveConnections) Increment(serverAddress string) { sm.Lock() defer sm.Unlock() if _, ok := sm.activeConnections[serverAddress]; !ok { @@ -91,7 +48,7 @@ func (sm *ServerMetrics) IncrementActiveConnections(serverAddress string) { sm.activeConnections[serverAddress] += 1 } -func (sm *ServerMetrics) DecrementActiveConnections(serverAddress string) { +func (sm *ActiveConnections) Decrement(serverAddress string) { sm.Lock() defer sm.Unlock() if activeConnections, ok := sm.activeConnections[serverAddress]; ok && activeConnections <= 0 { @@ -101,7 +58,7 @@ func (sm *ServerMetrics) DecrementActiveConnections(serverAddress string) { sm.activeConnections[serverAddress] -= 1 } -func (sm *ServerMetrics) ActiveConnectionsValue(serverAddress string) int { +func (sm *ActiveConnections) GetCount(serverAddress string) int { sm.Lock() defer sm.Unlock() if activeConnections, ok := sm.activeConnections[serverAddress]; ok { @@ -110,60 +67,58 @@ func (sm *ServerMetrics) ActiveConnectionsValue(serverAddress string) int { return 0 } -func NewConnector(metrics *ConnectorMetrics, sendProxyProto bool, receiveProxyProto bool, trustedProxyNets []*net.IPNet, recordLogins bool, autoScaleUpAllowDenyConfig *AllowDenyConfig) *Connector { +func NewConnector(ctx context.Context, metrics *ConnectorMetrics, sendProxyProto bool, recordLogins bool, autoScaleUpAllowDenyConfig *AllowDenyConfig) *Connector { return &Connector{ + ctx: ctx, metrics: metrics, sendProxyProto: sendProxyProto, connectionsCond: sync.NewCond(&sync.Mutex{}), - receiveProxyProto: receiveProxyProto, - trustedProxyNets: trustedProxyNets, recordLogins: recordLogins, autoScaleUpAllowDenyConfig: autoScaleUpAllowDenyConfig, - serverMetrics: NewServerMetrics(), + activeConnections: NewActiveConnections(), } } type Connector struct { - state mcproto.State - metrics *ConnectorMetrics - sendProxyProto bool - receiveProxyProto bool - recordLogins bool - trustedProxyNets []*net.IPNet - - activeConnections int32 - serverMetrics *ServerMetrics + ctx context.Context + state mcproto.State + metrics *ConnectorMetrics + sendProxyProto bool + receiveProxyProto bool + recordLogins bool + trustedProxyNets []*net.IPNet + totalActiveConnections int32 + activeConnections *ActiveConnections connectionsCond *sync.Cond ngrokToken string clientFilter *ClientFilter autoScaleUpAllowDenyConfig *AllowDenyConfig - - connectionNotifier ConnectionNotifier + connectionNotifier ConnectionNotifier } -func (c *Connector) SetConnectionNotifier(notifier ConnectionNotifier) { +func (c *Connector) UseConnectionNotifier(notifier ConnectionNotifier) { c.connectionNotifier = notifier } -func (c *Connector) SetClientFilter(filter *ClientFilter) { +func (c *Connector) UseClientFilter(filter *ClientFilter) { c.clientFilter = filter } -func (c *Connector) StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error { - ln, err := c.createListener(ctx, listenAddress) +func (c *Connector) StartAcceptingConnections(listenAddress string, connRateLimit int) error { + ln, err := c.createListener(listenAddress) if err != nil { return err } - go c.acceptConnections(ctx, ln, connRateLimit) + go c.acceptConnections(ln, connRateLimit) return nil } -func (c *Connector) createListener(ctx context.Context, listenAddress string) (net.Listener, error) { +func (c *Connector) createListener(listenAddress string) (net.Listener, error) { if c.ngrokToken != "" { - ngrokTun, err := ngrok.Listen(ctx, + ngrokTun, err := ngrok.Listen(c.ctx, config.TCPEndpoint(), ngrok.WithAuthtoken(c.ngrokToken), ) @@ -184,8 +139,8 @@ func (c *Connector) createListener(ctx context.Context, listenAddress string) (n if c.receiveProxyProto { proxyListener := &proxyproto.Listener{ - Listener: listener, - Policy: c.createProxyProtoPolicy(), + Listener: listener, + ConnPolicy: c.createProxyProtoPolicy(), } logrus.Info("Using PROXY protocol listener") return proxyListener, nil @@ -194,8 +149,8 @@ func (c *Connector) createListener(ctx context.Context, listenAddress string) (n return listener, nil } -func (c *Connector) createProxyProtoPolicy() func(upstream net.Addr) (proxyproto.Policy, error) { - return func(upstream net.Addr) (proxyproto.Policy, error) { +func (c *Connector) createProxyProtoPolicy() proxyproto.ConnPolicyFunc { + return func(connPolicyOptions proxyproto.ConnPolicyOptions) (proxyproto.Policy, error) { trustedIpNets := c.trustedProxyNets if len(trustedIpNets) == 0 { @@ -203,6 +158,7 @@ func (c *Connector) createProxyProtoPolicy() func(upstream net.Addr) (proxyproto return proxyproto.USE, nil } + upstream := connPolicyOptions.Upstream upstreamIP := upstream.(*net.TCPAddr).IP for _, ipNet := range trustedIpNets { if ipNet.Contains(upstreamIP) { @@ -221,17 +177,23 @@ func (c *Connector) WaitForConnections() { defer c.connectionsCond.L.Unlock() for { - count := atomic.LoadInt32(&c.activeConnections) + count := atomic.LoadInt32(&c.totalActiveConnections) if count > 0 { logrus.Infof("Waiting on %d connection(s)", count) c.connectionsCond.Wait() } else { - break + return } } } -func (c *Connector) acceptConnections(ctx context.Context, ln net.Listener, connRateLimit int) { +// AcceptConnection provides a way to externally supply a connection to consume. +// Note that this will skip rate limiting. +func (c *Connector) AcceptConnection(conn net.Conn) { + go c.HandleConnection(conn) +} + +func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) { //noinspection GoUnhandledErrorResult defer ln.Close() @@ -239,7 +201,7 @@ func (c *Connector) acceptConnections(ctx context.Context, ln net.Listener, conn for { select { - case <-ctx.Done(): + case <-c.ctx.Done(): return case <-time.After(bucket.Take(1)): @@ -247,13 +209,13 @@ func (c *Connector) acceptConnections(ctx context.Context, ln net.Listener, conn if err != nil { logrus.WithError(err).Error("Failed to accept connection") } else { - go c.HandleConnection(ctx, conn) + go c.HandleConnection(conn) } } } } -func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn) { +func (c *Connector) HandleConnection(frontendConn net.Conn) { c.metrics.ConnectionsFrontend.Add(1) //noinspection GoUnhandledErrorResult defer frontendConn.Close() @@ -343,7 +305,7 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn) Debug("Got user info") } - c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, playerInfo, handshake.NextState) + c.findAndConnectBackend(frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, playerInfo, handshake.NextState) } else if packet.PacketID == mcproto.PacketIdLegacyServerListPing { handshake, ok := packet.Data.(*mcproto.LegacyServerListPing) @@ -363,7 +325,7 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn) serverAddress := handshake.ServerAddress - c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus) + c.findAndConnectBackend(frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus) } else { logrus. WithField("client", clientAddr). @@ -394,9 +356,9 @@ func (c *Connector) readPlayerInfo(protocolVersion mcproto.ProtocolVersion, buff } } -func (c *Connector) cleanupBackendConnection(ctx context.Context, clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) { +func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) { if c.connectionNotifier != nil { - err := c.connectionNotifier.NotifyDisconnected(ctx, clientAddr, serverAddress, playerInfo, backendHostPort) + err := c.connectionNotifier.NotifyDisconnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort) if err != nil { logrus.WithError(err).Warn("failed to notify disconnected") } @@ -404,12 +366,12 @@ func (c *Connector) cleanupBackendConnection(ctx context.Context, clientAddr net if cleanupMetrics { c.metrics.ActiveConnections.Set(float64( - atomic.AddInt32(&c.activeConnections, -1))) + atomic.AddInt32(&c.totalActiveConnections, -1))) - c.serverMetrics.DecrementActiveConnections(serverAddress) + c.activeConnections.Decrement(serverAddress) c.metrics.ServerActiveConnections. With("server_address", serverAddress). - Set(float64(c.serverMetrics.ActiveConnectionsValue(serverAddress))) + Set(float64(c.activeConnections.GetCount(serverAddress))) if c.recordLogins && playerInfo != nil { c.metrics.ServerActivePlayer. @@ -419,21 +381,21 @@ func (c *Connector) cleanupBackendConnection(ctx context.Context, clientAddr net Set(0) } } - if checkScaleDown && c.serverMetrics.ActiveConnectionsValue(serverAddress) <= 0 { + if checkScaleDown && c.activeConnections.GetCount(serverAddress) <= 0 { DownScaler.Begin(serverAddress) } c.connectionsCond.Signal() } -func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net.Conn, +func (c *Connector) findAndConnectBackend(frontendConn net.Conn, clientAddr net.Addr, preReadContent io.Reader, serverAddress string, playerInfo *PlayerInfo, nextState mcproto.State) { - backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(ctx, serverAddress) + backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(c.ctx, serverAddress) cleanupMetrics := false cleanupCheckScaleDown := false defer func() { - c.cleanupBackendConnection(ctx, clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown) + c.cleanupBackendConnection(clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown) }() if waker != nil && nextState > mcproto.StateStatus { @@ -448,7 +410,7 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net. // Cancel down scaler if active before scale up DownScaler.Cancel(serverAddress) cleanupCheckScaleDown = true - if err := waker(ctx); err != nil { + if err := waker(c.ctx); err != nil { logrus.WithFields(logrus.Fields{"serverAddress": serverAddress}).WithError(err).Error("failed to wake up backend") c.metrics.Errors.With("type", "wakeup_failed").Add(1) return @@ -465,7 +427,7 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net. c.metrics.Errors.With("type", "missing_backend").Add(1) if c.connectionNotifier != nil { - err := c.connectionNotifier.NotifyMissingBackend(ctx, clientAddr, serverAddress, playerInfo) + err := c.connectionNotifier.NotifyMissingBackend(c.ctx, clientAddr, serverAddress, playerInfo) if err != nil { logrus.WithError(err).Warn("failed to notify missing backend") } @@ -493,7 +455,7 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net. c.metrics.Errors.With("type", "backend_failed").Add(1) if c.connectionNotifier != nil { - notifyErr := c.connectionNotifier.NotifyFailedBackendConnection(ctx, clientAddr, serverAddress, playerInfo, backendHostPort, err) + notifyErr := c.connectionNotifier.NotifyFailedBackendConnection(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort, err) if notifyErr != nil { logrus.WithError(notifyErr).Warn("failed to notify failed backend connection") } @@ -503,7 +465,7 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net. } if c.connectionNotifier != nil { - err := c.connectionNotifier.NotifyConnected(ctx, clientAddr, serverAddress, playerInfo, backendHostPort) + err := c.connectionNotifier.NotifyConnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort) if err != nil { logrus.WithError(err).Warn("failed to notify connected") } @@ -512,12 +474,12 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net. c.metrics.ConnectionsBackend.With("host", resolvedHost).Add(1) c.metrics.ActiveConnections.Set(float64( - atomic.AddInt32(&c.activeConnections, 1))) + atomic.AddInt32(&c.totalActiveConnections, 1))) - c.serverMetrics.IncrementActiveConnections(serverAddress) + c.activeConnections.Increment(serverAddress) c.metrics.ServerActiveConnections. With("server_address", serverAddress). - Set(float64(c.serverMetrics.ActiveConnectionsValue(serverAddress))) + Set(float64(c.activeConnections.GetCount(serverAddress))) if c.recordLogins && playerInfo != nil { logrus. @@ -598,23 +560,23 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net. return } - c.pumpConnections(ctx, frontendConn, backendConn, playerInfo) + c.pumpConnections(frontendConn, backendConn, playerInfo) } -func (c *Connector) pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn, playerInfo *PlayerInfo) { +func (c *Connector) pumpConnections(frontendConn, backendConn net.Conn, playerInfo *PlayerInfo) { //noinspection GoUnhandledErrorResult defer backendConn.Close() clientAddr := frontendConn.RemoteAddr() defer logrus.WithField("client", clientAddr).Debug("Closing backend connection") - errors := make(chan error, 2) + errorsChan := make(chan error, 2) - go c.pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr, playerInfo) - go c.pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr, playerInfo) + go c.pumpFrames(backendConn, frontendConn, errorsChan, "backend", "frontend", clientAddr, playerInfo) + go c.pumpFrames(frontendConn, backendConn, errorsChan, "frontend", "backend", clientAddr, playerInfo) select { - case err := <-errors: + case err := <-errorsChan: if err != io.EOF { logrus.WithError(err). WithField("client", clientAddr). @@ -622,8 +584,8 @@ func (c *Connector) pumpConnections(ctx context.Context, frontendConn, backendCo c.metrics.Errors.With("type", "relay").Add(1) } - case <-ctx.Done(): - logrus.Debug("Observed context cancellation") + case <-c.ctx.Done(): + logrus.Debug("Connector observed context cancellation") } } @@ -649,3 +611,8 @@ func (c *Connector) pumpFrames(incoming io.Reader, outgoing io.Writer, errors ch func (c *Connector) UseNgrok(token string) { c.ngrokToken = token } + +func (c *Connector) UseReceiveProxyProto(trustedProxyNets []*net.IPNet) { + c.trustedProxyNets = trustedProxyNets + c.receiveProxyProto = true +} diff --git a/server/connector_test.go b/server/connector_test.go index 357aa44..ab26a53 100644 --- a/server/connector_test.go +++ b/server/connector_test.go @@ -61,7 +61,9 @@ func TestTrustedProxyNetworkPolicy(t *testing.T) { policy := c.createProxyProtoPolicy() upstreamAddr := &net.TCPAddr{IP: net.ParseIP(test.upstreamIP)} - policyResult, _ := policy(upstreamAddr) + policyResult, _ := policy(proxyproto.ConnPolicyOptions{ + Upstream: upstreamAddr, + }) assert.Equal(t, test.expectedPolicy, policyResult, "Unexpected policy result for %s", test.name) }) } diff --git a/server/k8s.go b/server/k8s.go index 75a554e..99791b7 100644 --- a/server/k8s.go +++ b/server/k8s.go @@ -3,11 +3,6 @@ package server import ( "context" "fmt" - "net" - "strconv" - "strings" - "sync" - "github.com/pkg/errors" "github.com/sirupsen/logrus" apps "k8s.io/api/apps/v1" @@ -19,6 +14,10 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "net" + "strconv" + "strings" + "sync" ) const ( @@ -27,9 +26,8 @@ const ( ) type IK8sWatcher interface { - StartWithConfig(kubeConfigFile string, autoScaleUp bool, autoScaleDown bool) error - StartInCluster(autoScaleUp bool, autoScaleDown bool) error - Stop() + StartWithConfig(ctx context.Context, kubeConfigFile string, autoScaleUp bool, autoScaleDown bool) error + StartInCluster(ctx context.Context, autoScaleUp bool, autoScaleDown bool) error } var K8sWatcher IK8sWatcher = &k8sWatcherImpl{} @@ -42,29 +40,27 @@ type k8sWatcherImpl struct { mappings map[string]string clientset *kubernetes.Clientset - stop chan struct{} } -func (w *k8sWatcherImpl) StartInCluster(autoScaleUp bool, autoScaleDown bool) error { +func (w *k8sWatcherImpl) StartInCluster(ctx context.Context, autoScaleUp bool, autoScaleDown bool) error { config, err := rest.InClusterConfig() if err != nil { return errors.Wrap(err, "Unable to load in-cluster config") } - return w.startWithLoadedConfig(config, autoScaleUp, autoScaleDown) + return w.startWithLoadedConfig(ctx, config, autoScaleUp, autoScaleDown) } -func (w *k8sWatcherImpl) StartWithConfig(kubeConfigFile string, autoScaleUp bool, autoScaleDown bool) error { +func (w *k8sWatcherImpl) StartWithConfig(ctx context.Context, kubeConfigFile string, autoScaleUp bool, autoScaleDown bool) error { config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile) if err != nil { return errors.Wrap(err, "Could not load kube config file") } - return w.startWithLoadedConfig(config, autoScaleUp, autoScaleDown) + return w.startWithLoadedConfig(ctx, config, autoScaleUp, autoScaleDown) } -func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config, autoScaleUp bool, autoScaleDown bool) error { - w.stop = make(chan struct{}, 1) +func (w *k8sWatcherImpl) startWithLoadedConfig(ctx context.Context, config *rest.Config, autoScaleUp bool, autoScaleDown bool) error { w.autoScaleUp = autoScaleUp w.autoScaleDown = autoScaleDown @@ -74,22 +70,21 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config, autoScaleUp } w.clientset = clientset - _, serviceController := cache.NewInformer( - cache.NewListWatchFromClient( + _, serviceController := cache.NewInformerWithOptions(cache.InformerOptions{ + ListerWatcher: cache.NewListWatchFromClient( clientset.CoreV1().RESTClient(), string(core.ResourceServices), core.NamespaceAll, fields.Everything(), ), - &core.Service{}, - 0, - cache.ResourceEventHandlerFuncs{ + ObjectType: &core.Service{}, + Handler: cache.ResourceEventHandlerFuncs{ AddFunc: w.handleAdd, DeleteFunc: w.handleDelete, UpdateFunc: w.handleUpdate, }, - ) - go serviceController.Run(w.stop) + }) + go serviceController.RunWithContext(ctx) w.mappings = make(map[string]string) if autoScaleUp || autoScaleDown { @@ -137,7 +132,7 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(config *rest.Config, autoScaleUp }, }, ) - go statefulSetController.Run(w.stop) + go statefulSetController.RunWithContext(ctx) } logrus.Info("Monitoring Kubernetes for Minecraft services") @@ -199,12 +194,6 @@ func (w *k8sWatcherImpl) handleAdd(obj interface{}) { } } -func (w *k8sWatcherImpl) Stop() { - if w.stop != nil { - close(w.stop) - } -} - type routableService struct { externalServiceName string containerEndpoint string diff --git a/cmd/mc-router/metrics.go b/server/metrics.go similarity index 84% rename from cmd/mc-router/metrics.go rename to server/metrics.go index dbb5936..d704e49 100644 --- a/cmd/mc-router/metrics.go +++ b/server/metrics.go @@ -1,9 +1,10 @@ -package main +package server import ( "context" "errors" "fmt" + "github.com/go-kit/kit/metrics" "strings" "time" @@ -13,14 +14,13 @@ import ( kitinflux "github.com/go-kit/kit/metrics/influx" prometheusMetrics "github.com/go-kit/kit/metrics/prometheus" influx "github.com/influxdata/influxdb1-client/v2" - "github.com/itzg/mc-router/server" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" ) type MetricsBuilder interface { - BuildConnectorMetrics() *server.ConnectorMetrics + BuildConnectorMetrics() *ConnectorMetrics Start(ctx context.Context) error } @@ -31,6 +31,18 @@ const ( MetricsBackendDiscard = "discard" ) +type MetricsBackendConfig struct { + Influxdb struct { + Interval time.Duration `default:"1m"` + Tags map[string]string `usage:"any extra tags to be included with all reported metrics"` + Addr string + Username string + Password string + Database string + RetentionPolicy string + } +} + // NewMetricsBuilder creates a new MetricsBuilder based on the specified backend. // If the backend is not recognized, a discard builder is returned. // config can be nil if the backend is not influxdb. @@ -57,9 +69,20 @@ func (b expvarMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b expvarMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { +type ConnectorMetrics struct { + Errors metrics.Counter + BytesTransmitted metrics.Counter + ConnectionsFrontend metrics.Counter + ConnectionsBackend metrics.Counter + ActiveConnections metrics.Gauge + ServerActivePlayer metrics.Gauge + ServerLogins metrics.Counter + ServerActiveConnections metrics.Gauge +} + +func (b expvarMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { c := expvarMetrics.NewCounter("connections") - return &server.ConnectorMetrics{ + return &ConnectorMetrics{ Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"), BytesTransmitted: expvarMetrics.NewCounter("bytes"), ConnectionsFrontend: c, @@ -79,8 +102,8 @@ func (b discardMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b discardMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { - return &server.ConnectorMetrics{ +func (b discardMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { + return &ConnectorMetrics{ Errors: discardMetrics.NewCounter(), BytesTransmitted: discardMetrics.NewCounter(), ConnectionsFrontend: discardMetrics.NewCounter(), @@ -121,7 +144,7 @@ func (b *influxMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b *influxMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { +func (b *influxMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { influxConfig := &b.config.Influxdb metrics := kitinflux.New(influxConfig.Tags, influx.BatchPointsConfig{ @@ -132,7 +155,7 @@ func (b *influxMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics b.metrics = metrics c := metrics.NewCounter("mc_router_connections") - return &server.ConnectorMetrics{ + return &ConnectorMetrics{ Errors: metrics.NewCounter("mc_router_errors"), BytesTransmitted: metrics.NewCounter("mc_router_transmitted_bytes"), ConnectionsFrontend: c.With("side", "frontend"), @@ -155,13 +178,13 @@ func (b prometheusMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b prometheusMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { +func (b prometheusMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { pcv = prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "mc_router", Name: "errors", Help: "The total number of errors", }, []string{"type"})) - return &server.ConnectorMetrics{ + return &ConnectorMetrics{ Errors: pcv, BytesTransmitted: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "mc_router", diff --git a/server/notifier.go b/server/notifier.go index ad17c51..7d322a3 100644 --- a/server/notifier.go +++ b/server/notifier.go @@ -2,9 +2,40 @@ package server import ( "context" + "fmt" + "github.com/google/uuid" "net" ) +type PlayerInfo struct { + Name string `json:"name"` + Uuid uuid.UUID `json:"uuid"` +} + +func (p *PlayerInfo) String() string { + if p == nil { + return "" + } + + return fmt.Sprintf("%s/%s", p.Name, p.Uuid) +} + +type ClientInfo struct { + Host string `json:"host"` + Port int `json:"port"` +} + +func ClientInfoFromAddr(addr net.Addr) *ClientInfo { + if addr == nil { + return nil + } + + return &ClientInfo{ + Host: addr.(*net.TCPAddr).IP.String(), + Port: addr.(*net.TCPAddr).Port, + } +} + type ConnectionNotifier interface { // NotifyMissingBackend is called when an inbound connection is received for a server that does not have a backend. NotifyMissingBackend(ctx context.Context, clientAddr net.Addr, server string, playerInfo *PlayerInfo) error diff --git a/server/routes.go b/server/routes.go index f69425b..7c2f6d7 100644 --- a/server/routes.go +++ b/server/routes.go @@ -2,13 +2,10 @@ package server import ( "context" - "encoding/json" - "net/http" "regexp" "strings" "sync" - "github.com/gorilla/mux" "github.com/sirupsen/logrus" ) @@ -18,88 +15,6 @@ var EmptyScalerFunc = func(ctx context.Context) error { return nil } var tcpShieldPattern = regexp.MustCompile("///.*") -func init() { - apiRoutes.Path("/routes").Methods("GET"). - Headers("Accept", "application/json"). - HandlerFunc(routesListHandler) - apiRoutes.Path("/routes").Methods("POST"). - Headers("Content-Type", "application/json"). - HandlerFunc(routesCreateHandler) - apiRoutes.Path("/defaultRoute").Methods("POST"). - Headers("Content-Type", "application/json"). - HandlerFunc(routesSetDefault) - apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler) -} - -func routesListHandler(writer http.ResponseWriter, _ *http.Request) { - mappings := Routes.GetMappings() - bytes, err := json.Marshal(mappings) - if err != nil { - logrus.WithError(err).Error("Failed to marshal mappings") - writer.WriteHeader(http.StatusInternalServerError) - return - } - _, err = writer.Write(bytes) - if err != nil { - logrus.WithError(err).Error("Failed to write response") - } -} - -func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) { - serverAddress := mux.Vars(request)["serverAddress"] - RoutesConfig.DeleteMapping(serverAddress) - if serverAddress != "" { - if Routes.DeleteMapping(serverAddress) { - writer.WriteHeader(http.StatusOK) - } else { - writer.WriteHeader(http.StatusNotFound) - } - } -} - -func routesCreateHandler(writer http.ResponseWriter, request *http.Request) { - var definition = struct { - ServerAddress string - Backend string - }{} - - //goland:noinspection GoUnhandledErrorResult - defer request.Body.Close() - - decoder := json.NewDecoder(request.Body) - err := decoder.Decode(&definition) - if err != nil { - logrus.WithError(err).Error("Unable to get request body") - writer.WriteHeader(http.StatusBadRequest) - return - } - - Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc) - RoutesConfig.AddMapping(definition.ServerAddress, definition.Backend) - writer.WriteHeader(http.StatusCreated) -} - -func routesSetDefault(writer http.ResponseWriter, request *http.Request) { - var body = struct { - Backend string - }{} - - //goland:noinspection GoUnhandledErrorResult - defer request.Body.Close() - - decoder := json.NewDecoder(request.Body) - err := decoder.Decode(&body) - if err != nil { - logrus.WithError(err).Error("Unable to parse request") - writer.WriteHeader(http.StatusBadRequest) - return - } - - Routes.SetDefaultRoute(body.Backend) - RoutesConfig.SetDefaultRoute(body.Backend) - writer.WriteHeader(http.StatusOK) -} - type IRoutes interface { Reset() RegisterAll(mappings map[string]string) @@ -112,6 +27,7 @@ type IRoutes interface { DeleteMapping(serverAddress string) bool CreateMapping(serverAddress string, backend string, waker ScalerFunc, sleeper ScalerFunc) SetDefaultRoute(backend string) + GetDefaultRoute() string SimplifySRV(srvEnabled bool) } @@ -157,6 +73,10 @@ func (r *routesImpl) SetDefaultRoute(backend string) { }).Info("Using default route") } +func (r *routesImpl) GetDefaultRoute() string { + return r.defaultRoute +} + func (r *routesImpl) SimplifySRV(srvEnabled bool) { r.simplifySRV = srvEnabled } diff --git a/server/routes_config.go b/server/routes_config.go deleted file mode 100644 index ac074a5..0000000 --- a/server/routes_config.go +++ /dev/null @@ -1,260 +0,0 @@ -package server - -import ( - "context" - "encoding/json" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - - "io/fs" - "os" - "sync" -) - -type IRoutesConfig interface { - ReadRoutesConfig(routesConfig string) - ReloadRoutesConfig() - AddMapping(serverAddress string, backend string) - DeleteMapping(serverAddress string) - SetDefaultRoute(backend string) - WatchForChanges(ctx context.Context) error -} - -const debounceConfigRereadDuration = time.Second * 5 - -var RoutesConfig = &routesConfigImpl{} - -type routesConfigImpl struct { - sync.RWMutex - fileName string -} - -type routesConfigStructure struct { - DefaultServer string `json:"default-server"` - Mappings map[string]string `json:"mappings"` -} - -func (r *routesConfigImpl) ReadRoutesConfig(routesConfig string) error { - r.fileName = routesConfig - - logrus.WithField("routesConfig", r.fileName).Info("Loading routes config file") - - config, readErr := r.readRoutesConfigFile() - - if readErr != nil { - if errors.Is(readErr, fs.ErrNotExist) { - logrus.WithField("routesConfig", r.fileName).Info("Routes config file doses not exist, skipping reading it") - // File doesn't exist -> ignore it - return nil - } - return errors.Wrap(readErr, "Could not load the routes config file") - } - - Routes.RegisterAll(config.Mappings) - Routes.SetDefaultRoute(config.DefaultServer) - return nil -} - -func (r *routesConfigImpl) ReloadRoutesConfig() error { - config, readErr := r.readRoutesConfigFile() - - if readErr != nil { - return readErr - } - - logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file") - Routes.Reset() - Routes.RegisterAll(config.Mappings) - Routes.SetDefaultRoute(config.DefaultServer) - - return nil -} - -func (r *routesConfigImpl) WatchForChanges(ctx context.Context) error { - if r.fileName == "" { - return errors.New("routes config file needs to be specified first") - } - - watcher, err := fsnotify.NewWatcher() - if err != nil { - return errors.Wrap(err, "Could not create a watcher") - } - - err = watcher.Add(r.fileName) - if err != nil { - return errors.Wrap(err, "Could not watch the routes config file") - } - - go func() { - logrus.WithField("file", r.fileName).Info("Watching routes config file") - - debounceTimerChan := make(<-chan time.Time) - var debounceTimer *time.Timer - - //goland:noinspection GoUnhandledErrorResult - defer watcher.Close() - for { - select { - - case event, ok := <-watcher.Events: - if !ok { - logrus.Debug("Watcher events channel closed") - return - } - logrus. - WithField("file", event.Name). - WithField("op", event.Op). - Trace("fs event received") - if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) { - if debounceTimer == nil { - debounceTimer = time.NewTimer(debounceConfigRereadDuration) - } else { - debounceTimer.Reset(debounceConfigRereadDuration) - } - debounceTimerChan = debounceTimer.C - logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay") - } - - case <-debounceTimerChan: - readErr := r.ReadRoutesConfig(r.fileName) - if readErr != nil { - logrus. - WithError(readErr). - WithField("routesConfig", r.fileName). - Error("Could not re-read the routes config file") - } - - case <-ctx.Done(): - return - } - } - }() - - return nil -} - -func (r *routesConfigImpl) AddMapping(serverAddress string, backend string) { - if !r.isRoutesConfigEnabled() { - return - } - - config, readErr := r.readRoutesConfigFile() - if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) { - logrus.WithError(readErr).Error("Could not read the routes config file") - return - } - if config.Mappings == nil { - config.Mappings = make(map[string]string) - } - - config.Mappings[serverAddress] = backend - - writeErr := r.writeRoutesConfigFile(config) - if writeErr != nil { - logrus.WithError(writeErr).Error("Could not write to the routes config file") - return - } - - logrus.WithFields(logrus.Fields{ - "serverAddress": serverAddress, - "backend": backend, - }).Info("Added route to routes config") - - return -} - -func (r *routesConfigImpl) SetDefaultRoute(backend string) { - if !r.isRoutesConfigEnabled() { - return - } - - config, readErr := r.readRoutesConfigFile() - if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) { - logrus.WithError(readErr).Error("Could not read the routes config file") - return - } - - config.DefaultServer = backend - - writeErr := r.writeRoutesConfigFile(config) - if writeErr != nil { - logrus.WithError(writeErr).Error("Could not write to the routes config file") - return - } - - logrus.WithFields(logrus.Fields{ - "backend": backend, - }).Info("Set default route in routes config") - - return -} - -func (r *routesConfigImpl) DeleteMapping(serverAddress string) { - if !r.isRoutesConfigEnabled() { - return - } - - config, readErr := r.readRoutesConfigFile() - if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) { - logrus.WithError(readErr).Error("Could not read the routes config file") - return - } - - delete(config.Mappings, serverAddress) - - writeErr := r.writeRoutesConfigFile(config) - if writeErr != nil { - logrus.WithError(writeErr).Error("Could not write to the routes config file") - return - } - - logrus.WithField("serverAddress", serverAddress).Info("Deleted route in routes config") - - return -} - -func (r *routesConfigImpl) isRoutesConfigEnabled() bool { - return r.fileName != "" -} - -func (r *routesConfigImpl) readRoutesConfigFile() (routesConfigStructure, error) { - r.RLock() - defer r.RUnlock() - - config := routesConfigStructure{ - "", - make(map[string]string), - } - - file, fileErr := os.ReadFile(r.fileName) - if fileErr != nil { - return config, errors.Wrap(fileErr, "Could not load the routes config file") - } - - parseErr := json.Unmarshal(file, &config) - if parseErr != nil { - return config, errors.Wrap(parseErr, "Could not parse the json routes config file") - } - - return config, nil -} - -func (r *routesConfigImpl) writeRoutesConfigFile(config routesConfigStructure) error { - r.Lock() - defer r.Unlock() - - newFileContent, parseErr := json.Marshal(config) - if parseErr != nil { - return errors.Wrap(parseErr, "Could not parse the routes to json") - } - - fileErr := os.WriteFile(r.fileName, newFileContent, 0664) - if fileErr != nil { - return errors.Wrap(fileErr, "Could not write to the routes config file") - } - - return nil -} diff --git a/server/routes_config_loader.go b/server/routes_config_loader.go new file mode 100644 index 0000000..43a9aeb --- /dev/null +++ b/server/routes_config_loader.go @@ -0,0 +1,181 @@ +package server + +import ( + "context" + "encoding/json" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "io/fs" + "os" +) + +const debounceConfigRereadDuration = time.Second * 5 + +var RoutesConfigLoader = &routesConfigLoader{} + +type routesConfigLoader struct { + fileName string +} + +// RoutesConfigSchema declares the schema of the json file that can provide routes to serve +type RoutesConfigSchema struct { + DefaultServer string `json:"default-server"` + Mappings map[string]string `json:"mappings"` +} + +func (r *routesConfigLoader) Load(routesConfigFileName string) error { + r.fileName = routesConfigFileName + + logrus.WithField("routesConfigFileName", r.fileName).Info("Loading routes config file") + + config, readErr := r.readFile() + + if readErr != nil { + if errors.Is(readErr, fs.ErrNotExist) { + logrus.WithField("routesConfigFileName", r.fileName).Info("Routes config file doses not exist, skipping reading it") + // File doesn't exist -> ignore it + return nil + } + return errors.Wrap(readErr, "Could not load the routes config file") + } + + Routes.RegisterAll(config.Mappings) + Routes.SetDefaultRoute(config.DefaultServer) + return nil +} + +func (r *routesConfigLoader) Reload() error { + if !r.isEnabled() { + return nil + } + + config, readErr := r.readFile() + + if readErr != nil { + return readErr + } + + logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file") + Routes.Reset() + Routes.RegisterAll(config.Mappings) + Routes.SetDefaultRoute(config.DefaultServer) + + return nil +} + +func (r *routesConfigLoader) WatchForChanges(ctx context.Context) error { + if r.fileName == "" { + return errors.New("routes config file needs to be specified first") + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return errors.Wrap(err, "Could not create a watcher") + } + + err = watcher.Add(r.fileName) + if err != nil { + return errors.Wrap(err, "Could not watch the routes config file") + } + + go func() { + logrus.WithField("file", r.fileName).Info("Watching routes config file") + + debounceTimerChan := make(<-chan time.Time) + var debounceTimer *time.Timer + + //goland:noinspection GoUnhandledErrorResult + defer watcher.Close() + for { + select { + + case event, ok := <-watcher.Events: + if !ok { + logrus.Debug("Watcher events channel closed") + return + } + logrus. + WithField("file", event.Name). + WithField("op", event.Op). + Trace("fs event received") + if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) { + if debounceTimer == nil { + debounceTimer = time.NewTimer(debounceConfigRereadDuration) + } else { + debounceTimer.Reset(debounceConfigRereadDuration) + } + debounceTimerChan = debounceTimer.C + logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay") + } + + case <-debounceTimerChan: + readErr := r.Load(r.fileName) + if readErr != nil { + logrus. + WithError(readErr). + WithField("routesConfig", r.fileName). + Error("Could not re-read the routes config file") + } + + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +func (r *routesConfigLoader) SaveRoutes() { + if !r.isEnabled() { + return + } + + err := r.writeFile(&RoutesConfigSchema{ + DefaultServer: Routes.GetDefaultRoute(), + Mappings: Routes.GetMappings(), + }) + if err != nil { + logrus.WithError(err).Error("Could not save the routes config file") + return + } + logrus.Info("Saved routes config") +} + +func (r *routesConfigLoader) isEnabled() bool { + return r.fileName != "" +} + +func (r *routesConfigLoader) readFile() (*RoutesConfigSchema, error) { + var config RoutesConfigSchema + + content, err := os.ReadFile(r.fileName) + if err != nil { + return &config, errors.Wrap(err, "Could not load the routes config file") + } + + parseErr := json.Unmarshal(content, &config) + if parseErr != nil { + return &config, errors.Wrap(parseErr, "Could not parse the json routes config file") + } + + return &config, nil +} + +func (r *routesConfigLoader) writeFile(config *RoutesConfigSchema) error { + newFileContent, err := json.Marshal(config) + if err != nil { + return errors.Wrap(err, "Could not parse the routes to json") + } + + err = os.WriteFile(r.fileName, newFileContent, 0664) + if err != nil { + return errors.Wrap(err, "Could not write to the routes config file") + } + + return nil +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..0cb150c --- /dev/null +++ b/server/server.go @@ -0,0 +1,220 @@ +package server + +import ( + "context" + "fmt" + "github.com/sirupsen/logrus" + "net" + "os" + "runtime/pprof" + "strconv" + "time" +) + +type Server struct { + ctx context.Context + config *Config + connector *Connector + reloadConfigChan chan struct{} + doneChan chan struct{} +} + +func NewServer(ctx context.Context, config *Config) (*Server, error) { + if config.CpuProfile != "" { + cpuProfileFile, err := os.Create(config.CpuProfile) + if err != nil { + return nil, fmt.Errorf("could not create cpu profile file: %w", err) + } + //goland:noinspection GoUnhandledErrorResult + defer cpuProfileFile.Close() + + logrus.WithField("file", config.CpuProfile).Info("Starting cpu profiling") + err = pprof.StartCPUProfile(cpuProfileFile) + if err != nil { + return nil, fmt.Errorf("could not start cpu profile: %w", err) + } + defer pprof.StopCPUProfile() + } + + var err error + + var autoScaleAllowDenyConfig *AllowDenyConfig = nil + if config.AutoScale.AllowDeny != "" { + autoScaleAllowDenyConfig, err = ParseAllowDenyConfig(config.AutoScale.AllowDeny) + if err != nil { + return nil, fmt.Errorf("could not parse autoscale allow-deny-list: %w", err) + } + } + + metricsBuilder := NewMetricsBuilder(config.MetricsBackend, &config.MetricsBackendConfig) + + downScalerEnabled := config.AutoScale.Down && (config.InKubeCluster || config.KubeConfig != "") + downScalerDelay, err := time.ParseDuration(config.AutoScale.DownAfter) + if err != nil { + return nil, fmt.Errorf("could not parse auto-scale-down-after duration: %w", err) + } + // Only one instance should be created + DownScaler = NewDownScaler(ctx, downScalerEnabled, downScalerDelay) + + if config.Routes.Config != "" { + err := RoutesConfigLoader.Load(config.Routes.Config) + if err != nil { + return nil, fmt.Errorf("could not load routes config file: %w", err) + } + + if config.Routes.ConfigWatch { + err := RoutesConfigLoader.WatchForChanges(ctx) + if err != nil { + return nil, fmt.Errorf("could not watch for changes to routes config file: %w", err) + } + } + } + + Routes.RegisterAll(config.Mapping) + if config.Default != "" { + Routes.SetDefaultRoute(config.Default) + } + + if config.ConnectionRateLimit < 1 { + config.ConnectionRateLimit = 1 + } + + connector := NewConnector(ctx, + metricsBuilder.BuildConnectorMetrics(), + config.UseProxyProtocol, + config.RecordLogins, + autoScaleAllowDenyConfig) + + clientFilter, err := NewClientFilter(config.ClientsToAllow, config.ClientsToDeny) + if err != nil { + return nil, fmt.Errorf("could not create client filter: %w", err) + } + connector.UseClientFilter(clientFilter) + + if config.Webhook.Url != "" { + logrus.WithField("url", config.Webhook.Url). + WithField("require-user", config.Webhook.RequireUser). + Info("Using webhook for connection status notifications") + connector.UseConnectionNotifier( + NewWebhookNotifier(config.Webhook.Url, config.Webhook.RequireUser)) + } + + if config.NgrokToken != "" { + connector.UseNgrok(config.NgrokToken) + } + + if config.ReceiveProxyProtocol { + trustedIpNets := make([]*net.IPNet, 0) + for _, ip := range config.TrustedProxies { + _, ipNet, err := net.ParseCIDR(ip) + if err != nil { + return nil, fmt.Errorf("could not parse trusted proxy CIDR block: %w", err) + } + trustedIpNets = append(trustedIpNets, ipNet) + } + + connector.UseReceiveProxyProto(trustedIpNets) + } + + if config.ApiBinding != "" { + StartApiServer(config.ApiBinding) + } + + if config.InKubeCluster { + err = K8sWatcher.StartInCluster(ctx, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + return nil, fmt.Errorf("could not start in-cluster k8s integration: %w", err) + } + } else if config.KubeConfig != "" { + err := K8sWatcher.StartWithConfig(ctx, config.KubeConfig, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + return nil, fmt.Errorf("could not start k8s integration with kube config: %w", err) + } + } + + if config.InDocker { + err = DockerWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + return nil, fmt.Errorf("could not start docker integration: %w", err) + } else { + defer DockerWatcher.Stop() + } + } + + if config.InDockerSwarm { + err = DockerSwarmWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + return nil, fmt.Errorf("could not start docker swarm integration: %w", err) + } else { + defer DockerSwarmWatcher.Stop() + } + } + + Routes.SimplifySRV(config.SimplifySRV) + + err = metricsBuilder.Start(ctx) + if err != nil { + return nil, fmt.Errorf("could not start metrics reporter: %w", err) + } + + return &Server{ + ctx: ctx, + config: config, + connector: connector, + reloadConfigChan: make(chan struct{}), + doneChan: make(chan struct{}), + }, nil +} + +// Done provides a channel notified when the server has closed all connections, etc +func (s *Server) Done() <-chan struct{} { + return s.doneChan +} + +func (s *Server) notifyDone() { + s.doneChan <- struct{}{} +} + +// ReloadConfig indicates that an external request, such as a SIGHUP, +// is requesting the routes config file to be reloaded, if enabled +func (s *Server) ReloadConfig() { + s.reloadConfigChan <- struct{}{} +} + +// AcceptConnection provides a way to externally supply a connection to consume +// Note that this will skip rate limiting. +func (s *Server) AcceptConnection(conn net.Conn) { + s.connector.AcceptConnection(conn) +} + +// Run will run the server until the context is done or a fatal error occurs, so this should be +// in a go routine. +func (s *Server) Run() { + err := s.connector.StartAcceptingConnections( + net.JoinHostPort("", strconv.Itoa(s.config.Port)), + s.config.ConnectionRateLimit, + ) + if err != nil { + logrus.WithError(err).Error("Could not start accepting connections") + s.notifyDone() + return + } + + for { + select { + case <-s.reloadConfigChan: + if err := RoutesConfigLoader.Reload(); err != nil { + logrus.WithError(err). + Error("Could not re-read the routes config file") + } + + case <-s.ctx.Done(): + logrus.Info("Stopping. Waiting for connections to complete...") + s.connector.WaitForConnections() + logrus.Info("Stopped") + s.notifyDone() + return + } + } + +} diff --git a/skaffold.yaml b/skaffold.yaml index 68eeed0..39d3e78 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -1,13 +1,15 @@ -apiVersion: skaffold/v4beta13 +# nonk8s +apiVersion: skaffold/v4beta11 kind: Config metadata: - name: mc-router + name: mc-router-dev build: artifacts: - - image: itzg/mc-router + - image: itzg/mc-router-dev # https://skaffold.dev/docs/pipeline-stages/builders/ko/ ko: main: ./cmd/mc-router/ + manifests: rawYaml: - docs/k8s-deployment.yaml