Added influxdb as a metrics reporter/backend

This commit is contained in:
Geoff Bourne
2020-01-04 14:22:01 -06:00
parent 7f50c512f5
commit a12bbb88a3
7 changed files with 171 additions and 28 deletions
+35 -14
View File
@@ -13,19 +13,34 @@ import (
"strconv"
"strings"
"syscall"
"time"
)
type MetricsBackendConfig struct {
Influxdb struct {
Interval time.Duration `default:"1m"`
Tags map[string]string `usage:"any extra tags to be included with all reported metrics"`
Addr string
Username string
Password string
Database string
RetentionPolicy string
}
}
type Config struct {
Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"`
Mapping string `usage:"Comma-separated mappings of externalHostname=host:port"`
ApiBinding string `usage:"The [host:port] bound for servicing API requests"`
Version bool `usage:"Output version and exit"`
CpuProfile string `usage:"Enables CPU profiling and writes to given path"`
Debug bool `usage:"Enable debug logs"`
ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"`
InKubeCluster bool `usage:"Use in-cluster kubernetes config"`
KubeConfig string `usage:"The path to a kubernetes configuration file"`
MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar"`
Port int `default:"25565" usage:"The [port] bound to listen for Minecraft client connections"`
Mapping string `usage:"Comma-separated mappings of externalHostname=host:port"`
ApiBinding string `usage:"The [host:port] bound for servicing API requests"`
Version bool `usage:"Output version and exit"`
CpuProfile string `usage:"Enables CPU profiling and writes to given path"`
Debug bool `usage:"Enable debug logs"`
ConnectionRateLimit int `default:"1" usage:"Max number of connections to allow per second"`
KubeDiscovery bool `usage:"Enables discovery of annotated kubernetes services"`
InKubeCluster bool `usage:"Use in-cluster kubernetes config"`
KubeConfig string `usage:"The path to a kubernetes configuration file"`
MetricsBackend string `default:"discard" usage:"Backend to use for metrics exposure/publishing: discard,expvar,influxdb"`
MetricsBackendConfig MetricsBackendConfig
}
var (
@@ -71,8 +86,9 @@ func main() {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
metricsBuilder := NewMetricsBuilder(config.MetricsBackend)
metricsBuilder := NewMetricsBuilder(config.MetricsBackend, &config.MetricsBackendConfig)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
@@ -98,22 +114,27 @@ func main() {
if config.InKubeCluster {
err = server.K8sWatcher.StartInCluster()
if err != nil {
logrus.WithError(err).Warn("Unable to start k8s integration")
logrus.WithError(err).Fatal("Unable to start k8s integration")
} else {
defer server.K8sWatcher.Stop()
}
} else if config.KubeConfig != "" {
err := server.K8sWatcher.StartWithConfig(config.KubeConfig)
if err != nil {
logrus.WithError(err).Warn("Unable to start k8s integration")
logrus.WithError(err).Fatal("Unable to start k8s integration")
} else {
defer server.K8sWatcher.Stop()
}
}
err = metricsBuilder.Start(ctx)
if err != nil {
logrus.WithError(err).Fatal("Unable to start metrics reporter")
}
// wait for process-stop signal
<-c
logrus.Info("Stopping")
cancel()
}
func parseMappings(val string) map[string]string {
+71 -6
View File
@@ -1,31 +1,44 @@
package main
import (
"context"
"errors"
"fmt"
kitlogrus "github.com/go-kit/kit/log/logrus"
discardMetrics "github.com/go-kit/kit/metrics/discard"
expvarMetrics "github.com/go-kit/kit/metrics/expvar"
kitinflux "github.com/go-kit/kit/metrics/influx"
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/itzg/mc-router/server"
"github.com/sirupsen/logrus"
"strings"
"time"
)
type MetricsBuilder interface {
BuildConnectorMetrics() *server.ConnectorMetrics
Start(ctx context.Context) error
}
func NewMetricsBuilder(backend string) MetricsBuilder {
switch backend {
case "discard":
return &discardMetricsBuilder{}
func NewMetricsBuilder(backend string, config *MetricsBackendConfig) MetricsBuilder {
switch strings.ToLower(backend) {
case "expvar":
return &expvarMetricsBuilder{}
case "influxdb":
return &influxMetricsBuilder{config: config}
default:
logrus.Fatalf("Unsupported metrics backend: %s", backend)
return nil
return &discardMetricsBuilder{}
}
}
type expvarMetricsBuilder struct {
}
func (b expvarMetricsBuilder) Start(ctx context.Context) error {
// nothing needed
return nil
}
func (b expvarMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics {
return &server.ConnectorMetrics{
Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"),
@@ -38,6 +51,11 @@ func (b expvarMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics {
type discardMetricsBuilder struct {
}
func (b discardMetricsBuilder) Start(ctx context.Context) error {
// nothing needed
return nil
}
func (b discardMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics {
return &server.ConnectorMetrics{
Errors: discardMetrics.NewCounter(),
@@ -46,3 +64,50 @@ func (b discardMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics
ActiveConnections: discardMetrics.NewGauge(),
}
}
type influxMetricsBuilder struct {
config *MetricsBackendConfig
metrics *kitinflux.Influx
}
func (b *influxMetricsBuilder) Start(ctx context.Context) error {
influxConfig := &b.config.Influxdb
if influxConfig.Addr == "" {
return errors.New("influx addr is required")
}
ticker := time.NewTicker(influxConfig.Interval)
client, err := influx.NewHTTPClient(influx.HTTPConfig{
Addr: influxConfig.Addr,
Username: influxConfig.Username,
Password: influxConfig.Password,
})
if err != nil {
return fmt.Errorf("failed to create influx http client: %w", err)
}
go b.metrics.WriteLoop(ctx, ticker.C, client)
logrus.WithField("addr", influxConfig.Addr).
Debug("reporting metrics to influxdb")
return nil
}
func (b *influxMetricsBuilder) BuildConnectorMetrics() *server.ConnectorMetrics {
influxConfig := &b.config.Influxdb
metrics := kitinflux.New(influxConfig.Tags, influx.BatchPointsConfig{
Database: influxConfig.Database,
RetentionPolicy: influxConfig.RetentionPolicy,
}, kitlogrus.NewLogrusLogger(logrus.StandardLogger()))
b.metrics = metrics
return &server.ConnectorMetrics{
Errors: metrics.NewCounter("errors"),
BytesTransmitted: metrics.NewCounter("transmitted_bytes"),
Connections: metrics.NewCounter("connections"),
ActiveConnections: metrics.NewGauge("connections_active"),
}
}