Revert code cleanup for now (#428)

* Revert "Code cleanup in and around connector (#427)"

This reverts commit b3e88db48c.

* Revert "Refactored server setup and run out of main (#425)"

This reverts commit 05c57c3b85.

* Revert "Code cleanup of routes config loader and API server (#424)"

This reverts commit 1ee3eb4de3.
This commit is contained in:
Geoff Bourne
2025-07-06 20:16:26 -05:00
committed by GitHub
parent b3e88db48c
commit 172aed3893
17 changed files with 708 additions and 736 deletions
+2 -85
View File
@@ -1,7 +1,6 @@
package server
import (
"encoding/json"
"expvar"
"net/http"
@@ -10,12 +9,11 @@ import (
"github.com/sirupsen/logrus"
)
var apiRoutes = mux.NewRouter()
func StartApiServer(apiBinding string) {
logrus.WithField("binding", apiBinding).Info("Serving API requests")
var apiRoutes = mux.NewRouter()
registerApiRoutes(apiRoutes)
apiRoutes.Path("/vars").Handler(expvar.Handler())
apiRoutes.Path("/metrics").Handler(promhttp.Handler())
@@ -25,84 +23,3 @@ func StartApiServer(apiBinding string) {
http.ListenAndServe(apiBinding, apiRoutes)).Error("API server failed")
}()
}
func registerApiRoutes(apiRoutes *mux.Router) {
apiRoutes.Path("/routes").Methods("GET").
HandlerFunc(routesListHandler)
apiRoutes.Path("/routes").Methods("POST").
HandlerFunc(routesCreateHandler)
apiRoutes.Path("/defaultRoute").Methods("POST").
HandlerFunc(routesSetDefault)
apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler)
}
func routesListHandler(writer http.ResponseWriter, _ *http.Request) {
mappings := Routes.GetMappings()
bytes, err := json.Marshal(mappings)
if err != nil {
logrus.WithError(err).Error("Failed to marshal mappings")
writer.WriteHeader(http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Type", "application/json")
_, err = writer.Write(bytes)
if err != nil {
logrus.WithError(err).Error("Failed to write response")
}
}
func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) {
serverAddress := mux.Vars(request)["serverAddress"]
if serverAddress != "" {
if Routes.DeleteMapping(serverAddress) {
writer.WriteHeader(http.StatusOK)
} else {
writer.WriteHeader(http.StatusNotFound)
}
RoutesConfigLoader.SaveRoutes()
}
}
func routesCreateHandler(writer http.ResponseWriter, request *http.Request) {
var definition = struct {
ServerAddress string
Backend string
}{}
//goland:noinspection GoUnhandledErrorResult
defer request.Body.Close()
decoder := json.NewDecoder(request.Body)
err := decoder.Decode(&definition)
if err != nil {
logrus.WithError(err).Error("Unable to get request body")
writer.WriteHeader(http.StatusBadRequest)
return
}
Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc)
RoutesConfigLoader.SaveRoutes()
writer.WriteHeader(http.StatusCreated)
}
func routesSetDefault(writer http.ResponseWriter, request *http.Request) {
var body = struct {
Backend string
}{}
//goland:noinspection GoUnhandledErrorResult
defer request.Body.Close()
decoder := json.NewDecoder(request.Body)
err := decoder.Decode(&body)
if err != nil {
logrus.WithError(err).Error("Unable to parse request")
writer.WriteHeader(http.StatusBadRequest)
return
}
Routes.SetDefaultRoute(body.Backend)
RoutesConfigLoader.SaveRoutes()
writer.WriteHeader(http.StatusOK)
}
+1 -1
View File
@@ -89,7 +89,7 @@ func NewClientFilter(allows []string, denies []string) (*ClientFilter, error) {
}, nil
}
// Allow determines if this filter allows the given address
// Allow determines if the given address is allowed by this filter
// where addrStr is a netip.ParseAddr allowed address
func (f *ClientFilter) Allow(addrPort netip.AddrPort) bool {
if !f.allow.Empty() {
-50
View File
@@ -1,50 +0,0 @@
package server
type WebhookConfig struct {
Url string `usage:"If set, a POST request that contains connection status notifications will be sent to this HTTP address"`
RequireUser bool `default:"false" usage:"Indicates if the webhook will only be called if a user is connecting rather than just server list/ping"`
}
type AutoScale struct {
Up bool `usage:"Increase Kubernetes StatefulSet Replicas (only) from 0 to 1 on respective backend servers when accessed"`
Down bool `default:"false" usage:"Decrease Kubernetes StatefulSet Replicas (only) from 1 to 0 on respective backend servers after there are no connections"`
DownAfter string `default:"10m" usage:"Server scale down delay after there are no connections"`
AllowDeny string `usage:"Path to config for server allowlists and denylists. If a global/server entry is specified, only players allowed to connect to the server will be able to trigger a scale up when -auto-scale-up is enabled or cancel active down scalers when -auto-scale-down is enabled"`
}
type RoutesConfig struct {
Config string `usage:"Name or full [path] to routes config file"`
ConfigWatch bool `usage:"Watch for config file changes"`
}
type Config struct {
Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"`
Default string `usage:"host:port of a default Minecraft server to use when mapping not found"`
Mapping map[string]string `usage:"Comma or newline delimited or repeated mappings of externalHostname=host:port"`
ApiBinding string `usage:"The [host:port] bound for servicing API requests"`
CpuProfile string `usage:"Enables CPU profiling and writes to given path"`
ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"`
InKubeCluster bool `usage:"Use in-cluster Kubernetes config"`
KubeConfig string `usage:"The path to a Kubernetes configuration file"`
InDocker bool `usage:"Use Docker service discovery"`
InDockerSwarm bool `usage:"Use Docker Swarm service discovery"`
DockerSocket string `default:"unix:///var/run/docker.sock" usage:"Path to Docker socket to use"`
DockerTimeout int `default:"0" usage:"Timeout configuration in seconds for the Docker integrations"`
DockerRefreshInterval int `default:"15" usage:"Refresh interval in seconds for the Docker integrations"`
MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb,prometheus"`
MetricsBackendConfig MetricsBackendConfig
UseProxyProtocol bool `default:"false" usage:"Send PROXY protocol to backend servers"`
ReceiveProxyProtocol bool `default:"false" usage:"Receive PROXY protocol from backend servers, by default trusts every proxy header that it receives, combine with -trusted-proxies to specify a list of trusted proxies"`
TrustedProxies []string `usage:"Comma delimited list of CIDR notation IP blocks to trust when receiving PROXY protocol"`
RecordLogins bool `default:"false" usage:"Log and generate metrics on player logins. Metrics only supported with influxdb or prometheus backend"`
Routes RoutesConfig
NgrokToken string `usage:"If set, an ngrok tunnel will be established. It is HIGHLY recommended to pass as an environment variable."`
AutoScale AutoScale
ClientsToAllow []string `usage:"Zero or more client IP addresses or CIDRs to allow. Takes precedence over deny."`
ClientsToDeny []string `usage:"Zero or more client IP addresses or CIDRs to deny. Ignored if any configured to allow"`
SimplifySRV bool `default:"false" usage:"Simplify fully qualified SRV records for mapping"`
Webhook WebhookConfig `usage:"Webhook configuration"`
}
+107 -74
View File
@@ -12,9 +12,12 @@ import (
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.ngrok.com/ngrok"
"golang.ngrok.com/ngrok/config"
"github.com/go-kit/kit/metrics"
"github.com/itzg/mc-router/mcproto"
"github.com/juju/ratelimit"
"github.com/pires/go-proxyproto"
@@ -27,18 +30,58 @@ const (
var noDeadline time.Time
type ActiveConnections struct {
type ConnectorMetrics struct {
Errors metrics.Counter
BytesTransmitted metrics.Counter
ConnectionsFrontend metrics.Counter
ConnectionsBackend metrics.Counter
ActiveConnections metrics.Gauge
ServerActivePlayer metrics.Gauge
ServerLogins metrics.Counter
ServerActiveConnections metrics.Gauge
}
type ClientInfo struct {
Host string `json:"host"`
Port int `json:"port"`
}
func ClientInfoFromAddr(addr net.Addr) *ClientInfo {
if addr == nil {
return nil
}
return &ClientInfo{
Host: addr.(*net.TCPAddr).IP.String(),
Port: addr.(*net.TCPAddr).Port,
}
}
type PlayerInfo struct {
Name string `json:"name"`
Uuid uuid.UUID `json:"uuid"`
}
func (p *PlayerInfo) String() string {
if p == nil {
return ""
}
return fmt.Sprintf("%s/%s", p.Name, p.Uuid)
}
type ServerMetrics struct {
sync.RWMutex
activeConnections map[string]int
}
func NewActiveConnections() *ActiveConnections {
return &ActiveConnections{
func NewServerMetrics() *ServerMetrics {
return &ServerMetrics{
activeConnections: make(map[string]int),
}
}
func (sm *ActiveConnections) Increment(serverAddress string) {
func (sm *ServerMetrics) IncrementActiveConnections(serverAddress string) {
sm.Lock()
defer sm.Unlock()
if _, ok := sm.activeConnections[serverAddress]; !ok {
@@ -48,7 +91,7 @@ func (sm *ActiveConnections) Increment(serverAddress string) {
sm.activeConnections[serverAddress] += 1
}
func (sm *ActiveConnections) Decrement(serverAddress string) {
func (sm *ServerMetrics) DecrementActiveConnections(serverAddress string) {
sm.Lock()
defer sm.Unlock()
if activeConnections, ok := sm.activeConnections[serverAddress]; ok && activeConnections <= 0 {
@@ -58,7 +101,7 @@ func (sm *ActiveConnections) Decrement(serverAddress string) {
sm.activeConnections[serverAddress] -= 1
}
func (sm *ActiveConnections) GetCount(serverAddress string) int {
func (sm *ServerMetrics) ActiveConnectionsValue(serverAddress string) int {
sm.Lock()
defer sm.Unlock()
if activeConnections, ok := sm.activeConnections[serverAddress]; ok {
@@ -67,58 +110,60 @@ func (sm *ActiveConnections) GetCount(serverAddress string) int {
return 0
}
func NewConnector(ctx context.Context, metrics *ConnectorMetrics, sendProxyProto bool, recordLogins bool, autoScaleUpAllowDenyConfig *AllowDenyConfig) *Connector {
func NewConnector(metrics *ConnectorMetrics, sendProxyProto bool, receiveProxyProto bool, trustedProxyNets []*net.IPNet, recordLogins bool, autoScaleUpAllowDenyConfig *AllowDenyConfig) *Connector {
return &Connector{
ctx: ctx,
metrics: metrics,
sendProxyProto: sendProxyProto,
connectionsCond: sync.NewCond(&sync.Mutex{}),
receiveProxyProto: receiveProxyProto,
trustedProxyNets: trustedProxyNets,
recordLogins: recordLogins,
autoScaleUpAllowDenyConfig: autoScaleUpAllowDenyConfig,
activeConnections: NewActiveConnections(),
serverMetrics: NewServerMetrics(),
}
}
type Connector struct {
ctx context.Context
state mcproto.State
metrics *ConnectorMetrics
sendProxyProto bool
receiveProxyProto bool
recordLogins bool
trustedProxyNets []*net.IPNet
totalActiveConnections int32
activeConnections *ActiveConnections
state mcproto.State
metrics *ConnectorMetrics
sendProxyProto bool
receiveProxyProto bool
recordLogins bool
trustedProxyNets []*net.IPNet
activeConnections int32
serverMetrics *ServerMetrics
connectionsCond *sync.Cond
ngrokToken string
clientFilter *ClientFilter
autoScaleUpAllowDenyConfig *AllowDenyConfig
connectionNotifier ConnectionNotifier
connectionNotifier ConnectionNotifier
}
func (c *Connector) UseConnectionNotifier(notifier ConnectionNotifier) {
func (c *Connector) SetConnectionNotifier(notifier ConnectionNotifier) {
c.connectionNotifier = notifier
}
func (c *Connector) UseClientFilter(filter *ClientFilter) {
func (c *Connector) SetClientFilter(filter *ClientFilter) {
c.clientFilter = filter
}
func (c *Connector) StartAcceptingConnections(listenAddress string, connRateLimit int) error {
ln, err := c.createListener(listenAddress)
func (c *Connector) StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error {
ln, err := c.createListener(ctx, listenAddress)
if err != nil {
return err
}
go c.acceptConnections(ln, connRateLimit)
go c.acceptConnections(ctx, ln, connRateLimit)
return nil
}
func (c *Connector) createListener(listenAddress string) (net.Listener, error) {
func (c *Connector) createListener(ctx context.Context, listenAddress string) (net.Listener, error) {
if c.ngrokToken != "" {
ngrokTun, err := ngrok.Listen(c.ctx,
ngrokTun, err := ngrok.Listen(ctx,
config.TCPEndpoint(),
ngrok.WithAuthtoken(c.ngrokToken),
)
@@ -139,8 +184,8 @@ func (c *Connector) createListener(listenAddress string) (net.Listener, error) {
if c.receiveProxyProto {
proxyListener := &proxyproto.Listener{
Listener: listener,
ConnPolicy: c.createProxyProtoPolicy(),
Listener: listener,
Policy: c.createProxyProtoPolicy(),
}
logrus.Info("Using PROXY protocol listener")
return proxyListener, nil
@@ -149,8 +194,8 @@ func (c *Connector) createListener(listenAddress string) (net.Listener, error) {
return listener, nil
}
func (c *Connector) createProxyProtoPolicy() proxyproto.ConnPolicyFunc {
return func(connPolicyOptions proxyproto.ConnPolicyOptions) (proxyproto.Policy, error) {
func (c *Connector) createProxyProtoPolicy() func(upstream net.Addr) (proxyproto.Policy, error) {
return func(upstream net.Addr) (proxyproto.Policy, error) {
trustedIpNets := c.trustedProxyNets
if len(trustedIpNets) == 0 {
@@ -158,7 +203,6 @@ func (c *Connector) createProxyProtoPolicy() proxyproto.ConnPolicyFunc {
return proxyproto.USE, nil
}
upstream := connPolicyOptions.Upstream
upstreamIP := upstream.(*net.TCPAddr).IP
for _, ipNet := range trustedIpNets {
if ipNet.Contains(upstreamIP) {
@@ -177,23 +221,17 @@ func (c *Connector) WaitForConnections() {
defer c.connectionsCond.L.Unlock()
for {
count := atomic.LoadInt32(&c.totalActiveConnections)
count := atomic.LoadInt32(&c.activeConnections)
if count > 0 {
logrus.Infof("Waiting on %d connection(s)", count)
c.connectionsCond.Wait()
} else {
return
break
}
}
}
// AcceptConnection provides a way to externally supply a connection to consume.
// Note that this will skip rate limiting.
func (c *Connector) AcceptConnection(conn net.Conn) {
go c.HandleConnection(conn)
}
func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) {
func (c *Connector) acceptConnections(ctx context.Context, ln net.Listener, connRateLimit int) {
//noinspection GoUnhandledErrorResult
defer ln.Close()
@@ -201,7 +239,7 @@ func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) {
for {
select {
case <-c.ctx.Done():
case <-ctx.Done():
return
case <-time.After(bucket.Take(1)):
@@ -209,13 +247,13 @@ func (c *Connector) acceptConnections(ln net.Listener, connRateLimit int) {
if err != nil {
logrus.WithError(err).Error("Failed to accept connection")
} else {
go c.HandleConnection(conn)
go c.HandleConnection(ctx, conn)
}
}
}
}
func (c *Connector) HandleConnection(frontendConn net.Conn) {
func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn) {
c.metrics.ConnectionsFrontend.Add(1)
//noinspection GoUnhandledErrorResult
defer frontendConn.Close()
@@ -305,7 +343,7 @@ func (c *Connector) HandleConnection(frontendConn net.Conn) {
Debug("Got user info")
}
c.findAndConnectBackend(frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, playerInfo, handshake.NextState)
c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, playerInfo, handshake.NextState)
} else if packet.PacketID == mcproto.PacketIdLegacyServerListPing {
handshake, ok := packet.Data.(*mcproto.LegacyServerListPing)
@@ -325,7 +363,7 @@ func (c *Connector) HandleConnection(frontendConn net.Conn) {
serverAddress := handshake.ServerAddress
c.findAndConnectBackend(frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus)
c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus)
} else {
logrus.
WithField("client", clientAddr).
@@ -356,9 +394,9 @@ func (c *Connector) readPlayerInfo(protocolVersion mcproto.ProtocolVersion, buff
}
}
func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) {
func (c *Connector) cleanupBackendConnection(ctx context.Context, clientAddr net.Addr, serverAddress string, playerInfo *PlayerInfo, backendHostPort string, cleanupMetrics bool, checkScaleDown bool) {
if c.connectionNotifier != nil {
err := c.connectionNotifier.NotifyDisconnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort)
err := c.connectionNotifier.NotifyDisconnected(ctx, clientAddr, serverAddress, playerInfo, backendHostPort)
if err != nil {
logrus.WithError(err).Warn("failed to notify disconnected")
}
@@ -366,12 +404,12 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress
if cleanupMetrics {
c.metrics.ActiveConnections.Set(float64(
atomic.AddInt32(&c.totalActiveConnections, -1)))
atomic.AddInt32(&c.activeConnections, -1)))
c.activeConnections.Decrement(serverAddress)
c.serverMetrics.DecrementActiveConnections(serverAddress)
c.metrics.ServerActiveConnections.
With("server_address", serverAddress).
Set(float64(c.activeConnections.GetCount(serverAddress)))
Set(float64(c.serverMetrics.ActiveConnectionsValue(serverAddress)))
if c.recordLogins && playerInfo != nil {
c.metrics.ServerActivePlayer.
@@ -381,21 +419,21 @@ func (c *Connector) cleanupBackendConnection(clientAddr net.Addr, serverAddress
Set(0)
}
}
if checkScaleDown && c.activeConnections.GetCount(serverAddress) <= 0 {
if checkScaleDown && c.serverMetrics.ActiveConnectionsValue(serverAddress) <= 0 {
DownScaler.Begin(serverAddress)
}
c.connectionsCond.Signal()
}
func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net.Conn,
clientAddr net.Addr, preReadContent io.Reader, serverAddress string, playerInfo *PlayerInfo, nextState mcproto.State) {
backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(c.ctx, serverAddress)
backendHostPort, resolvedHost, waker, _ := Routes.FindBackendForServerAddress(ctx, serverAddress)
cleanupMetrics := false
cleanupCheckScaleDown := false
defer func() {
c.cleanupBackendConnection(clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown)
c.cleanupBackendConnection(ctx, clientAddr, serverAddress, playerInfo, backendHostPort, cleanupMetrics, cleanupCheckScaleDown)
}()
if waker != nil && nextState > mcproto.StateStatus {
@@ -410,7 +448,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
// Cancel down scaler if active before scale up
DownScaler.Cancel(serverAddress)
cleanupCheckScaleDown = true
if err := waker(c.ctx); err != nil {
if err := waker(ctx); err != nil {
logrus.WithFields(logrus.Fields{"serverAddress": serverAddress}).WithError(err).Error("failed to wake up backend")
c.metrics.Errors.With("type", "wakeup_failed").Add(1)
return
@@ -427,7 +465,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
c.metrics.Errors.With("type", "missing_backend").Add(1)
if c.connectionNotifier != nil {
err := c.connectionNotifier.NotifyMissingBackend(c.ctx, clientAddr, serverAddress, playerInfo)
err := c.connectionNotifier.NotifyMissingBackend(ctx, clientAddr, serverAddress, playerInfo)
if err != nil {
logrus.WithError(err).Warn("failed to notify missing backend")
}
@@ -455,7 +493,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
c.metrics.Errors.With("type", "backend_failed").Add(1)
if c.connectionNotifier != nil {
notifyErr := c.connectionNotifier.NotifyFailedBackendConnection(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort, err)
notifyErr := c.connectionNotifier.NotifyFailedBackendConnection(ctx, clientAddr, serverAddress, playerInfo, backendHostPort, err)
if notifyErr != nil {
logrus.WithError(notifyErr).Warn("failed to notify failed backend connection")
}
@@ -465,7 +503,7 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
}
if c.connectionNotifier != nil {
err := c.connectionNotifier.NotifyConnected(c.ctx, clientAddr, serverAddress, playerInfo, backendHostPort)
err := c.connectionNotifier.NotifyConnected(ctx, clientAddr, serverAddress, playerInfo, backendHostPort)
if err != nil {
logrus.WithError(err).Warn("failed to notify connected")
}
@@ -474,12 +512,12 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
c.metrics.ConnectionsBackend.With("host", resolvedHost).Add(1)
c.metrics.ActiveConnections.Set(float64(
atomic.AddInt32(&c.totalActiveConnections, 1)))
atomic.AddInt32(&c.activeConnections, 1)))
c.activeConnections.Increment(serverAddress)
c.serverMetrics.IncrementActiveConnections(serverAddress)
c.metrics.ServerActiveConnections.
With("server_address", serverAddress).
Set(float64(c.activeConnections.GetCount(serverAddress)))
Set(float64(c.serverMetrics.ActiveConnectionsValue(serverAddress)))
if c.recordLogins && playerInfo != nil {
logrus.
@@ -560,23 +598,23 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
return
}
c.pumpConnections(frontendConn, backendConn, playerInfo)
c.pumpConnections(ctx, frontendConn, backendConn, playerInfo)
}
func (c *Connector) pumpConnections(frontendConn, backendConn net.Conn, playerInfo *PlayerInfo) {
func (c *Connector) pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn, playerInfo *PlayerInfo) {
//noinspection GoUnhandledErrorResult
defer backendConn.Close()
clientAddr := frontendConn.RemoteAddr()
defer logrus.WithField("client", clientAddr).Debug("Closing backend connection")
errorsChan := make(chan error, 2)
errors := make(chan error, 2)
go c.pumpFrames(backendConn, frontendConn, errorsChan, "backend", "frontend", clientAddr, playerInfo)
go c.pumpFrames(frontendConn, backendConn, errorsChan, "frontend", "backend", clientAddr, playerInfo)
go c.pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr, playerInfo)
go c.pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr, playerInfo)
select {
case err := <-errorsChan:
case err := <-errors:
if err != io.EOF {
logrus.WithError(err).
WithField("client", clientAddr).
@@ -584,8 +622,8 @@ func (c *Connector) pumpConnections(frontendConn, backendConn net.Conn, playerIn
c.metrics.Errors.With("type", "relay").Add(1)
}
case <-c.ctx.Done():
logrus.Debug("Connector observed context cancellation")
case <-ctx.Done():
logrus.Debug("Observed context cancellation")
}
}
@@ -611,8 +649,3 @@ func (c *Connector) pumpFrames(incoming io.Reader, outgoing io.Writer, errors ch
func (c *Connector) UseNgrok(token string) {
c.ngrokToken = token
}
func (c *Connector) UseReceiveProxyProto(trustedProxyNets []*net.IPNet) {
c.trustedProxyNets = trustedProxyNets
c.receiveProxyProto = true
}
+1 -3
View File
@@ -61,9 +61,7 @@ func TestTrustedProxyNetworkPolicy(t *testing.T) {
policy := c.createProxyProtoPolicy()
upstreamAddr := &net.TCPAddr{IP: net.ParseIP(test.upstreamIP)}
policyResult, _ := policy(proxyproto.ConnPolicyOptions{
Upstream: upstreamAddr,
})
policyResult, _ := policy(upstreamAddr)
assert.Equal(t, test.expectedPolicy, policyResult, "Unexpected policy result for %s", test.name)
})
}
-229
View File
@@ -1,229 +0,0 @@
package server
import (
"context"
"errors"
"fmt"
"github.com/go-kit/kit/metrics"
"strings"
"time"
kitlogrus "github.com/go-kit/kit/log/logrus"
discardMetrics "github.com/go-kit/kit/metrics/discard"
expvarMetrics "github.com/go-kit/kit/metrics/expvar"
kitinflux "github.com/go-kit/kit/metrics/influx"
prometheusMetrics "github.com/go-kit/kit/metrics/prometheus"
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
)
type MetricsBuilder interface {
BuildConnectorMetrics() *ConnectorMetrics
Start(ctx context.Context) error
}
const (
MetricsBackendExpvar = "expvar"
MetricsBackendPrometheus = "prometheus"
MetricsBackendInfluxDB = "influxdb"
MetricsBackendDiscard = "discard"
)
type MetricsBackendConfig struct {
Influxdb struct {
Interval time.Duration `default:"1m"`
Tags map[string]string `usage:"any extra tags to be included with all reported metrics"`
Addr string
Username string
Password string
Database string
RetentionPolicy string
}
}
// NewMetricsBuilder creates a new MetricsBuilder based on the specified backend.
// If the backend is not recognized, a discard builder is returned.
// config can be nil if the backend is not influxdb.
func NewMetricsBuilder(backend string, config *MetricsBackendConfig) MetricsBuilder {
switch strings.ToLower(backend) {
case MetricsBackendExpvar:
return &expvarMetricsBuilder{}
case MetricsBackendPrometheus:
return &prometheusMetricsBuilder{}
case MetricsBackendInfluxDB:
return &influxMetricsBuilder{config: config}
case MetricsBackendDiscard:
return &discardMetricsBuilder{}
default:
return &discardMetricsBuilder{}
}
}
type expvarMetricsBuilder struct {
}
func (b expvarMetricsBuilder) Start(ctx context.Context) error {
// nothing needed
return nil
}
type ConnectorMetrics struct {
Errors metrics.Counter
BytesTransmitted metrics.Counter
ConnectionsFrontend metrics.Counter
ConnectionsBackend metrics.Counter
ActiveConnections metrics.Gauge
ServerActivePlayer metrics.Gauge
ServerLogins metrics.Counter
ServerActiveConnections metrics.Gauge
}
func (b expvarMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
c := expvarMetrics.NewCounter("connections")
return &ConnectorMetrics{
Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"),
BytesTransmitted: expvarMetrics.NewCounter("bytes"),
ConnectionsFrontend: c,
ConnectionsBackend: c,
ActiveConnections: expvarMetrics.NewGauge("active_connections"),
ServerActivePlayer: expvarMetrics.NewGauge("server_active_player"),
ServerLogins: expvarMetrics.NewCounter("server_logins"),
ServerActiveConnections: expvarMetrics.NewGauge("server_active_connections"),
}
}
type discardMetricsBuilder struct {
}
func (b discardMetricsBuilder) Start(ctx context.Context) error {
// nothing needed
return nil
}
func (b discardMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
return &ConnectorMetrics{
Errors: discardMetrics.NewCounter(),
BytesTransmitted: discardMetrics.NewCounter(),
ConnectionsFrontend: discardMetrics.NewCounter(),
ConnectionsBackend: discardMetrics.NewCounter(),
ActiveConnections: discardMetrics.NewGauge(),
ServerActivePlayer: discardMetrics.NewGauge(),
ServerLogins: discardMetrics.NewCounter(),
ServerActiveConnections: discardMetrics.NewGauge(),
}
}
type influxMetricsBuilder struct {
config *MetricsBackendConfig
metrics *kitinflux.Influx
}
func (b *influxMetricsBuilder) Start(ctx context.Context) error {
influxConfig := &b.config.Influxdb
if influxConfig.Addr == "" {
return errors.New("influx addr is required")
}
ticker := time.NewTicker(influxConfig.Interval)
client, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: influxConfig.Addr,
Username: influxConfig.Username,
Password: influxConfig.Password,
})
if err != nil {
return fmt.Errorf("failed to create influx http client: %w", err)
}
go b.metrics.WriteLoop(ctx, ticker.C, client)
logrus.WithField("addr", influxConfig.Addr).
Debug("reporting metrics to influxdb")
return nil
}
func (b *influxMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
influxConfig := &b.config.Influxdb
metrics := kitinflux.New(influxConfig.Tags, influx.BatchPointsConfig{
Database: influxConfig.Database,
RetentionPolicy: influxConfig.RetentionPolicy,
}, kitlogrus.NewLogger(logrus.StandardLogger()))
b.metrics = metrics
c := metrics.NewCounter("mc_router_connections")
return &ConnectorMetrics{
Errors: metrics.NewCounter("mc_router_errors"),
BytesTransmitted: metrics.NewCounter("mc_router_transmitted_bytes"),
ConnectionsFrontend: c.With("side", "frontend"),
ConnectionsBackend: c.With("side", "backend"),
ActiveConnections: metrics.NewGauge("mc_router_connections_active"),
ServerActivePlayer: metrics.NewGauge("mc_router_server_player_active"),
ServerLogins: metrics.NewCounter("mc_router_server_logins"),
ServerActiveConnections: metrics.NewGauge("mc_router_server_active_connections"),
}
}
type prometheusMetricsBuilder struct {
}
var pcv *prometheusMetrics.Counter
func (b prometheusMetricsBuilder) Start(ctx context.Context) error {
// nothing needed
return nil
}
func (b prometheusMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
pcv = prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "mc_router",
Name: "errors",
Help: "The total number of errors",
}, []string{"type"}))
return &ConnectorMetrics{
Errors: pcv,
BytesTransmitted: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "mc_router",
Name: "bytes",
Help: "The total number of bytes transmitted",
}, nil)),
ConnectionsFrontend: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "mc_router",
Subsystem: "frontend",
Name: "connections",
Help: "The total number of connections",
ConstLabels: prometheus.Labels{"side": "frontend"},
}, nil)),
ConnectionsBackend: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "mc_router",
Subsystem: "backend",
Name: "connections",
Help: "The total number of backend connections",
ConstLabels: prometheus.Labels{"side": "backend"},
}, []string{"host"})),
ActiveConnections: prometheusMetrics.NewGauge(promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "mc_router",
Name: "active_connections",
Help: "The number of active connections",
}, nil)),
ServerActivePlayer: prometheusMetrics.NewGauge(promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "mc_router",
Name: "server_active_player",
Help: "Player is active on server",
}, []string{"player_name", "player_uuid", "server_address"})),
ServerLogins: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "mc_router",
Name: "server_logins",
Help: "The total number of player logins",
}, []string{"player_name", "player_uuid", "server_address"})),
ServerActiveConnections: prometheusMetrics.NewGauge(promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "mc_router",
Name: "server_active_connections",
Help: "The number of active connections per server",
}, []string{"server_address"})),
}
}
-31
View File
@@ -2,40 +2,9 @@ package server
import (
"context"
"fmt"
"github.com/google/uuid"
"net"
)
type PlayerInfo struct {
Name string `json:"name"`
Uuid uuid.UUID `json:"uuid"`
}
func (p *PlayerInfo) String() string {
if p == nil {
return ""
}
return fmt.Sprintf("%s/%s", p.Name, p.Uuid)
}
type ClientInfo struct {
Host string `json:"host"`
Port int `json:"port"`
}
func ClientInfoFromAddr(addr net.Addr) *ClientInfo {
if addr == nil {
return nil
}
return &ClientInfo{
Host: addr.(*net.TCPAddr).IP.String(),
Port: addr.(*net.TCPAddr).Port,
}
}
type ConnectionNotifier interface {
// NotifyMissingBackend is called when an inbound connection is received for a server that does not have a backend.
NotifyMissingBackend(ctx context.Context, clientAddr net.Addr, server string, playerInfo *PlayerInfo) error
+85 -5
View File
@@ -2,10 +2,13 @@ package server
import (
"context"
"encoding/json"
"net/http"
"regexp"
"strings"
"sync"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
)
@@ -15,6 +18,88 @@ var EmptyScalerFunc = func(ctx context.Context) error { return nil }
var tcpShieldPattern = regexp.MustCompile("///.*")
func init() {
apiRoutes.Path("/routes").Methods("GET").
Headers("Accept", "application/json").
HandlerFunc(routesListHandler)
apiRoutes.Path("/routes").Methods("POST").
Headers("Content-Type", "application/json").
HandlerFunc(routesCreateHandler)
apiRoutes.Path("/defaultRoute").Methods("POST").
Headers("Content-Type", "application/json").
HandlerFunc(routesSetDefault)
apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler)
}
func routesListHandler(writer http.ResponseWriter, _ *http.Request) {
mappings := Routes.GetMappings()
bytes, err := json.Marshal(mappings)
if err != nil {
logrus.WithError(err).Error("Failed to marshal mappings")
writer.WriteHeader(http.StatusInternalServerError)
return
}
_, err = writer.Write(bytes)
if err != nil {
logrus.WithError(err).Error("Failed to write response")
}
}
func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) {
serverAddress := mux.Vars(request)["serverAddress"]
RoutesConfig.DeleteMapping(serverAddress)
if serverAddress != "" {
if Routes.DeleteMapping(serverAddress) {
writer.WriteHeader(http.StatusOK)
} else {
writer.WriteHeader(http.StatusNotFound)
}
}
}
func routesCreateHandler(writer http.ResponseWriter, request *http.Request) {
var definition = struct {
ServerAddress string
Backend string
}{}
//goland:noinspection GoUnhandledErrorResult
defer request.Body.Close()
decoder := json.NewDecoder(request.Body)
err := decoder.Decode(&definition)
if err != nil {
logrus.WithError(err).Error("Unable to get request body")
writer.WriteHeader(http.StatusBadRequest)
return
}
Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc)
RoutesConfig.AddMapping(definition.ServerAddress, definition.Backend)
writer.WriteHeader(http.StatusCreated)
}
func routesSetDefault(writer http.ResponseWriter, request *http.Request) {
var body = struct {
Backend string
}{}
//goland:noinspection GoUnhandledErrorResult
defer request.Body.Close()
decoder := json.NewDecoder(request.Body)
err := decoder.Decode(&body)
if err != nil {
logrus.WithError(err).Error("Unable to parse request")
writer.WriteHeader(http.StatusBadRequest)
return
}
Routes.SetDefaultRoute(body.Backend)
RoutesConfig.SetDefaultRoute(body.Backend)
writer.WriteHeader(http.StatusOK)
}
type IRoutes interface {
Reset()
RegisterAll(mappings map[string]string)
@@ -27,7 +112,6 @@ type IRoutes interface {
DeleteMapping(serverAddress string) bool
CreateMapping(serverAddress string, backend string, waker ScalerFunc, sleeper ScalerFunc)
SetDefaultRoute(backend string)
GetDefaultRoute() string
SimplifySRV(srvEnabled bool)
}
@@ -73,10 +157,6 @@ func (r *routesImpl) SetDefaultRoute(backend string) {
}).Info("Using default route")
}
func (r *routesImpl) GetDefaultRoute() string {
return r.defaultRoute
}
func (r *routesImpl) SimplifySRV(srvEnabled bool) {
r.simplifySRV = srvEnabled
}
+260
View File
@@ -0,0 +1,260 @@
package server
import (
"context"
"encoding/json"
"time"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/fs"
"os"
"sync"
)
type IRoutesConfig interface {
ReadRoutesConfig(routesConfig string)
ReloadRoutesConfig()
AddMapping(serverAddress string, backend string)
DeleteMapping(serverAddress string)
SetDefaultRoute(backend string)
WatchForChanges(ctx context.Context) error
}
const debounceConfigRereadDuration = time.Second * 5
var RoutesConfig = &routesConfigImpl{}
type routesConfigImpl struct {
sync.RWMutex
fileName string
}
type routesConfigStructure struct {
DefaultServer string `json:"default-server"`
Mappings map[string]string `json:"mappings"`
}
func (r *routesConfigImpl) ReadRoutesConfig(routesConfig string) error {
r.fileName = routesConfig
logrus.WithField("routesConfig", r.fileName).Info("Loading routes config file")
config, readErr := r.readRoutesConfigFile()
if readErr != nil {
if errors.Is(readErr, fs.ErrNotExist) {
logrus.WithField("routesConfig", r.fileName).Info("Routes config file doses not exist, skipping reading it")
// File doesn't exist -> ignore it
return nil
}
return errors.Wrap(readErr, "Could not load the routes config file")
}
Routes.RegisterAll(config.Mappings)
Routes.SetDefaultRoute(config.DefaultServer)
return nil
}
func (r *routesConfigImpl) ReloadRoutesConfig() error {
config, readErr := r.readRoutesConfigFile()
if readErr != nil {
return readErr
}
logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file")
Routes.Reset()
Routes.RegisterAll(config.Mappings)
Routes.SetDefaultRoute(config.DefaultServer)
return nil
}
func (r *routesConfigImpl) WatchForChanges(ctx context.Context) error {
if r.fileName == "" {
return errors.New("routes config file needs to be specified first")
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "Could not create a watcher")
}
err = watcher.Add(r.fileName)
if err != nil {
return errors.Wrap(err, "Could not watch the routes config file")
}
go func() {
logrus.WithField("file", r.fileName).Info("Watching routes config file")
debounceTimerChan := make(<-chan time.Time)
var debounceTimer *time.Timer
//goland:noinspection GoUnhandledErrorResult
defer watcher.Close()
for {
select {
case event, ok := <-watcher.Events:
if !ok {
logrus.Debug("Watcher events channel closed")
return
}
logrus.
WithField("file", event.Name).
WithField("op", event.Op).
Trace("fs event received")
if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) {
if debounceTimer == nil {
debounceTimer = time.NewTimer(debounceConfigRereadDuration)
} else {
debounceTimer.Reset(debounceConfigRereadDuration)
}
debounceTimerChan = debounceTimer.C
logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay")
}
case <-debounceTimerChan:
readErr := r.ReadRoutesConfig(r.fileName)
if readErr != nil {
logrus.
WithError(readErr).
WithField("routesConfig", r.fileName).
Error("Could not re-read the routes config file")
}
case <-ctx.Done():
return
}
}
}()
return nil
}
func (r *routesConfigImpl) AddMapping(serverAddress string, backend string) {
if !r.isRoutesConfigEnabled() {
return
}
config, readErr := r.readRoutesConfigFile()
if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) {
logrus.WithError(readErr).Error("Could not read the routes config file")
return
}
if config.Mappings == nil {
config.Mappings = make(map[string]string)
}
config.Mappings[serverAddress] = backend
writeErr := r.writeRoutesConfigFile(config)
if writeErr != nil {
logrus.WithError(writeErr).Error("Could not write to the routes config file")
return
}
logrus.WithFields(logrus.Fields{
"serverAddress": serverAddress,
"backend": backend,
}).Info("Added route to routes config")
return
}
func (r *routesConfigImpl) SetDefaultRoute(backend string) {
if !r.isRoutesConfigEnabled() {
return
}
config, readErr := r.readRoutesConfigFile()
if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) {
logrus.WithError(readErr).Error("Could not read the routes config file")
return
}
config.DefaultServer = backend
writeErr := r.writeRoutesConfigFile(config)
if writeErr != nil {
logrus.WithError(writeErr).Error("Could not write to the routes config file")
return
}
logrus.WithFields(logrus.Fields{
"backend": backend,
}).Info("Set default route in routes config")
return
}
func (r *routesConfigImpl) DeleteMapping(serverAddress string) {
if !r.isRoutesConfigEnabled() {
return
}
config, readErr := r.readRoutesConfigFile()
if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) {
logrus.WithError(readErr).Error("Could not read the routes config file")
return
}
delete(config.Mappings, serverAddress)
writeErr := r.writeRoutesConfigFile(config)
if writeErr != nil {
logrus.WithError(writeErr).Error("Could not write to the routes config file")
return
}
logrus.WithField("serverAddress", serverAddress).Info("Deleted route in routes config")
return
}
func (r *routesConfigImpl) isRoutesConfigEnabled() bool {
return r.fileName != ""
}
func (r *routesConfigImpl) readRoutesConfigFile() (routesConfigStructure, error) {
r.RLock()
defer r.RUnlock()
config := routesConfigStructure{
"",
make(map[string]string),
}
file, fileErr := os.ReadFile(r.fileName)
if fileErr != nil {
return config, errors.Wrap(fileErr, "Could not load the routes config file")
}
parseErr := json.Unmarshal(file, &config)
if parseErr != nil {
return config, errors.Wrap(parseErr, "Could not parse the json routes config file")
}
return config, nil
}
func (r *routesConfigImpl) writeRoutesConfigFile(config routesConfigStructure) error {
r.Lock()
defer r.Unlock()
newFileContent, parseErr := json.Marshal(config)
if parseErr != nil {
return errors.Wrap(parseErr, "Could not parse the routes to json")
}
fileErr := os.WriteFile(r.fileName, newFileContent, 0664)
if fileErr != nil {
return errors.Wrap(fileErr, "Could not write to the routes config file")
}
return nil
}
-181
View File
@@ -1,181 +0,0 @@
package server
import (
"context"
"encoding/json"
"time"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/fs"
"os"
)
const debounceConfigRereadDuration = time.Second * 5
var RoutesConfigLoader = &routesConfigLoader{}
type routesConfigLoader struct {
fileName string
}
// RoutesConfigSchema declares the schema of the json file that can provide routes to serve
type RoutesConfigSchema struct {
DefaultServer string `json:"default-server"`
Mappings map[string]string `json:"mappings"`
}
func (r *routesConfigLoader) Load(routesConfigFileName string) error {
r.fileName = routesConfigFileName
logrus.WithField("routesConfigFileName", r.fileName).Info("Loading routes config file")
config, readErr := r.readFile()
if readErr != nil {
if errors.Is(readErr, fs.ErrNotExist) {
logrus.WithField("routesConfigFileName", r.fileName).Info("Routes config file doses not exist, skipping reading it")
// File doesn't exist -> ignore it
return nil
}
return errors.Wrap(readErr, "Could not load the routes config file")
}
Routes.RegisterAll(config.Mappings)
Routes.SetDefaultRoute(config.DefaultServer)
return nil
}
func (r *routesConfigLoader) Reload() error {
if !r.isEnabled() {
return nil
}
config, readErr := r.readFile()
if readErr != nil {
return readErr
}
logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file")
Routes.Reset()
Routes.RegisterAll(config.Mappings)
Routes.SetDefaultRoute(config.DefaultServer)
return nil
}
func (r *routesConfigLoader) WatchForChanges(ctx context.Context) error {
if r.fileName == "" {
return errors.New("routes config file needs to be specified first")
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return errors.Wrap(err, "Could not create a watcher")
}
err = watcher.Add(r.fileName)
if err != nil {
return errors.Wrap(err, "Could not watch the routes config file")
}
go func() {
logrus.WithField("file", r.fileName).Info("Watching routes config file")
debounceTimerChan := make(<-chan time.Time)
var debounceTimer *time.Timer
//goland:noinspection GoUnhandledErrorResult
defer watcher.Close()
for {
select {
case event, ok := <-watcher.Events:
if !ok {
logrus.Debug("Watcher events channel closed")
return
}
logrus.
WithField("file", event.Name).
WithField("op", event.Op).
Trace("fs event received")
if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) {
if debounceTimer == nil {
debounceTimer = time.NewTimer(debounceConfigRereadDuration)
} else {
debounceTimer.Reset(debounceConfigRereadDuration)
}
debounceTimerChan = debounceTimer.C
logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay")
}
case <-debounceTimerChan:
readErr := r.Load(r.fileName)
if readErr != nil {
logrus.
WithError(readErr).
WithField("routesConfig", r.fileName).
Error("Could not re-read the routes config file")
}
case <-ctx.Done():
return
}
}
}()
return nil
}
func (r *routesConfigLoader) SaveRoutes() {
if !r.isEnabled() {
return
}
err := r.writeFile(&RoutesConfigSchema{
DefaultServer: Routes.GetDefaultRoute(),
Mappings: Routes.GetMappings(),
})
if err != nil {
logrus.WithError(err).Error("Could not save the routes config file")
return
}
logrus.Info("Saved routes config")
}
func (r *routesConfigLoader) isEnabled() bool {
return r.fileName != ""
}
func (r *routesConfigLoader) readFile() (*RoutesConfigSchema, error) {
var config RoutesConfigSchema
content, err := os.ReadFile(r.fileName)
if err != nil {
return &config, errors.Wrap(err, "Could not load the routes config file")
}
parseErr := json.Unmarshal(content, &config)
if parseErr != nil {
return &config, errors.Wrap(parseErr, "Could not parse the json routes config file")
}
return &config, nil
}
func (r *routesConfigLoader) writeFile(config *RoutesConfigSchema) error {
newFileContent, err := json.Marshal(config)
if err != nil {
return errors.Wrap(err, "Could not parse the routes to json")
}
err = os.WriteFile(r.fileName, newFileContent, 0664)
if err != nil {
return errors.Wrap(err, "Could not write to the routes config file")
}
return nil
}
-224
View File
@@ -1,224 +0,0 @@
package server
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"net"
"os"
"runtime/pprof"
"strconv"
"time"
)
type Server struct {
ctx context.Context
config *Config
connector *Connector
reloadConfigChan chan struct{}
doneChan chan struct{}
}
func NewServer(ctx context.Context, config *Config) (*Server, error) {
if config.CpuProfile != "" {
cpuProfileFile, err := os.Create(config.CpuProfile)
if err != nil {
return nil, fmt.Errorf("could not create cpu profile file: %w", err)
}
//goland:noinspection GoUnhandledErrorResult
defer cpuProfileFile.Close()
logrus.WithField("file", config.CpuProfile).Info("Starting cpu profiling")
err = pprof.StartCPUProfile(cpuProfileFile)
if err != nil {
return nil, fmt.Errorf("could not start cpu profile: %w", err)
}
defer pprof.StopCPUProfile()
}
var err error
var autoScaleAllowDenyConfig *AllowDenyConfig = nil
if config.AutoScale.AllowDeny != "" {
autoScaleAllowDenyConfig, err = ParseAllowDenyConfig(config.AutoScale.AllowDeny)
if err != nil {
return nil, fmt.Errorf("could not parse autoscale allow-deny-list: %w", err)
}
}
metricsBuilder := NewMetricsBuilder(config.MetricsBackend, &config.MetricsBackendConfig)
downScalerEnabled := config.AutoScale.Down && (config.InKubeCluster || config.KubeConfig != "")
downScalerDelay, err := time.ParseDuration(config.AutoScale.DownAfter)
if err != nil {
return nil, fmt.Errorf("could not parse auto-scale-down-after duration: %w", err)
}
// Only one instance should be created
DownScaler = NewDownScaler(ctx, downScalerEnabled, downScalerDelay)
if config.Routes.Config != "" {
err := RoutesConfigLoader.Load(config.Routes.Config)
if err != nil {
return nil, fmt.Errorf("could not load routes config file: %w", err)
}
if config.Routes.ConfigWatch {
err := RoutesConfigLoader.WatchForChanges(ctx)
if err != nil {
return nil, fmt.Errorf("could not watch for changes to routes config file: %w", err)
}
}
}
Routes.RegisterAll(config.Mapping)
if config.Default != "" {
Routes.SetDefaultRoute(config.Default)
}
if config.ConnectionRateLimit < 1 {
config.ConnectionRateLimit = 1
}
connector := NewConnector(ctx,
metricsBuilder.BuildConnectorMetrics(),
config.UseProxyProtocol,
config.RecordLogins,
autoScaleAllowDenyConfig)
clientFilter, err := NewClientFilter(config.ClientsToAllow, config.ClientsToDeny)
if err != nil {
return nil, fmt.Errorf("could not create client filter: %w", err)
}
connector.UseClientFilter(clientFilter)
if config.Webhook.Url != "" {
logrus.WithField("url", config.Webhook.Url).
WithField("require-user", config.Webhook.RequireUser).
Info("Using webhook for connection status notifications")
connector.UseConnectionNotifier(
NewWebhookNotifier(config.Webhook.Url, config.Webhook.RequireUser))
}
if config.NgrokToken != "" {
connector.UseNgrok(config.NgrokToken)
}
if config.ReceiveProxyProtocol {
trustedIpNets := make([]*net.IPNet, 0)
for _, ip := range config.TrustedProxies {
_, ipNet, err := net.ParseCIDR(ip)
if err != nil {
return nil, fmt.Errorf("could not parse trusted proxy CIDR block: %w", err)
}
trustedIpNets = append(trustedIpNets, ipNet)
}
connector.UseReceiveProxyProto(trustedIpNets)
}
if config.ApiBinding != "" {
StartApiServer(config.ApiBinding)
}
if config.InKubeCluster {
err = K8sWatcher.StartInCluster(config.AutoScale.Up, config.AutoScale.Down)
if err != nil {
return nil, fmt.Errorf("could not start in-cluster k8s integration: %w", err)
} else {
defer K8sWatcher.Stop()
}
} else if config.KubeConfig != "" {
err := K8sWatcher.StartWithConfig(config.KubeConfig, config.AutoScale.Up, config.AutoScale.Down)
if err != nil {
return nil, fmt.Errorf("could not start k8s integration with kube config: %w", err)
} else {
defer K8sWatcher.Stop()
}
}
if config.InDocker {
err = DockerWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down)
if err != nil {
return nil, fmt.Errorf("could not start docker integration: %w", err)
} else {
defer DockerWatcher.Stop()
}
}
if config.InDockerSwarm {
err = DockerSwarmWatcher.Start(config.DockerSocket, config.DockerTimeout, config.DockerRefreshInterval, config.AutoScale.Up, config.AutoScale.Down)
if err != nil {
return nil, fmt.Errorf("could not start docker swarm integration: %w", err)
} else {
defer DockerSwarmWatcher.Stop()
}
}
Routes.SimplifySRV(config.SimplifySRV)
err = metricsBuilder.Start(ctx)
if err != nil {
return nil, fmt.Errorf("could not start metrics reporter: %w", err)
}
return &Server{
ctx: ctx,
config: config,
connector: connector,
reloadConfigChan: make(chan struct{}),
doneChan: make(chan struct{}),
}, nil
}
// Done provides a channel notified when the server has closed all connections, etc
func (s *Server) Done() <-chan struct{} {
return s.doneChan
}
func (s *Server) notifyDone() {
s.doneChan <- struct{}{}
}
// ReloadConfig indicates that an external request, such as a SIGHUP,
// is requesting the routes config file to be reloaded, if enabled
func (s *Server) ReloadConfig() {
s.reloadConfigChan <- struct{}{}
}
// AcceptConnection provides a way to externally supply a connection to consume
// Note that this will skip rate limiting.
func (s *Server) AcceptConnection(conn net.Conn) {
s.connector.AcceptConnection(conn)
}
// Run will run the server until the context is done or a fatal error occurs, so this should be
// in a go routine.
func (s *Server) Run() {
err := s.connector.StartAcceptingConnections(
net.JoinHostPort("", strconv.Itoa(s.config.Port)),
s.config.ConnectionRateLimit,
)
if err != nil {
logrus.WithError(err).Error("Could not start accepting connections")
s.notifyDone()
return
}
for {
select {
case <-s.reloadConfigChan:
if err := RoutesConfigLoader.Reload(); err != nil {
logrus.WithError(err).
Error("Could not re-read the routes config file")
}
case <-s.ctx.Done():
logrus.Info("Stopping. Waiting for connections to complete...")
s.connector.WaitForConnections()
logrus.Info("Stopped")
s.notifyDone()
return
}
}
}