Code cleanup of routes config loader and API server (#424)
This commit is contained in:
+85
-2
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"expvar"
|
||||
"net/http"
|
||||
|
||||
@@ -9,11 +10,12 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var apiRoutes = mux.NewRouter()
|
||||
|
||||
func StartApiServer(apiBinding string) {
|
||||
logrus.WithField("binding", apiBinding).Info("Serving API requests")
|
||||
|
||||
var apiRoutes = mux.NewRouter()
|
||||
registerApiRoutes(apiRoutes)
|
||||
|
||||
apiRoutes.Path("/vars").Handler(expvar.Handler())
|
||||
|
||||
apiRoutes.Path("/metrics").Handler(promhttp.Handler())
|
||||
@@ -23,3 +25,84 @@ func StartApiServer(apiBinding string) {
|
||||
http.ListenAndServe(apiBinding, apiRoutes)).Error("API server failed")
|
||||
}()
|
||||
}
|
||||
|
||||
func registerApiRoutes(apiRoutes *mux.Router) {
|
||||
apiRoutes.Path("/routes").Methods("GET").
|
||||
HandlerFunc(routesListHandler)
|
||||
apiRoutes.Path("/routes").Methods("POST").
|
||||
HandlerFunc(routesCreateHandler)
|
||||
apiRoutes.Path("/defaultRoute").Methods("POST").
|
||||
HandlerFunc(routesSetDefault)
|
||||
apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler)
|
||||
}
|
||||
|
||||
func routesListHandler(writer http.ResponseWriter, _ *http.Request) {
|
||||
mappings := Routes.GetMappings()
|
||||
bytes, err := json.Marshal(mappings)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to marshal mappings")
|
||||
writer.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
_, err = writer.Write(bytes)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to write response")
|
||||
}
|
||||
}
|
||||
|
||||
func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) {
|
||||
serverAddress := mux.Vars(request)["serverAddress"]
|
||||
if serverAddress != "" {
|
||||
if Routes.DeleteMapping(serverAddress) {
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
RoutesConfigLoader.SaveRoutes()
|
||||
}
|
||||
}
|
||||
|
||||
func routesCreateHandler(writer http.ResponseWriter, request *http.Request) {
|
||||
var definition = struct {
|
||||
ServerAddress string
|
||||
Backend string
|
||||
}{}
|
||||
|
||||
//goland:noinspection GoUnhandledErrorResult
|
||||
defer request.Body.Close()
|
||||
|
||||
decoder := json.NewDecoder(request.Body)
|
||||
err := decoder.Decode(&definition)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Unable to get request body")
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc)
|
||||
RoutesConfigLoader.SaveRoutes()
|
||||
writer.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
func routesSetDefault(writer http.ResponseWriter, request *http.Request) {
|
||||
var body = struct {
|
||||
Backend string
|
||||
}{}
|
||||
|
||||
//goland:noinspection GoUnhandledErrorResult
|
||||
defer request.Body.Close()
|
||||
|
||||
decoder := json.NewDecoder(request.Body)
|
||||
err := decoder.Decode(&body)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Unable to parse request")
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
Routes.SetDefaultRoute(body.Backend)
|
||||
RoutesConfigLoader.SaveRoutes()
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,217 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
kitlogrus "github.com/go-kit/kit/log/logrus"
|
||||
discardMetrics "github.com/go-kit/kit/metrics/discard"
|
||||
expvarMetrics "github.com/go-kit/kit/metrics/expvar"
|
||||
kitinflux "github.com/go-kit/kit/metrics/influx"
|
||||
prometheusMetrics "github.com/go-kit/kit/metrics/prometheus"
|
||||
influx "github.com/influxdata/influxdb1-client/v2"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type MetricsBuilder interface {
|
||||
BuildConnectorMetrics() *ConnectorMetrics
|
||||
Start(ctx context.Context) error
|
||||
}
|
||||
|
||||
const (
|
||||
MetricsBackendExpvar = "expvar"
|
||||
MetricsBackendPrometheus = "prometheus"
|
||||
MetricsBackendInfluxDB = "influxdb"
|
||||
MetricsBackendDiscard = "discard"
|
||||
)
|
||||
|
||||
type MetricsBackendConfig struct {
|
||||
Influxdb struct {
|
||||
Interval time.Duration `default:"1m"`
|
||||
Tags map[string]string `usage:"any extra tags to be included with all reported metrics"`
|
||||
Addr string
|
||||
Username string
|
||||
Password string
|
||||
Database string
|
||||
RetentionPolicy string
|
||||
}
|
||||
}
|
||||
|
||||
// NewMetricsBuilder creates a new MetricsBuilder based on the specified backend.
|
||||
// If the backend is not recognized, a discard builder is returned.
|
||||
// config can be nil if the backend is not influxdb.
|
||||
func NewMetricsBuilder(backend string, config *MetricsBackendConfig) MetricsBuilder {
|
||||
switch strings.ToLower(backend) {
|
||||
case MetricsBackendExpvar:
|
||||
return &expvarMetricsBuilder{}
|
||||
case MetricsBackendPrometheus:
|
||||
return &prometheusMetricsBuilder{}
|
||||
case MetricsBackendInfluxDB:
|
||||
return &influxMetricsBuilder{config: config}
|
||||
case MetricsBackendDiscard:
|
||||
return &discardMetricsBuilder{}
|
||||
default:
|
||||
return &discardMetricsBuilder{}
|
||||
}
|
||||
}
|
||||
|
||||
type expvarMetricsBuilder struct {
|
||||
}
|
||||
|
||||
func (b expvarMetricsBuilder) Start(ctx context.Context) error {
|
||||
// nothing needed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b expvarMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
|
||||
c := expvarMetrics.NewCounter("connections")
|
||||
return &ConnectorMetrics{
|
||||
Errors: expvarMetrics.NewCounter("errors").With("subsystem", "connector"),
|
||||
BytesTransmitted: expvarMetrics.NewCounter("bytes"),
|
||||
ConnectionsFrontend: c,
|
||||
ConnectionsBackend: c,
|
||||
ActiveConnections: expvarMetrics.NewGauge("active_connections"),
|
||||
ServerActivePlayer: expvarMetrics.NewGauge("server_active_player"),
|
||||
ServerLogins: expvarMetrics.NewCounter("server_logins"),
|
||||
ServerActiveConnections: expvarMetrics.NewGauge("server_active_connections"),
|
||||
}
|
||||
}
|
||||
|
||||
type discardMetricsBuilder struct {
|
||||
}
|
||||
|
||||
func (b discardMetricsBuilder) Start(ctx context.Context) error {
|
||||
// nothing needed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b discardMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
|
||||
return &ConnectorMetrics{
|
||||
Errors: discardMetrics.NewCounter(),
|
||||
BytesTransmitted: discardMetrics.NewCounter(),
|
||||
ConnectionsFrontend: discardMetrics.NewCounter(),
|
||||
ConnectionsBackend: discardMetrics.NewCounter(),
|
||||
ActiveConnections: discardMetrics.NewGauge(),
|
||||
ServerActivePlayer: discardMetrics.NewGauge(),
|
||||
ServerLogins: discardMetrics.NewCounter(),
|
||||
ServerActiveConnections: discardMetrics.NewGauge(),
|
||||
}
|
||||
}
|
||||
|
||||
type influxMetricsBuilder struct {
|
||||
config *MetricsBackendConfig
|
||||
metrics *kitinflux.Influx
|
||||
}
|
||||
|
||||
func (b *influxMetricsBuilder) Start(ctx context.Context) error {
|
||||
influxConfig := &b.config.Influxdb
|
||||
if influxConfig.Addr == "" {
|
||||
return errors.New("influx addr is required")
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(influxConfig.Interval)
|
||||
client, err := influx.NewHTTPClient(influx.HTTPConfig{
|
||||
Addr: influxConfig.Addr,
|
||||
Username: influxConfig.Username,
|
||||
Password: influxConfig.Password,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create influx http client: %w", err)
|
||||
}
|
||||
|
||||
go b.metrics.WriteLoop(ctx, ticker.C, client)
|
||||
|
||||
logrus.WithField("addr", influxConfig.Addr).
|
||||
Debug("reporting metrics to influxdb")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *influxMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
|
||||
influxConfig := &b.config.Influxdb
|
||||
|
||||
metrics := kitinflux.New(influxConfig.Tags, influx.BatchPointsConfig{
|
||||
Database: influxConfig.Database,
|
||||
RetentionPolicy: influxConfig.RetentionPolicy,
|
||||
}, kitlogrus.NewLogger(logrus.StandardLogger()))
|
||||
|
||||
b.metrics = metrics
|
||||
|
||||
c := metrics.NewCounter("mc_router_connections")
|
||||
return &ConnectorMetrics{
|
||||
Errors: metrics.NewCounter("mc_router_errors"),
|
||||
BytesTransmitted: metrics.NewCounter("mc_router_transmitted_bytes"),
|
||||
ConnectionsFrontend: c.With("side", "frontend"),
|
||||
ConnectionsBackend: c.With("side", "backend"),
|
||||
ActiveConnections: metrics.NewGauge("mc_router_connections_active"),
|
||||
ServerActivePlayer: metrics.NewGauge("mc_router_server_player_active"),
|
||||
ServerLogins: metrics.NewCounter("mc_router_server_logins"),
|
||||
ServerActiveConnections: metrics.NewGauge("mc_router_server_active_connections"),
|
||||
}
|
||||
}
|
||||
|
||||
type prometheusMetricsBuilder struct {
|
||||
}
|
||||
|
||||
var pcv *prometheusMetrics.Counter
|
||||
|
||||
func (b prometheusMetricsBuilder) Start(ctx context.Context) error {
|
||||
|
||||
// nothing needed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b prometheusMetricsBuilder) BuildConnectorMetrics() *ConnectorMetrics {
|
||||
pcv = prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "mc_router",
|
||||
Name: "errors",
|
||||
Help: "The total number of errors",
|
||||
}, []string{"type"}))
|
||||
return &ConnectorMetrics{
|
||||
Errors: pcv,
|
||||
BytesTransmitted: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "mc_router",
|
||||
Name: "bytes",
|
||||
Help: "The total number of bytes transmitted",
|
||||
}, nil)),
|
||||
ConnectionsFrontend: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "mc_router",
|
||||
Subsystem: "frontend",
|
||||
Name: "connections",
|
||||
Help: "The total number of connections",
|
||||
ConstLabels: prometheus.Labels{"side": "frontend"},
|
||||
}, nil)),
|
||||
ConnectionsBackend: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "mc_router",
|
||||
Subsystem: "backend",
|
||||
Name: "connections",
|
||||
Help: "The total number of backend connections",
|
||||
ConstLabels: prometheus.Labels{"side": "backend"},
|
||||
}, []string{"host"})),
|
||||
ActiveConnections: prometheusMetrics.NewGauge(promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "mc_router",
|
||||
Name: "active_connections",
|
||||
Help: "The number of active connections",
|
||||
}, nil)),
|
||||
ServerActivePlayer: prometheusMetrics.NewGauge(promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "mc_router",
|
||||
Name: "server_active_player",
|
||||
Help: "Player is active on server",
|
||||
}, []string{"player_name", "player_uuid", "server_address"})),
|
||||
ServerLogins: prometheusMetrics.NewCounter(promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "mc_router",
|
||||
Name: "server_logins",
|
||||
Help: "The total number of player logins",
|
||||
}, []string{"player_name", "player_uuid", "server_address"})),
|
||||
ServerActiveConnections: prometheusMetrics.NewGauge(promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "mc_router",
|
||||
Name: "server_active_connections",
|
||||
Help: "The number of active connections per server",
|
||||
}, []string{"server_address"})),
|
||||
}
|
||||
}
|
||||
+5
-85
@@ -2,13 +2,10 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -18,88 +15,6 @@ var EmptyScalerFunc = func(ctx context.Context) error { return nil }
|
||||
|
||||
var tcpShieldPattern = regexp.MustCompile("///.*")
|
||||
|
||||
func init() {
|
||||
apiRoutes.Path("/routes").Methods("GET").
|
||||
Headers("Accept", "application/json").
|
||||
HandlerFunc(routesListHandler)
|
||||
apiRoutes.Path("/routes").Methods("POST").
|
||||
Headers("Content-Type", "application/json").
|
||||
HandlerFunc(routesCreateHandler)
|
||||
apiRoutes.Path("/defaultRoute").Methods("POST").
|
||||
Headers("Content-Type", "application/json").
|
||||
HandlerFunc(routesSetDefault)
|
||||
apiRoutes.Path("/routes/{serverAddress}").Methods("DELETE").HandlerFunc(routesDeleteHandler)
|
||||
}
|
||||
|
||||
func routesListHandler(writer http.ResponseWriter, _ *http.Request) {
|
||||
mappings := Routes.GetMappings()
|
||||
bytes, err := json.Marshal(mappings)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to marshal mappings")
|
||||
writer.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
_, err = writer.Write(bytes)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Failed to write response")
|
||||
}
|
||||
}
|
||||
|
||||
func routesDeleteHandler(writer http.ResponseWriter, request *http.Request) {
|
||||
serverAddress := mux.Vars(request)["serverAddress"]
|
||||
RoutesConfig.DeleteMapping(serverAddress)
|
||||
if serverAddress != "" {
|
||||
if Routes.DeleteMapping(serverAddress) {
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
} else {
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func routesCreateHandler(writer http.ResponseWriter, request *http.Request) {
|
||||
var definition = struct {
|
||||
ServerAddress string
|
||||
Backend string
|
||||
}{}
|
||||
|
||||
//goland:noinspection GoUnhandledErrorResult
|
||||
defer request.Body.Close()
|
||||
|
||||
decoder := json.NewDecoder(request.Body)
|
||||
err := decoder.Decode(&definition)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Unable to get request body")
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
Routes.CreateMapping(definition.ServerAddress, definition.Backend, EmptyScalerFunc, EmptyScalerFunc)
|
||||
RoutesConfig.AddMapping(definition.ServerAddress, definition.Backend)
|
||||
writer.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
func routesSetDefault(writer http.ResponseWriter, request *http.Request) {
|
||||
var body = struct {
|
||||
Backend string
|
||||
}{}
|
||||
|
||||
//goland:noinspection GoUnhandledErrorResult
|
||||
defer request.Body.Close()
|
||||
|
||||
decoder := json.NewDecoder(request.Body)
|
||||
err := decoder.Decode(&body)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Unable to parse request")
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
Routes.SetDefaultRoute(body.Backend)
|
||||
RoutesConfig.SetDefaultRoute(body.Backend)
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
type IRoutes interface {
|
||||
Reset()
|
||||
RegisterAll(mappings map[string]string)
|
||||
@@ -112,6 +27,7 @@ type IRoutes interface {
|
||||
DeleteMapping(serverAddress string) bool
|
||||
CreateMapping(serverAddress string, backend string, waker ScalerFunc, sleeper ScalerFunc)
|
||||
SetDefaultRoute(backend string)
|
||||
GetDefaultRoute() string
|
||||
SimplifySRV(srvEnabled bool)
|
||||
}
|
||||
|
||||
@@ -157,6 +73,10 @@ func (r *routesImpl) SetDefaultRoute(backend string) {
|
||||
}).Info("Using default route")
|
||||
}
|
||||
|
||||
func (r *routesImpl) GetDefaultRoute() string {
|
||||
return r.defaultRoute
|
||||
}
|
||||
|
||||
func (r *routesImpl) SimplifySRV(srvEnabled bool) {
|
||||
r.simplifySRV = srvEnabled
|
||||
}
|
||||
|
||||
@@ -1,260 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"io/fs"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type IRoutesConfig interface {
|
||||
ReadRoutesConfig(routesConfig string)
|
||||
ReloadRoutesConfig()
|
||||
AddMapping(serverAddress string, backend string)
|
||||
DeleteMapping(serverAddress string)
|
||||
SetDefaultRoute(backend string)
|
||||
WatchForChanges(ctx context.Context) error
|
||||
}
|
||||
|
||||
const debounceConfigRereadDuration = time.Second * 5
|
||||
|
||||
var RoutesConfig = &routesConfigImpl{}
|
||||
|
||||
type routesConfigImpl struct {
|
||||
sync.RWMutex
|
||||
fileName string
|
||||
}
|
||||
|
||||
type routesConfigStructure struct {
|
||||
DefaultServer string `json:"default-server"`
|
||||
Mappings map[string]string `json:"mappings"`
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) ReadRoutesConfig(routesConfig string) error {
|
||||
r.fileName = routesConfig
|
||||
|
||||
logrus.WithField("routesConfig", r.fileName).Info("Loading routes config file")
|
||||
|
||||
config, readErr := r.readRoutesConfigFile()
|
||||
|
||||
if readErr != nil {
|
||||
if errors.Is(readErr, fs.ErrNotExist) {
|
||||
logrus.WithField("routesConfig", r.fileName).Info("Routes config file doses not exist, skipping reading it")
|
||||
// File doesn't exist -> ignore it
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(readErr, "Could not load the routes config file")
|
||||
}
|
||||
|
||||
Routes.RegisterAll(config.Mappings)
|
||||
Routes.SetDefaultRoute(config.DefaultServer)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) ReloadRoutesConfig() error {
|
||||
config, readErr := r.readRoutesConfigFile()
|
||||
|
||||
if readErr != nil {
|
||||
return readErr
|
||||
}
|
||||
|
||||
logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file")
|
||||
Routes.Reset()
|
||||
Routes.RegisterAll(config.Mappings)
|
||||
Routes.SetDefaultRoute(config.DefaultServer)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) WatchForChanges(ctx context.Context) error {
|
||||
if r.fileName == "" {
|
||||
return errors.New("routes config file needs to be specified first")
|
||||
}
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not create a watcher")
|
||||
}
|
||||
|
||||
err = watcher.Add(r.fileName)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not watch the routes config file")
|
||||
}
|
||||
|
||||
go func() {
|
||||
logrus.WithField("file", r.fileName).Info("Watching routes config file")
|
||||
|
||||
debounceTimerChan := make(<-chan time.Time)
|
||||
var debounceTimer *time.Timer
|
||||
|
||||
//goland:noinspection GoUnhandledErrorResult
|
||||
defer watcher.Close()
|
||||
for {
|
||||
select {
|
||||
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
logrus.Debug("Watcher events channel closed")
|
||||
return
|
||||
}
|
||||
logrus.
|
||||
WithField("file", event.Name).
|
||||
WithField("op", event.Op).
|
||||
Trace("fs event received")
|
||||
if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) {
|
||||
if debounceTimer == nil {
|
||||
debounceTimer = time.NewTimer(debounceConfigRereadDuration)
|
||||
} else {
|
||||
debounceTimer.Reset(debounceConfigRereadDuration)
|
||||
}
|
||||
debounceTimerChan = debounceTimer.C
|
||||
logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay")
|
||||
}
|
||||
|
||||
case <-debounceTimerChan:
|
||||
readErr := r.ReadRoutesConfig(r.fileName)
|
||||
if readErr != nil {
|
||||
logrus.
|
||||
WithError(readErr).
|
||||
WithField("routesConfig", r.fileName).
|
||||
Error("Could not re-read the routes config file")
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) AddMapping(serverAddress string, backend string) {
|
||||
if !r.isRoutesConfigEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
config, readErr := r.readRoutesConfigFile()
|
||||
if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) {
|
||||
logrus.WithError(readErr).Error("Could not read the routes config file")
|
||||
return
|
||||
}
|
||||
if config.Mappings == nil {
|
||||
config.Mappings = make(map[string]string)
|
||||
}
|
||||
|
||||
config.Mappings[serverAddress] = backend
|
||||
|
||||
writeErr := r.writeRoutesConfigFile(config)
|
||||
if writeErr != nil {
|
||||
logrus.WithError(writeErr).Error("Could not write to the routes config file")
|
||||
return
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"serverAddress": serverAddress,
|
||||
"backend": backend,
|
||||
}).Info("Added route to routes config")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) SetDefaultRoute(backend string) {
|
||||
if !r.isRoutesConfigEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
config, readErr := r.readRoutesConfigFile()
|
||||
if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) {
|
||||
logrus.WithError(readErr).Error("Could not read the routes config file")
|
||||
return
|
||||
}
|
||||
|
||||
config.DefaultServer = backend
|
||||
|
||||
writeErr := r.writeRoutesConfigFile(config)
|
||||
if writeErr != nil {
|
||||
logrus.WithError(writeErr).Error("Could not write to the routes config file")
|
||||
return
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"backend": backend,
|
||||
}).Info("Set default route in routes config")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) DeleteMapping(serverAddress string) {
|
||||
if !r.isRoutesConfigEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
config, readErr := r.readRoutesConfigFile()
|
||||
if readErr != nil && !errors.Is(readErr, fs.ErrNotExist) {
|
||||
logrus.WithError(readErr).Error("Could not read the routes config file")
|
||||
return
|
||||
}
|
||||
|
||||
delete(config.Mappings, serverAddress)
|
||||
|
||||
writeErr := r.writeRoutesConfigFile(config)
|
||||
if writeErr != nil {
|
||||
logrus.WithError(writeErr).Error("Could not write to the routes config file")
|
||||
return
|
||||
}
|
||||
|
||||
logrus.WithField("serverAddress", serverAddress).Info("Deleted route in routes config")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) isRoutesConfigEnabled() bool {
|
||||
return r.fileName != ""
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) readRoutesConfigFile() (routesConfigStructure, error) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
config := routesConfigStructure{
|
||||
"",
|
||||
make(map[string]string),
|
||||
}
|
||||
|
||||
file, fileErr := os.ReadFile(r.fileName)
|
||||
if fileErr != nil {
|
||||
return config, errors.Wrap(fileErr, "Could not load the routes config file")
|
||||
}
|
||||
|
||||
parseErr := json.Unmarshal(file, &config)
|
||||
if parseErr != nil {
|
||||
return config, errors.Wrap(parseErr, "Could not parse the json routes config file")
|
||||
}
|
||||
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func (r *routesConfigImpl) writeRoutesConfigFile(config routesConfigStructure) error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
newFileContent, parseErr := json.Marshal(config)
|
||||
if parseErr != nil {
|
||||
return errors.Wrap(parseErr, "Could not parse the routes to json")
|
||||
}
|
||||
|
||||
fileErr := os.WriteFile(r.fileName, newFileContent, 0664)
|
||||
if fileErr != nil {
|
||||
return errors.Wrap(fileErr, "Could not write to the routes config file")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"io/fs"
|
||||
"os"
|
||||
)
|
||||
|
||||
const debounceConfigRereadDuration = time.Second * 5
|
||||
|
||||
var RoutesConfigLoader = &routesConfigLoader{}
|
||||
|
||||
type routesConfigLoader struct {
|
||||
fileName string
|
||||
}
|
||||
|
||||
// RoutesConfigSchema declares the schema of the json file that can provide routes to serve
|
||||
type RoutesConfigSchema struct {
|
||||
DefaultServer string `json:"default-server"`
|
||||
Mappings map[string]string `json:"mappings"`
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) Load(routesConfigFileName string) error {
|
||||
r.fileName = routesConfigFileName
|
||||
|
||||
logrus.WithField("routesConfigFileName", r.fileName).Info("Loading routes config file")
|
||||
|
||||
config, readErr := r.readFile()
|
||||
|
||||
if readErr != nil {
|
||||
if errors.Is(readErr, fs.ErrNotExist) {
|
||||
logrus.WithField("routesConfigFileName", r.fileName).Info("Routes config file doses not exist, skipping reading it")
|
||||
// File doesn't exist -> ignore it
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(readErr, "Could not load the routes config file")
|
||||
}
|
||||
|
||||
Routes.RegisterAll(config.Mappings)
|
||||
Routes.SetDefaultRoute(config.DefaultServer)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) Reload() error {
|
||||
config, readErr := r.readFile()
|
||||
|
||||
if readErr != nil {
|
||||
return readErr
|
||||
}
|
||||
|
||||
logrus.WithField("routesConfig", r.fileName).Info("Re-loading routes config file")
|
||||
Routes.Reset()
|
||||
Routes.RegisterAll(config.Mappings)
|
||||
Routes.SetDefaultRoute(config.DefaultServer)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) WatchForChanges(ctx context.Context) error {
|
||||
if r.fileName == "" {
|
||||
return errors.New("routes config file needs to be specified first")
|
||||
}
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not create a watcher")
|
||||
}
|
||||
|
||||
err = watcher.Add(r.fileName)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not watch the routes config file")
|
||||
}
|
||||
|
||||
go func() {
|
||||
logrus.WithField("file", r.fileName).Info("Watching routes config file")
|
||||
|
||||
debounceTimerChan := make(<-chan time.Time)
|
||||
var debounceTimer *time.Timer
|
||||
|
||||
//goland:noinspection GoUnhandledErrorResult
|
||||
defer watcher.Close()
|
||||
for {
|
||||
select {
|
||||
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
logrus.Debug("Watcher events channel closed")
|
||||
return
|
||||
}
|
||||
logrus.
|
||||
WithField("file", event.Name).
|
||||
WithField("op", event.Op).
|
||||
Trace("fs event received")
|
||||
if event.Op.Has(fsnotify.Write) || event.Op.Has(fsnotify.Create) {
|
||||
if debounceTimer == nil {
|
||||
debounceTimer = time.NewTimer(debounceConfigRereadDuration)
|
||||
} else {
|
||||
debounceTimer.Reset(debounceConfigRereadDuration)
|
||||
}
|
||||
debounceTimerChan = debounceTimer.C
|
||||
logrus.WithField("delay", debounceConfigRereadDuration).Debug("Will re-read config file after delay")
|
||||
}
|
||||
|
||||
case <-debounceTimerChan:
|
||||
readErr := r.Load(r.fileName)
|
||||
if readErr != nil {
|
||||
logrus.
|
||||
WithError(readErr).
|
||||
WithField("routesConfig", r.fileName).
|
||||
Error("Could not re-read the routes config file")
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) SaveRoutes() {
|
||||
if !r.isEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
err := r.writeFile(&RoutesConfigSchema{
|
||||
DefaultServer: Routes.GetDefaultRoute(),
|
||||
Mappings: Routes.GetMappings(),
|
||||
})
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("Could not save the routes config file")
|
||||
return
|
||||
}
|
||||
logrus.Info("Saved routes config")
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) isEnabled() bool {
|
||||
return r.fileName != ""
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) readFile() (*RoutesConfigSchema, error) {
|
||||
var config RoutesConfigSchema
|
||||
|
||||
content, err := os.ReadFile(r.fileName)
|
||||
if err != nil {
|
||||
return &config, errors.Wrap(err, "Could not load the routes config file")
|
||||
}
|
||||
|
||||
parseErr := json.Unmarshal(content, &config)
|
||||
if parseErr != nil {
|
||||
return &config, errors.Wrap(parseErr, "Could not parse the json routes config file")
|
||||
}
|
||||
|
||||
return &config, nil
|
||||
}
|
||||
|
||||
func (r *routesConfigLoader) writeFile(config *RoutesConfigSchema) error {
|
||||
newFileContent, err := json.Marshal(config)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not parse the routes to json")
|
||||
}
|
||||
|
||||
err = os.WriteFile(r.fileName, newFileContent, 0664)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not write to the routes config file")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user