From b290243d403185b460c6deecee564c08ae970ca4 Mon Sep 17 00:00:00 2001 From: Geoff Bourne Date: Sun, 14 Jul 2019 16:34:46 -0500 Subject: [PATCH] Initial support for tracking connection metrics --- cmd/mc-router/main.go | 6 ++++- cmd/mc-router/metrics.go | 48 ++++++++++++++++++++++++++++++++++++++++ go.mod | 2 ++ go.sum | 4 ++++ server/api_server.go | 8 +++++-- server/connector.go | 34 ++++++++++++++++++++++------ server/routes.go | 10 ++++----- server/routes_test.go | 5 ++++- 8 files changed, 101 insertions(+), 16 deletions(-) create mode 100644 cmd/mc-router/metrics.go diff --git a/cmd/mc-router/main.go b/cmd/mc-router/main.go index 1c76201..c0f47ca 100644 --- a/cmd/mc-router/main.go +++ b/cmd/mc-router/main.go @@ -25,6 +25,7 @@ var ( cpuProfile = flag.String("cpu-profile", "", "Enables CPU profiling and writes to given path") debug = flag.Bool("debug", false, "Enable debug logs") connRateLimit = flag.Int("connection-rate-limit", 1, "Max number of connections to allow per second") + metricsBackend = flag.String("metrics-backend", "discard", "Backend to use for metrics exposure/publishing: discard,expvar") ) var ( @@ -67,6 +68,8 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) + metricsBuilder := NewMetricsBuilder() + c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) @@ -75,7 +78,8 @@ func main() { if *connRateLimit < 1 { *connRateLimit = 1 } - server.Connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port)), *connRateLimit) + connector := server.NewConnector(metricsBuilder.BuildConnectorMetrics()) + connector.StartAcceptingConnections(ctx, net.JoinHostPort("", strconv.Itoa(*port)), *connRateLimit) if *apiBinding != "" { server.StartApiServer(*apiBinding) diff --git a/cmd/mc-router/metrics.go b/cmd/mc-router/metrics.go new file mode 100644 index 0000000..dd9daaa --- /dev/null +++ b/cmd/mc-router/metrics.go @@ -0,0 +1,48 @@ +package main + +import ( + discardMetrics "github.com/go-kit/kit/metrics/discard" + expvarMetrics "github.com/go-kit/kit/metrics/expvar" + "github.com/itzg/mc-router/server" + "github.com/sirupsen/logrus" +) + +type MetricsBuilder interface { + BuildConnectorMetrics() *server.ConnectorMetrics +} + +func NewMetricsBuilder() MetricsBuilder { + switch *metricsBackend { + case "discard": + return &discardMetricsBuilder{} + case "expvar": + return &expvarMetricsBuilder{} + default: + logrus.Fatalf("Unsupported metrics backend: %s", metricsBackend) + return nil + } +} + +type expvarMetricsBuilder struct { +} + +func (b expvarMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { + return &server.ConnectorMetrics{ + Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"), + BytesTransmitted: expvarMetrics.NewCounter("bytes"), + Connections: expvarMetrics.NewCounter("connections"), + ActiveConnections: expvarMetrics.NewGauge("active_connections"), + } +} + +type discardMetricsBuilder struct { +} + +func (b discardMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics { + return &server.ConnectorMetrics{ + Errors: discardMetrics.NewCounter(), + BytesTransmitted: discardMetrics.NewCounter(), + Connections: discardMetrics.NewCounter(), + ActiveConnections: discardMetrics.NewGauge(), + } +} diff --git a/go.mod b/go.mod index 973e76a..686912c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/itzg/mc-router go 1.12 require ( + github.com/VividCortex/gohistogram v1.0.0 // indirect + github.com/go-kit/kit v0.9.0 github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect github.com/golang/protobuf v1.3.1 // indirect diff --git a/go.sum b/go.sum index 5681f1f..e4b810d 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,10 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= +github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= diff --git a/server/api_server.go b/server/api_server.go index d29e0ea..12725f2 100644 --- a/server/api_server.go +++ b/server/api_server.go @@ -1,15 +1,19 @@ package server import ( - "net/http" - "github.com/sirupsen/logrus" + "expvar" "github.com/gorilla/mux" + "github.com/sirupsen/logrus" + "net/http" ) var apiRoutes = mux.NewRouter() func StartApiServer(apiBinding string) { logrus.WithField("binding", apiBinding).Info("Serving API requests") + + apiRoutes.Path("/vars").Handler(expvar.Handler()) + go func() { logrus.WithError( http.ListenAndServe(apiBinding, apiRoutes)).Error("API server failed") diff --git a/server/connector.go b/server/connector.go index 2e751de..bce9c52 100644 --- a/server/connector.go +++ b/server/connector.go @@ -3,6 +3,7 @@ package server import ( "bytes" "context" + "github.com/go-kit/kit/metrics" "github.com/itzg/mc-router/mcproto" "github.com/juju/ratelimit" "github.com/sirupsen/logrus" @@ -17,11 +18,12 @@ const ( var noDeadline time.Time -type IConnector interface { +type Connector interface { StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error } type ConnectorMetrics struct { + Errors metrics.Counter BytesTransmitted metrics.Counter Connections metrics.Counter ActiveConnections metrics.Gauge @@ -76,6 +78,7 @@ func (c *connectorImpl) acceptConnections(ctx context.Context, ln net.Listener, } func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.Conn) { + c.metrics.Connections.With("direction", "frontend").Add(1) //noinspection GoUnhandledErrorResult defer frontendConn.Close() @@ -94,11 +97,13 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C WithError(err). WithField("client", clientAddr). Error("Failed to set read deadline") + c.metrics.Errors.With("type", "read_deadline").Add(1) return } packet, err := mcproto.ReadPacket(inspectionReader, clientAddr, c.state) if err != nil { logrus.WithError(err).WithField("clientAddr", clientAddr).Error("Failed to read packet") + c.metrics.Errors.With("type", "read").Add(1) return } @@ -113,6 +118,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C if err != nil { logrus.WithError(err).WithField("clientAddr", clientAddr). Error("Failed to read handshake") + c.metrics.Errors.With("type", "read").Add(1) return } @@ -131,6 +137,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C WithField("client", clientAddr). WithField("packet", packet). Warn("Unexpected data type for PacketIdLegacyServerListPing") + c.metrics.Errors.With("type", "unexpected_content").Add(1) return } @@ -147,6 +154,7 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C WithField("client", clientAddr). WithField("packetID", packet.PacketID). Error("Unexpected packetID, expected handshake") + c.metrics.Errors.With("type", "unexpected_content").Add(1) return } } @@ -154,9 +162,10 @@ func (c *connectorImpl) HandleConnection(ctx context.Context, frontendConn net.C func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn net.Conn, clientAddr net.Addr, preReadContent io.Reader, serverAddress string) { - backendHostPort := Routes.FindBackendForServerAddress(serverAddress) + backendHostPort, resolvedHost := Routes.FindBackendForServerAddress(serverAddress) if backendHostPort == "" { logrus.WithField("serverAddress", serverAddress).Warn("Unable to find registered backend") + c.metrics.Errors.With("type", "missing_backend").Add(1) return } logrus. @@ -172,11 +181,18 @@ func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn WithField("serverAddress", serverAddress). WithField("backend", backendHostPort). Warn("Unable to connect to backend") + c.metrics.Errors.With("type", "backend_failed").Add(1) return } + + c.metrics.Connections.With("direction", "backend", "host", resolvedHost).Add(1) + c.metrics.ActiveConnections.Add(1) + defer c.metrics.ActiveConnections.Add(-1) + amount, err := io.Copy(backendConn, preReadContent) if err != nil { logrus.WithError(err).Error("Failed to write handshake to backend connection") + c.metrics.Errors.With("type", "backend_failed").Add(1) return } logrus.WithField("amount", amount).Debug("Relayed handshake to backend") @@ -185,13 +201,14 @@ func (c *connectorImpl) findAndConnectBackend(ctx context.Context, frontendConn WithError(err). WithField("client", clientAddr). Error("Failed to clear read deadline") + c.metrics.Errors.With("type", "read_deadline").Add(1) return } - pumpConnections(ctx, frontendConn, backendConn) + c.pumpConnections(ctx, frontendConn, backendConn) return } -func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { +func (c *connectorImpl) pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { //noinspection GoUnhandledErrorResult defer backendConn.Close() @@ -200,8 +217,8 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { errors := make(chan error, 2) - go pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr) - go pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr) + go c.pumpFrames(backendConn, frontendConn, errors, "backend", "frontend", clientAddr) + go c.pumpFrames(frontendConn, backendConn, errors, "frontend", "backend", clientAddr) select { case err := <-errors: @@ -209,6 +226,7 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { logrus.WithError(err). WithField("client", clientAddr). Error("Error observed on connection relay") + c.metrics.Errors.With("type", "relay").Add(1) } case <-ctx.Done(): @@ -216,13 +234,15 @@ func pumpConnections(ctx context.Context, frontendConn, backendConn net.Conn) { } } -func pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string, clientAddr net.Addr) { +func (c *connectorImpl) pumpFrames(incoming io.Reader, outgoing io.Writer, errors chan<- error, from, to string, clientAddr net.Addr) { amount, err := io.Copy(outgoing, incoming) logrus. WithField("client", clientAddr). WithField("amount", amount). Infof("Finished relay %s->%s", from, to) + c.metrics.BytesTransmitted.Add(float64(amount)) + if err != nil { errors <- err } else { diff --git a/server/routes.go b/server/routes.go index d19ea59..8ca4fcc 100644 --- a/server/routes.go +++ b/server/routes.go @@ -87,7 +87,7 @@ type IRoutes interface { RegisterAll(mappings map[string]string) // FindBackendForServerAddress returns the host:port for the external server address, if registered. // Otherwise, an empty string is returned - FindBackendForServerAddress(serverAddress string) string + FindBackendForServerAddress(serverAddress string) (string, string) GetMappings() map[string]string DeleteMapping(serverAddress string) bool CreateMapping(serverAddress string, backend string) @@ -125,7 +125,7 @@ func (r *routesImpl) SetDefaultRoute(backend string) { }).Info("Using default route") } -func (r *routesImpl) FindBackendForServerAddress(serverAddress string) string { +func (r *routesImpl) FindBackendForServerAddress(serverAddress string) (string, string) { r.RLock() defer r.RUnlock() @@ -134,13 +134,13 @@ func (r *routesImpl) FindBackendForServerAddress(serverAddress string) string { address := strings.ToLower(addressParts[0]) if r.mappings == nil { - return r.defaultRoute + return r.defaultRoute, address } else { if route, exists := r.mappings[address]; exists { - return route + return route, address } else { - return r.defaultRoute + return r.defaultRoute, address } } } diff --git a/server/routes_test.go b/server/routes_test.go index aecc589..57a5f16 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1,6 +1,7 @@ package server import ( + "github.com/stretchr/testify/assert" "testing" ) @@ -45,8 +46,10 @@ func Test_routesImpl_FindBackendForServerAddress(t *testing.T) { r.CreateMapping(tt.mapping.serverAddress, tt.mapping.backend) - if got := r.FindBackendForServerAddress(tt.args.serverAddress); got != tt.want { + if got, server := r.FindBackendForServerAddress(tt.args.serverAddress); got != tt.want { t.Errorf("routesImpl.FindBackendForServerAddress() = %v, want %v", got, tt.want) + } else { + assert.Equal(t, tt.mapping.serverAddress, server) } }) }