39d7d4586b
Pulls mc_router_active_connections + mc_router_server_active_connections from the Prometheus default registry on every /api/routes call. UI gains an Active column on the routes table and the header subtitle shows "N routes · M active conn". Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
175 lines
4.8 KiB
Go
175 lines
4.8 KiB
Go
package automc
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"net/http"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/itzg/mc-router/server"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// staticFS holds the files matched by the static/* embed pattern below;
// startUI serves its "static" subtree as the dashboard's document root.
//go:embed static/*
|
|
var staticFS embed.FS
|
|
|
|
// startUI starts a separate HTTP server on uiBinding serving the operator
|
|
// dashboard (embedded index.html), an SSE log feed, and a JSON snapshot of
|
|
// the current route table. The upstream JSON API on API_BINDING is left
|
|
// untouched so existing tooling keeps working.
|
|
func startUI(ctx context.Context, uiBinding string, bus *LogBus) {
|
|
mux := http.NewServeMux()
|
|
|
|
sub, err := fs.Sub(staticFS, "static")
|
|
if err != nil {
|
|
logrus.WithError(err).Error("automc ui: embed misconfigured")
|
|
return
|
|
}
|
|
mux.Handle("GET /", http.FileServer(http.FS(sub)))
|
|
mux.HandleFunc("GET /api/routes", routesSnapshotHandler)
|
|
mux.HandleFunc("GET /api/logs", sseLogsHandler(bus))
|
|
|
|
srv := &http.Server{
|
|
Addr: uiBinding,
|
|
Handler: mux,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
go func() {
|
|
<-ctx.Done()
|
|
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = srv.Shutdown(shutCtx)
|
|
}()
|
|
go func() {
|
|
logrus.WithField("binding", uiBinding).Info("automc: ui server listening")
|
|
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
logrus.WithError(err).Error("automc ui server failed")
|
|
}
|
|
}()
|
|
}
|
|
|
|
// RouteSnapshot is one row of the routes table the UI renders. Same shape as
// the upstream /routes JSON but flatter — the UI doesn't need both backend
// and scalingTarget shown separately. ActiveConnections is the live gauge
// value scraped from the Prometheus registry; it is 0 when no per-route
// sample exists for the address or when the registry cannot be gathered.
type RouteSnapshot struct {
	// ServerAddress is the route's key in the mapping table.
	ServerAddress string `json:"server_address"`
	// Backend is the value the mapping table holds for ServerAddress.
	Backend string `json:"backend"`
	// ActiveConnections is the per-route connection gauge, truncated to int.
	ActiveConnections int `json:"active_connections"`
}
|
|
|
|
func routesSnapshotHandler(w http.ResponseWriter, _ *http.Request) {
|
|
mappings := server.Routes.GetMappings()
|
|
perRoute, total := scrapeActiveConnections()
|
|
|
|
out := make([]RouteSnapshot, 0, len(mappings))
|
|
for addr, backend := range mappings {
|
|
conns, ok := perRoute[addr]
|
|
if !ok {
|
|
conns = 0
|
|
}
|
|
out = append(out, RouteSnapshot{ServerAddress: addr, Backend: backend, ActiveConnections: conns})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool { return out[i].ServerAddress < out[j].ServerAddress })
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
|
"routes": out,
|
|
"total_connections": total,
|
|
"at": time.Now(),
|
|
})
|
|
}
|
|
|
|
// scrapeActiveConnections walks the Prometheus default registry to extract
|
|
// two gauges that mc-router exposes:
|
|
//
|
|
// mc_router_active_connections (no labels)
|
|
// mc_router_server_active_connections{server_address=…} (per route)
|
|
//
|
|
// Returns (perServerAddress, total). On any gathering error returns zero
|
|
// values silently — the UI shows 0 rather than blocking on a metrics issue.
|
|
func scrapeActiveConnections() (map[string]int, int) {
|
|
per := map[string]int{}
|
|
total := 0
|
|
families, err := prometheus.DefaultGatherer.Gather()
|
|
if err != nil {
|
|
return per, 0
|
|
}
|
|
for _, mf := range families {
|
|
switch mf.GetName() {
|
|
case "mc_router_active_connections":
|
|
for _, m := range mf.GetMetric() {
|
|
if g := m.GetGauge(); g != nil {
|
|
total = int(g.GetValue())
|
|
}
|
|
}
|
|
case "mc_router_server_active_connections":
|
|
for _, m := range mf.GetMetric() {
|
|
var addr string
|
|
for _, lp := range m.GetLabel() {
|
|
if lp.GetName() == "server_address" {
|
|
addr = lp.GetValue()
|
|
break
|
|
}
|
|
}
|
|
if addr == "" {
|
|
continue
|
|
}
|
|
if g := m.GetGauge(); g != nil {
|
|
per[addr] = int(g.GetValue())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return per, total
|
|
}
|
|
|
|
func sseLogsHandler(bus *LogBus) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no")
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
for _, e := range bus.Backlog() {
|
|
writeSSEEvent(w, e)
|
|
}
|
|
flusher.Flush()
|
|
|
|
ch := bus.Subscribe()
|
|
defer bus.Unsubscribe(ch)
|
|
|
|
hb := time.NewTicker(30 * time.Second)
|
|
defer hb.Stop()
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
return
|
|
case e := <-ch:
|
|
writeSSEEvent(w, e)
|
|
flusher.Flush()
|
|
case <-hb.C:
|
|
_, _ = io.WriteString(w, ":hb\n\n")
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeSSEEvent(w io.Writer, e LogEntry) {
|
|
fmt.Fprintf(w, "data: {\"time\":%q,\"level\":%q,\"msg\":%q,\"attrs\":%q}\n\n",
|
|
e.Time.Format(time.RFC3339Nano), e.Level, e.Msg, e.Attrs)
|
|
}
|