diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 016a8a1..f2aa455 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -10,7 +10,7 @@ jobs: release: uses: itzg/github-workflows/.github/workflows/go-with-releaser-image.yml@main with: - go-version-file: 'go.mod' + go-version: "1.24.4" enable-ghcr: true secrets: image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3606806..ca25f32 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,4 +12,4 @@ jobs: build: uses: itzg/github-workflows/.github/workflows/go-test.yml@main with: - go-version-file: 'go.mod' + go-version: "1.24.4" diff --git a/README.md b/README.md index ef15ff2..c991943 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ The following shows a JSON file for routes config, where `default-server` can al } ``` -Sending a SIGHUP signal will cause mc-router to reload the routes config from disk. The file can also be watched for changes by setting `-routes-config-watch` or the env variable `ROUTES_CONFIG_WATCH` to "true". +Sending a SIGHUP signal will cause mc-router to reload the routes config from disk. ## Auto Scale Allow/Deny List diff --git a/cmd/mc-router/main.go b/cmd/mc-router/main.go index 10c6fb9..3624e62 100644 --- a/cmd/mc-router/main.go +++ b/cmd/mc-router/main.go @@ -3,14 +3,82 @@ package main import ( "context" "fmt" + "net" + "os" + "os/signal" + "runtime/pprof" + "strconv" + "syscall" + "time" + "github.com/itzg/go-flagsfiller" "github.com/itzg/mc-router/server" "github.com/sirupsen/logrus" - "os" - "os/signal" - "syscall" ) +type MetricsBackendConfig struct { + Influxdb struct { + Interval time.Duration `default:"1m"` + Tags map[string]string `usage:"any extra tags to be included with all reported metrics"` + Addr string + Username string + Password string + Database string + RetentionPolicy string + } +} + +type WebhookConfig struct { + Url string `usage:"If set, a POST request that contains connection status notifications will be sent to this HTTP address"` + RequireUser bool `default:"false" usage:"Indicates if the webhook will only be called if a user is connecting rather than just server list/ping"` +} + +type AutoScale struct { + Up bool `usage:"Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed"` + Down bool `default:"false" usage:"Decrease Kubernetes StatefulSet Replicas (only) from 1 to 0 on respective backend servers after there are no connections"` + DownAfter string `default:"10m" usage:"Server scale down delay after there are no connections"` + AllowDeny string `usage:"Path to config for server allowlists and denylists. If a global/server entry is specified, only players allowed to connect to the server will be able to trigger a scale up when -auto-scale-up is enabled or cancel active down scalers when -auto-scale-down is enabled"` +} + +type RoutesConfig struct { + Config string `usage:"Name or full [path] to routes config file"` + ConfigWatch bool `usage:"Watch for config file changes"` +} + +type Config struct { + Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"` + Default string `usage:"host:port of a default Minecraft server to use when mapping not found"` + Mapping map[string]string `usage:"Comma or newline delimited or repeated mappings of externalHostname=host:port"` + ApiBinding string `usage:"The [host:port] bound for servicing API requests"` + Version bool `usage:"Output version and exit"` + CpuProfile string `usage:"Enables CPU profiling and writes to given path"` + Debug bool `usage:"Enable debug logs"` + ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"` + InKubeCluster bool `usage:"Use in-cluster Kubernetes config"` + KubeConfig string `usage:"The path to a Kubernetes configuration file"` + InDocker bool `usage:"Use Docker service discovery"` + InDockerSwarm bool `usage:"Use Docker Swarm service discovery"` + DockerSocket string `default:"unix:///var/run/docker.sock" usage:"Path to Docker socket to use"` + DockerTimeout int `default:"0" usage:"Timeout configuration in seconds for the Docker integrations"` + DockerRefreshInterval int `default:"15" usage:"Refresh interval in seconds for the Docker integrations"` + MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb,prometheus"` + UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"` + ReceiveProxyProtocol bool `default:"false" usage:"Receive PROXY protocol from backend servers, by default trusts every proxy header that it receives, combine with -trusted-proxies to specify a list of trusted proxies"` + TrustedProxies []string `usage:"Comma delimited list of CIDR notation IP blocks to trust when receiving PROXY protocol"` + RecordLogins bool `default:"false" usage:"Log and generate metrics on player logins. Metrics only supported with influxdb or prometheus backend"` + MetricsBackendConfig MetricsBackendConfig + Routes RoutesConfig + NgrokToken string `usage:"If set, an ngrok tunnel will be established. It is HIGHLY recommended to pass as an environment variable."` + AutoScale AutoScale + + ClientsToAllow []string `usage:"Zero or more client IP addresses or CIDRs to allow. Takes precedence over deny."` + ClientsToDeny []string `usage:"Zero or more client IP addresses or CIDRs to deny. Ignored if any configured to allow"` + + SimplifySRV bool `default:"false" usage:"Simplify fully qualified SRV records for mapping"` + + Webhook WebhookConfig `usage:"Webhook configuration"` +} + var ( version = "dev" commit = "none" @@ -21,60 +89,190 @@ func showVersion() { fmt.Printf("%v, commit %v, built at %v", version, commit, date) } -type CliConfig struct { - Version bool `usage:"Output version and exit"` - Debug bool `usage:"Enable debug logs"` - - ServerConfig server.Config `flatten:"true"` -} - func main() { - var cliConfig CliConfig - err := flagsfiller.Parse(&cliConfig, flagsfiller.WithEnv("")) + var config Config + err := flagsfiller.Parse(&config, flagsfiller.WithEnv("")) if err != nil { logrus.Fatal(err) } - if cliConfig.Version { + if config.Version { showVersion() os.Exit(0) } - if cliConfig.Debug { + if config.Debug { logrus.SetLevel(logrus.DebugLevel) logrus.Debug("Debug logs enabled") } + if config.CpuProfile != "" { + cpuProfileFile, err := os.Create(config.CpuProfile) + if err != nil { + logrus.WithError(err).Fatal("trying to create cpu profile file") + } + //goland:noinspection GoUnhandledErrorResult + defer cpuProfileFile.Close() + + logrus.WithField("file", config.CpuProfile).Info("Starting cpu profiling") + err = pprof.StartCPUProfile(cpuProfileFile) + if err != nil { + logrus.WithError(err).Fatal("trying to start cpu profile") + } + defer pprof.StopCPUProfile() + } + + var autoScaleAllowDenyConfig *server.AllowDenyConfig = nil + if config.AutoScale.AllowDeny != "" { + autoScaleAllowDenyConfig, err = server.ParseAllowDenyConfig(config.AutoScale.AllowDeny) + if err != nil { + logrus.WithError(err).Fatal("trying to parse autoscale up allow-deny-list file") + } + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - signals := make(chan os.Signal, 1) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + metricsBuilder := NewMetricsBuilder(config.MetricsBackend, &config.MetricsBackendConfig) - s, err := server.NewServer(ctx, &cliConfig.ServerConfig) + downScalerEnabled := config.AutoScale.Down && (config.InKubeCluster || config.KubeConfig != "") + downScalerDelay, err := time.ParseDuration(config.AutoScale.DownAfter) if err != nil { - logrus.WithError(err).Fatal("Could not setup server") + logrus.WithError(err).Fatal("Unable to parse auto scale down after duration") } + // Only one instance should be created + server.DownScaler = server.NewDownScaler(ctx, downScalerEnabled, downScalerDelay) - go s.Run() + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - for { - select { - case <-s.Done(): - return + if config.Routes.Config != "" { + err := server.RoutesConfig.ReadRoutesConfig(config.Routes.Config) + if err != nil { + logrus.WithError(err).Fatal("Unable to load routes from config file") + } - case sig := <-signals: - switch sig { - case syscall.SIGHUP: - s.ReloadConfig() - - case syscall.SIGINT, syscall.SIGTERM: - cancel() - // but wait for the server to be done - - default: - logrus.WithField("signal", sig).Warn("Received unexpected signal") + if config.Routes.ConfigWatch { + err := server.RoutesConfig.WatchForChanges(ctx) + if err != nil { + logrus.WithError(err).Fatal("Unable to watch for changes") } } } + + server.Routes.RegisterAll(config.Mapping) + if config.Default != "" { + server.Routes.SetDefaultRoute(config.Default) + } + + if config.ConnectionRateLimit < 1 { + config.ConnectionRateLimit = 1 + } + + trustedIpNets := make([]*net.IPNet, 0) + for _, ip := range config.TrustedProxies { + _, ipNet, err := net.ParseCIDR(ip) + if err != nil { + logrus.WithError(err).Fatal("Unable to parse trusted proxy CIDR block") + } + trustedIpNets = append(trustedIpNets, ipNet) + } + + connector := server.NewConnector(metricsBuilder.BuildConnectorMetrics(), config.UseProxyProtocol, config.ReceiveProxyProtocol, trustedIpNets, config.RecordLogins, autoScaleAllowDenyConfig) + + clientFilter, err := server.NewClientFilter(config.ClientsToAllow, config.ClientsToDeny) + if err != nil { + logrus.WithError(err).Fatal("Unable to create client filter") + } + connector.SetClientFilter(clientFilter) + + if config.Webhook.Url != "" { + logrus. + WithField("url", config.Webhook.Url). + WithField("require-user", config.Webhook.RequireUser). + Info("Using webhook for connection status notifications") + connector.SetConnectionNotifier( + server.NewWebhookNotifier(config.Webhook.Url, config.Webhook.RequireUser)) + } + + if config.NgrokToken != "" { + connector.UseNgrok(config.NgrokToken) + } + err = connector.StartAcceptingConnections(ctx, + net.JoinHostPort("", strconv.Itoa(config.Port)), + config.ConnectionRateLimit, + ) + if err != nil { + logrus.Fatal(err) + } + + if config.ApiBinding != "" { + server.StartApiServer(config.ApiBinding) + } + + if config.InKubeCluster { + err = server.K8sWatcher.StartInCluster(config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + logrus.WithError(err).Fatal("Unable to start k8s integration") + } else { + defer server.K8sWatcher.Stop() + } + } else if config.KubeConfig != "" { + err := server.K8sWatcher.StartWithConfig(config.KubeConfig, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + logrus.WithError(err).Fatal("Unable to start k8s integration") + } else { + defer server.K8sWatcher.Stop() + } + } + + if config.InDocker { + err = server.DockerWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + logrus.WithError(err).Fatal("Unable to start docker integration") + } else { + defer server.DockerWatcher.Stop() + } + } + + if config.InDockerSwarm { + err = server.DockerSwarmWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) + if err != nil { + logrus.WithError(err).Fatal("Unable to start docker swarm integration") + } else { + defer server.DockerSwarmWatcher.Stop() + } + } + + server.Routes.SimplifySRV(config.SimplifySRV) + + err = metricsBuilder.Start(ctx) + if err != nil { + logrus.WithError(err).Fatal("Unable to start metrics reporter") + } + + // handle signals + for { + sig := <-c + switch sig { + case syscall.SIGHUP: + if config.Routes.Config != "" { + logrus.Info("Received SIGHUP, reloading routes config...") + if err := server.RoutesConfig.ReloadRoutesConfig(); err != nil { + logrus. + WithError(err). + WithField("routesConfig", config.Routes.Config). + Error("Could not re-read the routes config file") + } + } + case syscall.SIGINT, syscall.SIGTERM: + logrus.WithField("signal", sig).Info("Stopping. Waiting for connections to complete...") + signal.Stop(c) + connector.WaitForConnections() + logrus.Info("Stopped") + return + default: + logrus.WithField("signal", sig).Warn("Received unexpected signal") + } + } } diff --git a/server/metrics.go b/cmd/mc-router/metrics.go similarity index 84% rename from server/metrics.go rename to cmd/mc-router/metrics.go index d704e49..dbb5936 100644 --- a/server/metrics.go +++ b/cmd/mc-router/metrics.go @@ -1,10 +1,9 @@ -package server +package main import ( "context" "errors" "fmt" - "github.com/go-kit/kit/metrics" "strings" "time" @@ -14,13 +13,14 @@ import ( kitinflux "github.com/go-kit/kit/metrics/influx" prometheusMetrics "github.com/go-kit/kit/metrics/prometheus" influx "github.com/influxdata/influxdb1-client/v2" + "github.com/itzg/mc-router/server" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" ) type MetricsBuilder interface { - BuildConnectorMetrics() *ConnectorMetrics + BuildConnectorMetrics() *server.ConnectorMetrics Start(ctx context.Context) error } @@ -31,18 +31,6 @@ const ( MetricsBackendDiscard = "discard" ) -type MetricsBackendConfig struct { - Influxdb struct { - Interval time.Duration `default:"1m"` - Tags map[string]string `usage:"any extra tags to be included with all reported metrics"` - Addr string - Username string - Password string - Database string - RetentionPolicy string - } -} - // NewMetricsBuilder creates a new MetricsBuilder based on the specified backend. // If the backend is not recognized, a discard builder is returned. // config can be nil if the backend is not influxdb. @@ -69,20 +57,9 @@ func (b expvarMetricsBuilder) Start(ctx context.Context) error { return nil } -type ConnectorMetrics struct { - Errors metrics.Counter - BytesTransmitted metrics.Counter - ConnectionsFrontend metrics.Counter - ConnectionsBackend metrics.Counter - ActiveConnections metrics.Gauge - ServerActivePlayer metrics.Gauge - ServerLogins metrics.Counter - ServerActiveConnections metrics.Gauge -} - -func (b expvarMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { +func (b expvarMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { c := expvarMetrics.NewCounter("connections") - return &ConnectorMetrics{ + return &server.ConnectorMetrics{ Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"), BytesTransmitted: expvarMetrics.NewCounter("bytes"), ConnectionsFrontend: c, @@ -102,8 +79,8 @@ func (b discardMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b discardMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { - return &ConnectorMetrics{ +func (b discardMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { + return &server.ConnectorMetrics{ Errors: discardMetrics.NewCounter(), BytesTransmitted: discardMetrics.NewCounter(), ConnectionsFrontend: discardMetrics.NewCounter(), @@ -144,7 +121,7 @@ func (b *influxMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b *influxMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { +func (b *influxMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { influxConfig := &b.config.Influxdb metrics := kitinflux.New(influxConfig.Tags, influx.BatchPointsConfig{ @@ -155,7 +132,7 @@ func (b *influxMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { b.metrics = metrics c := metrics.NewCounter("mc_router_connections") - return &ConnectorMetrics{ + return &server.ConnectorMetrics{ Errors: metrics.NewCounter("mc_router_errors"), BytesTransmitted: metrics.NewCounter("mc_router_transmitted_bytes"), ConnectionsFrontend: c.With("side", "frontend"), @@ -178,13 +155,13 @@ func (b prometheusMetricsBuilder) Start(ctx context.Context) error { return nil } -func (b prometheusMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics { +func (b prometheusMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { pcv = prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "mc_router", Name: "errors", Help: "The total number of errors", }, []string{"type"})) - return &ConnectorMetrics{ + return &server.ConnectorMetrics{ Errors: pcv, BytesTransmitted: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "mc_router", diff --git a/go.mod b/go.mod index 2ee3be4..7c23f1b 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,15 @@ module github.com/itzg/mc-router -go 1.24.4 +go 1.24.0 + +toolchain go1.24.4 require ( github.com/fsnotify/fsnotify v1.9.0 github.com/go-kit/kit v0.13.0 github.com/gorilla/mux v1.8.1 github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c - github.com/itzg/go-flagsfiller v1.16.0 + github.com/itzg/go-flagsfiller v1.15.2 github.com/juju/ratelimit v1.0.2 github.com/pires/go-proxyproto v0.8.1 github.com/pkg/errors v0.9.1 @@ -80,7 +82,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/VividCortex/gohistogram v1.0.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/docker/docker v28.3.1+incompatible + github.com/docker/docker v28.3.0+incompatible github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -98,7 +100,7 @@ require ( github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/spf13/pflag v1.0.6 // indirect - golang.org/x/net v0.41.0 // indirect + golang.org/x/net v0.40.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/term v0.32.0 // indirect diff --git a/go.sum b/go.sum index 8b58941..7e9031c 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,6 @@ github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5Qvfr github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/docker/docker v28.3.0+incompatible h1:ffS62aKWupCWdvcee7nBU9fhnmknOqDPaJAMtfK0ImQ= github.com/docker/docker v28.3.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= -github.com/docker/docker v28.3.1+incompatible h1:20+BmuA9FXlCX4ByQ0vYJcUEnOmRM6XljDnFWR+jCyY= -github.com/docker/docker v28.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -91,8 +89,6 @@ github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSH github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/itzg/go-flagsfiller v1.15.2 h1:DvhhOKuqzawoa6C/3Q/y8pFbdO5PBdgnIvz84ZGkY0g= github.com/itzg/go-flagsfiller v1.15.2/go.mod h1:XmllPPi99O7vXTG9wa/Hzmhnkv6BXBF1W57ifbQTVs4= -github.com/itzg/go-flagsfiller v1.16.0 h1:YNwjLzFIeFzZpctT2RiN8T5qxiGrCX33bGSwtN6OSAA= -github.com/itzg/go-flagsfiller v1.16.0/go.mod h1:XmllPPi99O7vXTG9wa/Hzmhnkv6BXBF1W57ifbQTVs4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= @@ -209,7 +205,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -219,8 +214,6 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= -golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= -golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/server/api_server.go b/server/api_server.go index a0f55b1..afd8ef5 100644 --- a/server/api_server.go +++ b/server/api_server.go @@ -1,7 +1,6 @@ package server import ( - "encoding/json" "expvar" "net/http" @@ -10,12 +9,11 @@ import ( "github.com/sirupsen/logrus" ) +var apiRoutes = mux.NewRouter() + func StartApiServer(apiBinding string) { logrus.WithField("binding", apiBinding).Info("Serving API requests") - var apiRoutes = mux.NewRouter() - registerApiRoutes(apiRoutes) - apiRoutes.Path("/vars").Handler(expvar.Handler()) apiRoutes.Path("/metrics").Handler(promhttp.Handler()) @@ -25,84 +23,3 @@ func StartApiServer(apiBinding string) { http.ListenAndServe(apiBinding, apiRoutes)).Error("API server failed") }() } - -func registerApiRoutes(apiRoutes *mux.Router) { - apiRoutes.Path("/routes").Methods("GET"). - HandlerFunc(routesListHandler) - apiRoutes.Path("/routes").Methods("POST"). - HandlerFunc(routesCreateHandler) - apiRoutes.Path("/defaultRoute").Methods("POST"). - HandlerFunc(routesSetDefault) - apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler) -} - -func routesListHandler(writer http.ResponseWriter, _ *http.Request) { - mappings := Routes.GetMappings() - bytes, err := json.Marshal(mappings) - if err != nil { - logrus.WithError(err).Error("Failed to marshal mappings") - writer.WriteHeader(http.StatusInternalServerError) - return - } - - writer.Header().Set("Content-Type", "application/json") - _, err = writer.Write(bytes) - if err != nil { - logrus.WithError(err).Error("Failed to write response") - } -} - -func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) { - serverAddress := mux.Vars(request)["serverAddress"] - if serverAddress != "" { - if Routes.DeleteMapping(serverAddress) { - writer.WriteHeader(http.StatusOK) - } else { - writer.WriteHeader(http.StatusNotFound) - } - RoutesConfigLoader.SaveRoutes() - } -} - -func routesCreateHandler(writer http.ResponseWriter, request *http.Request) { - var definition = struct { - ServerAddress string - Backend string - }{} - - //goland:noinspection GoUnhandledErrorResult - defer request.Body.Close() - - decoder := json.NewDecoder(request.Body) - err := decoder.Decode(&definition) - if err != nil { - logrus.WithError(err).Error("Unable to get request body") - writer.WriteHeader(http.StatusBadRequest) - return - } - - Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc) - RoutesConfigLoader.SaveRoutes() - writer.WriteHeader(http.StatusCreated) -} - -func routesSetDefault(writer http.ResponseWriter, request *http.Request) { - var body = struct { - Backend string - }{} - - //goland:noinspection GoUnhandledErrorResult - defer request.Body.Close() - - decoder := json.NewDecoder(request.Body) - err := decoder.Decode(&body) - if err != nil { - logrus.WithError(err).Error("Unable to parse request") - writer.WriteHeader(http.StatusBadRequest) - return - } - - Routes.SetDefaultRoute(body.Backend) - RoutesConfigLoader.SaveRoutes() - writer.WriteHeader(http.StatusOK) -} diff --git a/server/client_filter.go b/server/client_filter.go index fdee7c0..33b7bf4 100644 --- a/server/client_filter.go +++ b/server/client_filter.go @@ -89,7 +89,7 @@ func NewClientFilter(allows []string, denies []string) (*ClientFilter, error) { }, nil } -// Allow determines if this filter allows the given address +// Allow determines if the given address is allowed by this filter // where addrStr is a netip.ParseAddr allowed address func (f *ClientFilter) Allow(addrPort netip.AddrPort) bool { if !f.allow.Empty() { diff --git a/server/configs.go b/server/configs.go deleted file mode 100644 index d34c467..0000000 --- a/server/configs.go +++ /dev/null @@ -1,50 +0,0 @@ -package server - -type WebhookConfig struct { - Url string `usage:"If set, a POST request that contains connection status notifications will be sent to this HTTP address"` - RequireUser bool `default:"false" usage:"Indicates if the webhook will only be called if a user is connecting rather than just server list/ping"` -} - -type AutoScale struct { - Up bool `usage:"Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed"` - Down bool `default:"false" usage:"Decrease Kubernetes StatefulSet Replicas (only) from 1 to 0 on respective backend servers after there are no connections"` - DownAfter string `default:"10m" usage:"Server scale down delay after there are no connections"` - AllowDeny string `usage:"Path to config for server allowlists and denylists. If a global/server entry is specified, only players allowed to connect to the server will be able to trigger a scale up when -auto-scale-up is enabled or cancel active down scalers when -auto-scale-down is enabled"` -} - -type RoutesConfig struct { - Config string `usage:"Name or full [path] to routes config file"` - ConfigWatch bool `usage:"Watch for config file changes"` -} - -type Config struct { - Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"` - Default string `usage:"host:port of a default Minecraft server to use when mapping not found"` - Mapping map[string]string `usage:"Comma or newline delimited or repeated mappings of externalHostname=host:port"` - ApiBinding string `usage:"The [host:port] bound for servicing API requests"` - CpuProfile string `usage:"Enables CPU profiling and writes to given path"` - ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"` - InKubeCluster bool `usage:"Use in-cluster Kubernetes config"` - KubeConfig string `usage:"The path to a Kubernetes configuration file"` - InDocker bool `usage:"Use Docker service discovery"` - InDockerSwarm bool `usage:"Use Docker Swarm service discovery"` - DockerSocket string `default:"unix:///var/run/docker.sock" usage:"Path to Docker socket to use"` - DockerTimeout int `default:"0" usage:"Timeout configuration in seconds for the Docker integrations"` - DockerRefreshInterval int `default:"15" usage:"Refresh interval in seconds for the Docker integrations"` - MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb,prometheus"` - MetricsBackendConfig MetricsBackendConfig - UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"` - ReceiveProxyProtocol bool `default:"false" usage:"Receive PROXY protocol from backend servers, by default trusts every proxy header that it receives, combine with -trusted-proxies to specify a list of trusted proxies"` - TrustedProxies []string `usage:"Comma delimited list of CIDR notation IP blocks to trust when receiving PROXY protocol"` - RecordLogins bool `default:"false" usage:"Log and generate metrics on player logins. Metrics only supported with influxdb or prometheus backend"` - Routes RoutesConfig - NgrokToken string `usage:"If set, an ngrok tunnel will be established. It is HIGHLY recommended to pass as an environment variable."` - AutoScale AutoScale - - ClientsToAllow []string `usage:"Zero or more client IP addresses or CIDRs to allow. Takes precedence over deny."` - ClientsToDeny []string `usage:"Zero or more client IP addresses or CIDRs to deny. Ignored if any configured to allow"` - - SimplifySRV bool `default:"false" usage:"Simplify fully qualified SRV records for mapping"` - - Webhook WebhookConfig `usage:"Webhook configuration"` -} diff --git a/server/connector.go b/server/connector.go index 3f8e4bc..6af8d9f 100644 --- a/server/connector.go +++ b/server/connector.go @@ -12,9 +12,12 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" + "golang.ngrok.com/ngrok" "golang.ngrok.com/ngrok/config" + "github.com/go-kit/kit/metrics" "github.com/itzg/mc-router/mcproto" "github.com/juju/ratelimit" "github.com/pires/go-proxyproto" @@ -27,18 +30,58 @@ const ( var noDeadline time.Time -type ActiveConnections struct { +type ConnectorMetrics struct { + Errors metrics.Counter + BytesTransmitted metrics.Counter + ConnectionsFrontend metrics.Counter + ConnectionsBackend metrics.Counter + ActiveConnections metrics.Gauge + ServerActivePlayer metrics.Gauge + ServerLogins metrics.Counter + ServerActiveConnections metrics.Gauge +} + +type ClientInfo struct { + Host string `json:"host"` + Port int `json:"port"` +} + +func ClientInfoFromAddr(addr net.Addr) *ClientInfo { + if addr == nil { + return nil + } + + return &ClientInfo{ + Host: addr.(*net.TCPAddr).IP.String(), + Port: addr.(*net.TCPAddr).Port, + } +} + +type PlayerInfo struct { + Name string `json:"name"` + Uuid uuid.UUID `json:"uuid"` +} + +func (p *PlayerInfo) String() string { + if p == nil { + return "" + } + + return fmt.Sprintf("%s/%s", p.Name, p.Uuid) +} + +type ServerMetrics struct { sync.RWMutex activeConnections map[string]int } -func NewActiveConnections() *ActiveConnections { - return &ActiveConnections{ +func NewServerMetrics() *ServerMetrics { + return &ServerMetrics{ activeConnections: make(map[string]int), } } -func (sm *ActiveConnections) Increment(serverAddress string) { +func (sm *ServerMetrics) IncrementActiveConnections(serverAddress string) { sm.Lock() defer sm.Unlock() if _, ok := sm.activeConnections[serverAddress]; !ok { @@ -48,7 +91,7 @@ func (sm *ActiveConnections) Increment(serverAddress string) { sm.activeConnections[serverAddress] += 1 } -func (sm *ActiveConnections) Decrement(serverAddress string) { +func (sm *ServerMetrics) DecrementActiveConnections(serverAddress string) { sm.Lock() defer sm.Unlock() if activeConnections, ok := sm.activeConnections[serverAddress]; ok && activeConnections <= 0 { @@ -58,7 +101,7 @@ func (sm *ActiveConnections) Decrement(serverAddress string) { sm.activeConnections[serverAddress] -= 1 } -func (sm *ActiveConnections) GetCount(serverAddress string) int { +func (sm *ServerMetrics) ActiveConnectionsValue(serverAddress string) int { sm.Lock() defer sm.Unlock() if activeConnections, ok := sm.activeConnections[serverAddress]; ok { @@ -67,58 +110,60 @@ func (sm *ActiveConnections) GetCount(serverAddress string) int { return 0 } -func NewConnector(ctx context.Context, metrics *ConnectorMetrics, sendProxyProto bool, recordLogins bool, autoScaleUpAllowDenyConfig *AllowDenyConfig) *Connector { +func NewConnector(metrics *ConnectorMetrics, sendProxyProto bool, receiveProxyProto bool, trustedProxyNets []*net.IPNet, recordLogins bool, autoScaleUpAllowDenyConfig *AllowDenyConfig) *Connector { return &Connector{ - ctx: ctx, metrics: metrics, sendProxyProto: sendProxyProto, connectionsCond: sync.NewCond(&sync.Mutex{}), + receiveProxyProto: receiveProxyProto, + trustedProxyNets: trustedProxyNets, recordLogins: recordLogins, autoScaleUpAllowDenyConfig: autoScaleUpAllowDenyConfig, - activeConnections: NewActiveConnections(), + serverMetrics: NewServerMetrics(), } } type Connector struct { - ctx context.Context - state mcproto.State - metrics *ConnectorMetrics - sendProxyProto bool - receiveProxyProto bool - recordLogins bool - trustedProxyNets []*net.IPNet - totalActiveConnections int32 - activeConnections *ActiveConnections + state mcproto.State + metrics *ConnectorMetrics + sendProxyProto bool + receiveProxyProto bool + recordLogins bool + trustedProxyNets []*net.IPNet + + activeConnections int32 + serverMetrics *ServerMetrics connectionsCond *sync.Cond ngrokToken string clientFilter *ClientFilter autoScaleUpAllowDenyConfig *AllowDenyConfig - connectionNotifier ConnectionNotifier + + connectionNotifier ConnectionNotifier } -func (c *Connector) UseConnectionNotifier(notifier ConnectionNotifier) { +func (c *Connector) SetConnectionNotifier(notifier ConnectionNotifier) { c.connectionNotifier = notifier } -func (c *Connector) UseClientFilter(filter *ClientFilter) { +func (c *Connector) SetClientFilter(filter *ClientFilter) { c.clientFilter = filter } -func (c *Connector) StartAcceptingConnections(listenAddress string, connRateLimit int) error { - ln, err := c.createListener(listenAddress) +func (c *Connector) StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error { + ln, err := c.createListener(ctx, listenAddress) if err != nil { return err } - go c.acceptConnections(ln, connRateLimit) + go c.acceptConnections(ctx, ln, connRateLimit) return nil } -func (c *Connector) createListener(listenAddress string) (net.Listener, error) { +func (c *Connector) createListener(ctx context.Context, listenAddress string) (net.Listener, error) { if c.ngrokToken != "" { - ngrokTun, err := ngrok.Listen(c.ctx, + ngrokTun, err := ngrok.Listen(ctx, config.TCPEndpoint(), ngrok.WithAuthtoken(c.ngrokToken), ) @@ -139,8 +184,8 @@ func (c *Connector) createListener(listenAddress string) (net.Listener, error) { if c.receiveProxyProto { proxyListener := &proxyproto.Listener{ - Listener: listener, - ConnPolicy: c.createProxyProtoPolicy(), + Listener: listener, + Policy: c.createProxyProtoPolicy(), } logrus.Info("Using PROXY protocol listener") return proxyListener, nil @@ -149,8 +194,8 @@ func (c *Connector) createListener(listenAddress string) (net.Listener, error) { return listener, nil } -func (c *Connector) createProxyProtoPolicy() proxyproto.ConnPolicyFunc { - return func(connPolicyOptions proxyproto.ConnPolicyOptions) (proxyproto.Policy, error) { +func (c *Connector) createProxyProtoPolicy() func(upstream net.Addr) (proxyproto.Policy, error) { + return func(upstream net.Addr) (proxyproto.Policy, error) { trustedIpNets := c.trustedProxyNets if len(trustedIpNets) == 0 { @@ -158,7 +203,6 @@ func (c *Connector) createProxyProtoPolicy() proxyproto.ConnPolicyFunc { return proxyproto.USE, nil } - upstream := connPolicyOptions.Upstream upstreamIP := upstream.(*net.TCPAddr).IP for _, ipNet := range trustedIpNets { if ipNet.Contains(upstreamIP) { @@ -177,23 +221,17 @@ func (c *Connector) WaitForConnections() { defer c.connectionsCond.L.Unlock() for { - count := atomic.LoadInt32(&c.totalActiveConnections) + count := atomic.LoadInt32(&c.activeConnections) if count > 0 { logrus.Infof("Waiting on %d connection(s)", count) c.connectionsCond.Wait() } else { - return + break } } } -// AcceptConnection provides a way to externally supply a connection to consume. -// Note that this will skip rate limiting. -func (c *Connector) AcceptConnection(conn net.Conn) { - go c.HandleConnection(conn) -} - -func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) { +func (c *Connector) acceptConnections(ctx context.Context, ln net.Listener, connRateLimit int) { //noinspection GoUnhandledErrorResult defer ln.Close() @@ -201,7 +239,7 @@ func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) { for { select { - case <-c.ctx.Done(): + case <-ctx.Done(): return case <-time.After(bucket.Take(1)): @@ -209,13 +247,13 @@ func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) { if err != nil { logrus.WithError(err).Error("Failed to accept connection") } else { - go c.HandleConnection(conn) + go c.HandleConnection(ctx, conn) } } } } -func (c *Connector) HandleConnection(frontendConn net.Conn) { +func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn) { c.metrics.ConnectionsFrontend.Add(1) //noinspection GoUnhandledErrorResult defer frontendConn.Close() @@ -305,7 +343,7 @@ func (c *Connector) HandleConnection(frontendConn net.Conn) { Debug("Got user info") } - c.findAndConnectBackend(frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, playerInfo, handshake.NextState) + c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, playerInfo, handshake.NextState) } else if packet.PacketID == mcproto.PacketIdLegacyServerListPing { handshake, ok := packet.Data.(*mcproto.LegacyServerListPing) @@ -325,7 +363,7 @@ func (c *Connector) HandleConnection(frontendConn net.Conn) { serverAddress := handshake.ServerAddress - c.findAndConnectBackend(frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus) + c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus) } else { logrus. WithField("client", clientAddr). @@ -356,9 +394,9 @@ func (c *Connector) readPlayerInfo(protocolVersion mcproto.ProtocolVersion, buff } } -func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) { +func (c *Connector) cleanupBackendConnection(ctx context.Context, clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) { if c.connectionNotifier != nil { - err := c.connectionNotifier.NotifyDisconnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort) + err := c.connectionNotifier.NotifyDisconnected(ctx, clientAddr, serverAddress, playerInfo, backendHostPort) if err != nil { logrus.WithError(err).Warn("failed to notify disconnected") } @@ -366,12 +404,12 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress if cleanupMetrics { c.metrics.ActiveConnections.Set(float64( - atomic.AddInt32(&c.totalActiveConnections, -1))) + atomic.AddInt32(&c.activeConnections, -1))) - c.activeConnections.Decrement(serverAddress) + c.serverMetrics.DecrementActiveConnections(serverAddress) c.metrics.ServerActiveConnections. With("server_address", serverAddress). - Set(float64(c.activeConnections.GetCount(serverAddress))) + Set(float64(c.serverMetrics.ActiveConnectionsValue(serverAddress))) if c.recordLogins && playerInfo != nil { c.metrics.ServerActivePlayer. @@ -381,21 +419,21 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress Set(0) } } - if checkScaleDown && c.activeConnections.GetCount(serverAddress) <= 0 { + if checkScaleDown && c.serverMetrics.ActiveConnectionsValue(serverAddress) <= 0 { DownScaler.Begin(serverAddress) } c.connectionsCond.Signal() } -func (c *Connector) findAndConnectBackend(frontendConn net.Conn, +func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net.Conn, clientAddr net.Addr, preReadContent io.Reader, serverAddress string, playerInfo *PlayerInfo, nextState mcproto.State) { - backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(c.ctx, serverAddress) + backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(ctx, serverAddress) cleanupMetrics := false cleanupCheckScaleDown := false defer func() { - c.cleanupBackendConnection(clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown) + c.cleanupBackendConnection(ctx, clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown) }() if waker != nil && nextState > mcproto.StateStatus { @@ -410,7 +448,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, // Cancel down scaler if active before scale up DownScaler.Cancel(serverAddress) cleanupCheckScaleDown = true - if err := waker(c.ctx); err != nil { + if err := waker(ctx); err != nil { logrus.WithFields(logrus.Fields{"serverAddress": serverAddress}).WithError(err).Error("failed to wake up backend") c.metrics.Errors.With("type", "wakeup_failed").Add(1) return @@ -427,7 +465,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, c.metrics.Errors.With("type", "missing_backend").Add(1) if c.connectionNotifier != nil { - err := c.connectionNotifier.NotifyMissingBackend(c.ctx, clientAddr, serverAddress, playerInfo) + err := c.connectionNotifier.NotifyMissingBackend(ctx, clientAddr, serverAddress, playerInfo) if err != nil { logrus.WithError(err).Warn("failed to notify missing backend") } @@ -455,7 +493,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, c.metrics.Errors.With("type", "backend_failed").Add(1) if c.connectionNotifier != nil { - notifyErr := c.connectionNotifier.NotifyFailedBackendConnection(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort, err) + notifyErr := c.connectionNotifier.NotifyFailedBackendConnection(ctx, clientAddr, serverAddress, playerInfo, backendHostPort, err) if notifyErr != nil { logrus.WithError(notifyErr).Warn("failed to notify failed backend connection") } @@ -465,7 +503,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, } if c.connectionNotifier != nil { - err := c.connectionNotifier.NotifyConnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort) + err := c.connectionNotifier.NotifyConnected(ctx, clientAddr, serverAddress, playerInfo, backendHostPort) if err != nil { logrus.WithError(err).Warn("failed to notify connected") } @@ -474,12 +512,12 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, c.metrics.ConnectionsBackend.With("host", resolvedHost).Add(1) c.metrics.ActiveConnections.Set(float64( - atomic.AddInt32(&c.totalActiveConnections, 1))) + atomic.AddInt32(&c.activeConnections, 1))) - c.activeConnections.Increment(serverAddress) + c.serverMetrics.IncrementActiveConnections(serverAddress) c.metrics.ServerActiveConnections. With("server_address", serverAddress). - Set(float64(c.activeConnections.GetCount(serverAddress))) + Set(float64(c.serverMetrics.ActiveConnectionsValue(serverAddress))) if c.recordLogins && playerInfo != nil { logrus. @@ -560,23 +598,23 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn, return } - c.pumpConnections(frontendConn, backendConn, playerInfo) + c.pumpConnections(ctx, frontendConn, backendConn, playerInfo) } -func (c *Connector) pumpConnections(frontendConn, backendConn net.Conn, playerInfo *PlayerInfo) { +func (c *Connector) pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn, playerInfo *PlayerInfo) { //noinspection GoUnhandledErrorResult defer backendConn.Close() clientAddr := frontendConn.RemoteAddr() defer logrus.WithField("client", clientAddr).Debug("Closing backend connection") - errorsChan := make(chan error, 2) + errors := make(chan error, 2) - go c.pumpFrames(backendConn, frontendConn, errorsChan, "backend", "frontend", clientAddr, playerInfo) - go c.pumpFrames(frontendConn, backendConn, errorsChan, "frontend", "backend", clientAddr, playerInfo) + go c.pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr, playerInfo) + go c.pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr, playerInfo) select { - case err := <-errorsChan: + case err := <-errors: if err != io.EOF { logrus.WithError(err). WithField("client", clientAddr). @@ -584,8 +622,8 @@ func (c *Connector) pumpConnections(frontendConn, backendConn net.Conn, playerIn c.metrics.Errors.With("type", "relay").Add(1) } - case <-c.ctx.Done(): - logrus.Debug("Connector observed context cancellation") + case <-ctx.Done(): + logrus.Debug("Observed context cancellation") } } @@ -611,8 +649,3 @@ func (c *Connector) pumpFrames(incoming io.Reader, outgoing io.Writer, errors ch func (c *Connector) UseNgrok(token string) { c.ngrokToken = token } - -func (c *Connector) UseReceiveProxyProto(trustedProxyNets []*net.IPNet) { - c.trustedProxyNets = trustedProxyNets - c.receiveProxyProto = true -} diff --git a/server/connector_test.go b/server/connector_test.go index ab26a53..357aa44 100644 --- a/server/connector_test.go +++ b/server/connector_test.go @@ -61,9 +61,7 @@ func TestTrustedProxyNetworkPolicy(t *testing.T) { policy := c.createProxyProtoPolicy() upstreamAddr := &net.TCPAddr{IP: net.ParseIP(test.upstreamIP)} - policyResult, _ := policy(proxyproto.ConnPolicyOptions{ - Upstream: upstreamAddr, - }) + policyResult, _ := policy(upstreamAddr) assert.Equal(t, test.expectedPolicy, policyResult, "Unexpected policy result for %s", test.name) }) } diff --git a/server/notifier.go b/server/notifier.go index 7d322a3..ad17c51 100644 --- a/server/notifier.go +++ b/server/notifier.go @@ -2,40 +2,9 @@ package server import ( "context" - "fmt" - "github.com/google/uuid" "net" ) -type PlayerInfo struct { - Name string `json:"name"` - Uuid uuid.UUID `json:"uuid"` -} - -func (p *PlayerInfo) String() string { - if p == nil { - return "" - } - - return fmt.Sprintf("%s/%s", p.Name, p.Uuid) -} - -type ClientInfo struct { - Host string `json:"host"` - Port int `json:"port"` -} - -func ClientInfoFromAddr(addr net.Addr) *ClientInfo { - if addr == nil { - return nil - } - - return &ClientInfo{ - Host: addr.(*net.TCPAddr).IP.String(), - Port: addr.(*net.TCPAddr).Port, - } -} - type ConnectionNotifier interface { // NotifyMissingBackend is called when an inbound connection is received for a server that does not have a backend. NotifyMissingBackend(ctx context.Context, clientAddr net.Addr, server string, playerInfo *PlayerInfo) error diff --git a/server/routes.go b/server/routes.go index 7c2f6d7..f69425b 100644 --- a/server/routes.go +++ b/server/routes.go @@ -2,10 +2,13 @@ package server import ( "context" + "encoding/json" + "net/http" "regexp" "strings" "sync" + "github.com/gorilla/mux" "github.com/sirupsen/logrus" ) @@ -15,6 +18,88 @@ var EmptyScalerFunc = func(ctx context.Context) error { return nil } var tcpShieldPattern = regexp.MustCompile("///.*") +func init() { + apiRoutes.Path("/routes").Methods("GET"). + Headers("Accept", "application/json"). + HandlerFunc(routesListHandler) + apiRoutes.Path("/routes").Methods("POST"). + Headers("Content-Type", "application/json"). + HandlerFunc(routesCreateHandler) + apiRoutes.Path("/defaultRoute").Methods("POST"). + Headers("Content-Type", "application/json"). + HandlerFunc(routesSetDefault) + apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler) +} + +func routesListHandler(writer http.ResponseWriter, _ *http.Request) { + mappings := Routes.GetMappings() + bytes, err := json.Marshal(mappings) + if err != nil { + logrus.WithError(err).Error("Failed to marshal mappings") + writer.WriteHeader(http.StatusInternalServerError) + return + } + _, err = writer.Write(bytes) + if err != nil { + logrus.WithError(err).Error("Failed to write response") + } +} + +func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) { + serverAddress := mux.Vars(request)["serverAddress"] + RoutesConfig.DeleteMapping(serverAddress) + if serverAddress != "" { + if Routes.DeleteMapping(serverAddress) { + writer.WriteHeader(http.StatusOK) + } else { + writer.WriteHeader(http.StatusNotFound) + } + } +} + +func routesCreateHandler(writer http.ResponseWriter, request *http.Request) { + var definition = struct { + ServerAddress string + Backend string + }{} + + //goland:noinspection GoUnhandledErrorResult + defer request.Body.Close() + + decoder := json.NewDecoder(request.Body) + err := decoder.Decode(&definition) + if err != nil { + logrus.WithError(err).Error("Unable to get request body") + writer.WriteHeader(http.StatusBadRequest) + return + } + + Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc) + RoutesConfig.AddMapping(definition.ServerAddress, definition.Backend) + writer.WriteHeader(http.StatusCreated) +} + +func routesSetDefault(writer http.ResponseWriter, request *http.Request) { + var body = struct { + Backend string + }{} + + //goland:noinspection GoUnhandledErrorResult + defer request.Body.Close() + + decoder := json.NewDecoder(request.Body) + err := decoder.Decode(&body) + if err != nil { + logrus.WithError(err).Error("Unable to parse request") + writer.WriteHeader(http.StatusBadRequest) + return + } + + Routes.SetDefaultRoute(body.Backend) + RoutesConfig.SetDefaultRoute(body.Backend) + writer.WriteHeader(http.StatusOK) +} + type IRoutes interface { Reset() RegisterAll(mappings map[string]string) @@ -27,7 +112,6 @@ type IRoutes interface { DeleteMapping(serverAddress string) bool CreateMapping(serverAddress string, backend string, waker ScalerFunc, sleeper ScalerFunc) SetDefaultRoute(backend string) - GetDefaultRoute() string SimplifySRV(srvEnabled bool) } @@ -73,10 +157,6 @@ func (r *routesImpl) SetDefaultRoute(backend string) { }).Info("Using default route") } -func (r *routesImpl) GetDefaultRoute() string { - return r.defaultRoute -} - func (r *routesImpl) SimplifySRV(srvEnabled bool) { r.simplifySRV = srvEnabled } diff --git a/server/routes_config.go b/server/routes_config.go new file mode 100644 index 0000000..ac074a5 --- /dev/null +++ b/server/routes_config.go @@ -0,0 +1,260 @@ +package server + +import ( + "context" + "encoding/json" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "io/fs" + "os" + "sync" +) + +type IRoutesConfig interface { + ReadRoutesConfig(routesConfig string) + ReloadRoutesConfig() + AddMapping(serverAddress string, backend string) + DeleteMapping(serverAddress string) + SetDefaultRoute(backend string) + WatchForChanges(ctx context.Context) error +} + +const debounceConfigRereadDuration = time.Second * 5 + +var RoutesConfig = &routesConfigImpl{} + +type routesConfigImpl struct { + sync.RWMutex + fileName string +} + +type routesConfigStructure struct { + DefaultServer string `json:"default-server"` + Mappings map[string]string `json:"mappings"` +} + +func (r *routesConfigImpl) ReadRoutesConfig(routesConfig string) error { + r.fileName = routesConfig + + logrus.WithField("routesConfig", r.fileName).Info("Loading routes config file") + + config, readErr := r.readRoutesConfigFile() + + if readErr != nil { + if errors.Is(readErr, fs.ErrNotExist) { + logrus.WithField("routesConfig", r.fileName).Info("Routes config file doses not exist, skipping reading it") + // File doesn't exist -> ignore it + return nil + } + return errors.Wrap(readErr, "Could not load the routes config file") + } + + Routes.RegisterAll(config.Mappings) + Routes.SetDefaultRoute(config.DefaultServer) + return nil +} + +func (r *routesConfigImpl) ReloadRoutesConfig() error { + config, readErr := r.readRoutesConfigFile() + + if readErr != nil { + return readErr + } + + logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file") + Routes.Reset() + Routes.RegisterAll(config.Mappings) + Routes.SetDefaultRoute(config.DefaultServer) + + return nil +} + +func (r *routesConfigImpl) WatchForChanges(ctx context.Context) error { + if r.fileName == "" { + return errors.New("routes config file needs to be specified first") + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return errors.Wrap(err, "Could not create a watcher") + } + + err = watcher.Add(r.fileName) + if err != nil { + return errors.Wrap(err, "Could not watch the routes config file") + } + + go func() { + logrus.WithField("file", r.fileName).Info("Watching routes config file") + + debounceTimerChan := make(<-chan time.Time) + var debounceTimer *time.Timer + + //goland:noinspection GoUnhandledErrorResult + defer watcher.Close() + for { + select { + + case event, ok := <-watcher.Events: + if !ok { + logrus.Debug("Watcher events channel closed") + return + } + logrus. + WithField("file", event.Name). + WithField("op", event.Op). + Trace("fs event received") + if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) { + if debounceTimer == nil { + debounceTimer = time.NewTimer(debounceConfigRereadDuration) + } else { + debounceTimer.Reset(debounceConfigRereadDuration) + } + debounceTimerChan = debounceTimer.C + logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay") + } + + case <-debounceTimerChan: + readErr := r.ReadRoutesConfig(r.fileName) + if readErr != nil { + logrus. + WithError(readErr). + WithField("routesConfig", r.fileName). + Error("Could not re-read the routes config file") + } + + case <-ctx.Done(): + return + } + } + }() + + return nil +} + +func (r *routesConfigImpl) AddMapping(serverAddress string, backend string) { + if !r.isRoutesConfigEnabled() { + return + } + + config, readErr := r.readRoutesConfigFile() + if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) { + logrus.WithError(readErr).Error("Could not read the routes config file") + return + } + if config.Mappings == nil { + config.Mappings = make(map[string]string) + } + + config.Mappings[serverAddress] = backend + + writeErr := r.writeRoutesConfigFile(config) + if writeErr != nil { + logrus.WithError(writeErr).Error("Could not write to the routes config file") + return + } + + logrus.WithFields(logrus.Fields{ + "serverAddress": serverAddress, + "backend": backend, + }).Info("Added route to routes config") + + return +} + +func (r *routesConfigImpl) SetDefaultRoute(backend string) { + if !r.isRoutesConfigEnabled() { + return + } + + config, readErr := r.readRoutesConfigFile() + if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) { + logrus.WithError(readErr).Error("Could not read the routes config file") + return + } + + config.DefaultServer = backend + + writeErr := r.writeRoutesConfigFile(config) + if writeErr != nil { + logrus.WithError(writeErr).Error("Could not write to the routes config file") + return + } + + logrus.WithFields(logrus.Fields{ + "backend": backend, + }).Info("Set default route in routes config") + + return +} + +func (r *routesConfigImpl) DeleteMapping(serverAddress string) { + if !r.isRoutesConfigEnabled() { + return + } + + config, readErr := r.readRoutesConfigFile() + if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) { + logrus.WithError(readErr).Error("Could not read the routes config file") + return + } + + delete(config.Mappings, serverAddress) + + writeErr := r.writeRoutesConfigFile(config) + if writeErr != nil { + logrus.WithError(writeErr).Error("Could not write to the routes config file") + return + } + + logrus.WithField("serverAddress", serverAddress).Info("Deleted route in routes config") + + return +} + +func (r *routesConfigImpl) isRoutesConfigEnabled() bool { + return r.fileName != "" +} + +func (r *routesConfigImpl) readRoutesConfigFile() (routesConfigStructure, error) { + r.RLock() + defer r.RUnlock() + + config := routesConfigStructure{ + "", + make(map[string]string), + } + + file, fileErr := os.ReadFile(r.fileName) + if fileErr != nil { + return config, errors.Wrap(fileErr, "Could not load the routes config file") + } + + parseErr := json.Unmarshal(file, &config) + if parseErr != nil { + return config, errors.Wrap(parseErr, "Could not parse the json routes config file") + } + + return config, nil +} + +func (r *routesConfigImpl) writeRoutesConfigFile(config routesConfigStructure) error { + r.Lock() + defer r.Unlock() + + newFileContent, parseErr := json.Marshal(config) + if parseErr != nil { + return errors.Wrap(parseErr, "Could not parse the routes to json") + } + + fileErr := os.WriteFile(r.fileName, newFileContent, 0664) + if fileErr != nil { + return errors.Wrap(fileErr, "Could not write to the routes config file") + } + + return nil +} diff --git a/server/routes_config_loader.go b/server/routes_config_loader.go deleted file mode 100644 index 43a9aeb..0000000 --- a/server/routes_config_loader.go +++ /dev/null @@ -1,181 +0,0 @@ -package server - -import ( - "context" - "encoding/json" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - - "io/fs" - "os" -) - -const debounceConfigRereadDuration = time.Second * 5 - -var RoutesConfigLoader = &routesConfigLoader{} - -type routesConfigLoader struct { - fileName string -} - -// RoutesConfigSchema declares the schema of the json file that can provide routes to serve -type RoutesConfigSchema struct { - DefaultServer string `json:"default-server"` - Mappings map[string]string `json:"mappings"` -} - -func (r *routesConfigLoader) Load(routesConfigFileName string) error { - r.fileName = routesConfigFileName - - logrus.WithField("routesConfigFileName", r.fileName).Info("Loading routes config file") - - config, readErr := r.readFile() - - if readErr != nil { - if errors.Is(readErr, fs.ErrNotExist) { - logrus.WithField("routesConfigFileName", r.fileName).Info("Routes config file doses not exist, skipping reading it") - // File doesn't exist -> ignore it - return nil - } - return errors.Wrap(readErr, "Could not load the routes config file") - } - - Routes.RegisterAll(config.Mappings) - Routes.SetDefaultRoute(config.DefaultServer) - return nil -} - -func (r *routesConfigLoader) Reload() error { - if !r.isEnabled() { - return nil - } - - config, readErr := r.readFile() - - if readErr != nil { - return readErr - } - - logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file") - Routes.Reset() - Routes.RegisterAll(config.Mappings) - Routes.SetDefaultRoute(config.DefaultServer) - - return nil -} - -func (r *routesConfigLoader) WatchForChanges(ctx context.Context) error { - if r.fileName == "" { - return errors.New("routes config file needs to be specified first") - } - - watcher, err := fsnotify.NewWatcher() - if err != nil { - return errors.Wrap(err, "Could not create a watcher") - } - - err = watcher.Add(r.fileName) - if err != nil { - return errors.Wrap(err, "Could not watch the routes config file") - } - - go func() { - logrus.WithField("file", r.fileName).Info("Watching routes config file") - - debounceTimerChan := make(<-chan time.Time) - var debounceTimer *time.Timer - - //goland:noinspection GoUnhandledErrorResult - defer watcher.Close() - for { - select { - - case event, ok := <-watcher.Events: - if !ok { - logrus.Debug("Watcher events channel closed") - return - } - logrus. - WithField("file", event.Name). - WithField("op", event.Op). - Trace("fs event received") - if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) { - if debounceTimer == nil { - debounceTimer = time.NewTimer(debounceConfigRereadDuration) - } else { - debounceTimer.Reset(debounceConfigRereadDuration) - } - debounceTimerChan = debounceTimer.C - logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay") - } - - case <-debounceTimerChan: - readErr := r.Load(r.fileName) - if readErr != nil { - logrus. - WithError(readErr). - WithField("routesConfig", r.fileName). - Error("Could not re-read the routes config file") - } - - case <-ctx.Done(): - return - } - } - }() - - return nil -} - -func (r *routesConfigLoader) SaveRoutes() { - if !r.isEnabled() { - return - } - - err := r.writeFile(&RoutesConfigSchema{ - DefaultServer: Routes.GetDefaultRoute(), - Mappings: Routes.GetMappings(), - }) - if err != nil { - logrus.WithError(err).Error("Could not save the routes config file") - return - } - logrus.Info("Saved routes config") -} - -func (r *routesConfigLoader) isEnabled() bool { - return r.fileName != "" -} - -func (r *routesConfigLoader) readFile() (*RoutesConfigSchema, error) { - var config RoutesConfigSchema - - content, err := os.ReadFile(r.fileName) - if err != nil { - return &config, errors.Wrap(err, "Could not load the routes config file") - } - - parseErr := json.Unmarshal(content, &config) - if parseErr != nil { - return &config, errors.Wrap(parseErr, "Could not parse the json routes config file") - } - - return &config, nil -} - -func (r *routesConfigLoader) writeFile(config *RoutesConfigSchema) error { - newFileContent, err := json.Marshal(config) - if err != nil { - return errors.Wrap(err, "Could not parse the routes to json") - } - - err = os.WriteFile(r.fileName, newFileContent, 0664) - if err != nil { - return errors.Wrap(err, "Could not write to the routes config file") - } - - return nil -} diff --git a/server/server.go b/server/server.go deleted file mode 100644 index 1ca7652..0000000 --- a/server/server.go +++ /dev/null @@ -1,224 +0,0 @@ -package server - -import ( - "context" - "fmt" - "github.com/sirupsen/logrus" - "net" - "os" - "runtime/pprof" - "strconv" - "time" -) - -type Server struct { - ctx context.Context - config *Config - connector *Connector - reloadConfigChan chan struct{} - doneChan chan struct{} -} - -func NewServer(ctx context.Context, config *Config) (*Server, error) { - if config.CpuProfile != "" { - cpuProfileFile, err := os.Create(config.CpuProfile) - if err != nil { - return nil, fmt.Errorf("could not create cpu profile file: %w", err) - } - //goland:noinspection GoUnhandledErrorResult - defer cpuProfileFile.Close() - - logrus.WithField("file", config.CpuProfile).Info("Starting cpu profiling") - err = pprof.StartCPUProfile(cpuProfileFile) - if err != nil { - return nil, fmt.Errorf("could not start cpu profile: %w", err) - } - defer pprof.StopCPUProfile() - } - - var err error - - var autoScaleAllowDenyConfig *AllowDenyConfig = nil - if config.AutoScale.AllowDeny != "" { - autoScaleAllowDenyConfig, err = ParseAllowDenyConfig(config.AutoScale.AllowDeny) - if err != nil { - return nil, fmt.Errorf("could not parse autoscale allow-deny-list: %w", err) - } - } - - metricsBuilder := NewMetricsBuilder(config.MetricsBackend, &config.MetricsBackendConfig) - - downScalerEnabled := config.AutoScale.Down && (config.InKubeCluster || config.KubeConfig != "") - downScalerDelay, err := time.ParseDuration(config.AutoScale.DownAfter) - if err != nil { - return nil, fmt.Errorf("could not parse auto-scale-down-after duration: %w", err) - } - // Only one instance should be created - DownScaler = NewDownScaler(ctx, downScalerEnabled, downScalerDelay) - - if config.Routes.Config != "" { - err := RoutesConfigLoader.Load(config.Routes.Config) - if err != nil { - return nil, fmt.Errorf("could not load routes config file: %w", err) - } - - if config.Routes.ConfigWatch { - err := RoutesConfigLoader.WatchForChanges(ctx) - if err != nil { - return nil, fmt.Errorf("could not watch for changes to routes config file: %w", err) - } - } - } - - Routes.RegisterAll(config.Mapping) - if config.Default != "" { - Routes.SetDefaultRoute(config.Default) - } - - if config.ConnectionRateLimit < 1 { - config.ConnectionRateLimit = 1 - } - - connector := NewConnector(ctx, - metricsBuilder.BuildConnectorMetrics(), - config.UseProxyProtocol, - config.RecordLogins, - autoScaleAllowDenyConfig) - - clientFilter, err := NewClientFilter(config.ClientsToAllow, config.ClientsToDeny) - if err != nil { - return nil, fmt.Errorf("could not create client filter: %w", err) - } - connector.UseClientFilter(clientFilter) - - if config.Webhook.Url != "" { - logrus.WithField("url", config.Webhook.Url). - WithField("require-user", config.Webhook.RequireUser). - Info("Using webhook for connection status notifications") - connector.UseConnectionNotifier( - NewWebhookNotifier(config.Webhook.Url, config.Webhook.RequireUser)) - } - - if config.NgrokToken != "" { - connector.UseNgrok(config.NgrokToken) - } - - if config.ReceiveProxyProtocol { - trustedIpNets := make([]*net.IPNet, 0) - for _, ip := range config.TrustedProxies { - _, ipNet, err := net.ParseCIDR(ip) - if err != nil { - return nil, fmt.Errorf("could not parse trusted proxy CIDR block: %w", err) - } - trustedIpNets = append(trustedIpNets, ipNet) - } - - connector.UseReceiveProxyProto(trustedIpNets) - } - - if config.ApiBinding != "" { - StartApiServer(config.ApiBinding) - } - - if config.InKubeCluster { - err = K8sWatcher.StartInCluster(config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - return nil, fmt.Errorf("could not start in-cluster k8s integration: %w", err) - } else { - defer K8sWatcher.Stop() - } - } else if config.KubeConfig != "" { - err := K8sWatcher.StartWithConfig(config.KubeConfig, config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - return nil, fmt.Errorf("could not start k8s integration with kube config: %w", err) - } else { - defer K8sWatcher.Stop() - } - } - - if config.InDocker { - err = DockerWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - return nil, fmt.Errorf("could not start docker integration: %w", err) - } else { - defer DockerWatcher.Stop() - } - } - - if config.InDockerSwarm { - err = DockerSwarmWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down) - if err != nil { - return nil, fmt.Errorf("could not start docker swarm integration: %w", err) - } else { - defer DockerSwarmWatcher.Stop() - } - } - - Routes.SimplifySRV(config.SimplifySRV) - - err = metricsBuilder.Start(ctx) - if err != nil { - return nil, fmt.Errorf("could not start metrics reporter: %w", err) - } - - return &Server{ - ctx: ctx, - config: config, - connector: connector, - reloadConfigChan: make(chan struct{}), - doneChan: make(chan struct{}), - }, nil -} - -// Done provides a channel notified when the server has closed all connections, etc -func (s *Server) Done() <-chan struct{} { - return s.doneChan -} - -func (s *Server) notifyDone() { - s.doneChan <- struct{}{} -} - -// ReloadConfig indicates that an external request, such as a SIGHUP, -// is requesting the routes config file to be reloaded, if enabled -func (s *Server) ReloadConfig() { - s.reloadConfigChan <- struct{}{} -} - -// AcceptConnection provides a way to externally supply a connection to consume -// Note that this will skip rate limiting. -func (s *Server) AcceptConnection(conn net.Conn) { - s.connector.AcceptConnection(conn) -} - -// Run will run the server until the context is done or a fatal error occurs, so this should be -// in a go routine. -func (s *Server) Run() { - err := s.connector.StartAcceptingConnections( - net.JoinHostPort("", strconv.Itoa(s.config.Port)), - s.config.ConnectionRateLimit, - ) - if err != nil { - logrus.WithError(err).Error("Could not start accepting connections") - s.notifyDone() - return - } - - for { - select { - case <-s.reloadConfigChan: - if err := RoutesConfigLoader.Reload(); err != nil { - logrus.WithError(err). - Error("Could not re-read the routes config file") - } - - case <-s.ctx.Done(): - logrus.Info("Stopping. Waiting for connections to complete...") - s.connector.WaitForConnections() - logrus.Info("Stopped") - s.notifyDone() - return - } - } - -}