diff --git a/internal/automc/loghook.go b/internal/automc/loghook.go new file mode 100644 index 0000000..dbceec3 --- /dev/null +++ b/internal/automc/loghook.go @@ -0,0 +1,116 @@ +package automc + +import ( + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// LogEntry is the structured shape pushed to UI subscribers via SSE. +type LogEntry struct { + Time time.Time `json:"time"` + Level string `json:"level"` + Msg string `json:"msg"` + Attrs string `json:"attrs,omitempty"` +} + +// LogBus is a fan-out buffer for logrus entries: a ring of the last N +// entries (replayed on connect) + live broadcast to current subscribers. +// Identical model to the one in svc-proxy/internal/httpsrv — kept local to +// avoid a cross-repo dep on the fork. +type LogBus struct { + cap int + + mu sync.RWMutex + ring []LogEntry + next int + full bool + listeners map[chan LogEntry]struct{} +} + +func NewLogBus(capacity int) *LogBus { + if capacity <= 0 { + capacity = 500 + } + return &LogBus{ + cap: capacity, + ring: make([]LogEntry, capacity), + listeners: map[chan LogEntry]struct{}{}, + } +} + +func (b *LogBus) Push(e LogEntry) { + b.mu.Lock() + b.ring[b.next] = e + b.next = (b.next + 1) % b.cap + if b.next == 0 { + b.full = true + } + subs := make([]chan LogEntry, 0, len(b.listeners)) + for ch := range b.listeners { + subs = append(subs, ch) + } + b.mu.Unlock() + for _, ch := range subs { + select { + case ch <- e: + default: + } + } +} + +func (b *LogBus) Backlog() []LogEntry { + b.mu.RLock() + defer b.mu.RUnlock() + if !b.full { + out := make([]LogEntry, b.next) + copy(out, b.ring[:b.next]) + return out + } + out := make([]LogEntry, 0, b.cap) + out = append(out, b.ring[b.next:]...) + out = append(out, b.ring[:b.next]...) + return out +} + +func (b *LogBus) Subscribe() chan LogEntry { + ch := make(chan LogEntry, 32) + b.mu.Lock() + b.listeners[ch] = struct{}{} + b.mu.Unlock() + return ch +} + +func (b *LogBus) Unsubscribe(ch chan LogEntry) { + b.mu.Lock() + delete(b.listeners, ch) + b.mu.Unlock() + close(ch) +} + +// logrusBusHook adapts the LogBus to logrus's Hook interface. Registered +// globally from Wire() so every upstream log emission is captured. +type logrusBusHook struct{ bus *LogBus } + +func (h *logrusBusHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +func (h *logrusBusHook) Fire(e *logrus.Entry) error { + var attrs string + for k, v := range e.Data { + if attrs != "" { + attrs += " " + } + attrs += fmt.Sprintf("%s=%v", k, v) + } + h.bus.Push(LogEntry{ + Time: e.Time, + Level: e.Level.String(), + Msg: e.Message, + Attrs: attrs, + }) + return nil +} diff --git a/internal/automc/static/index.html b/internal/automc/static/index.html new file mode 100644 index 0000000..8c92702 --- /dev/null +++ b/internal/automc/static/index.html @@ -0,0 +1,150 @@ + + + + + mc-router + + + +
+

mc-router — connecting…

+ log stream: connecting +
+ +
+

Routes

+ + + + + + + + +
Server address (host)Backend
+
no routes registered
+
+ +
+

+ Logs + +

+

+  
+ + + + diff --git a/internal/automc/uipage.go b/internal/automc/uipage.go new file mode 100644 index 0000000..fc3e81e --- /dev/null +++ b/internal/automc/uipage.go @@ -0,0 +1,119 @@ +package automc + +import ( + "context" + "embed" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "sort" + "time" + + "github.com/itzg/mc-router/server" + "github.com/sirupsen/logrus" +) + +//go:embed static/* +var staticFS embed.FS + +// startUI starts a separate HTTP server on uiBinding serving the operator +// dashboard (embedded index.html), an SSE log feed, and a JSON snapshot of +// the current route table. The upstream JSON API on API_BINDING is left +// untouched so existing tooling keeps working. +func startUI(ctx context.Context, uiBinding string, bus *LogBus) { + mux := http.NewServeMux() + + sub, err := fs.Sub(staticFS, "static") + if err != nil { + logrus.WithError(err).Error("automc ui: embed misconfigured") + return + } + mux.Handle("GET /", http.FileServer(http.FS(sub))) + mux.HandleFunc("GET /api/routes", routesSnapshotHandler) + mux.HandleFunc("GET /api/logs", sseLogsHandler(bus)) + + srv := &http.Server{ + Addr: uiBinding, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + go func() { + <-ctx.Done() + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(shutCtx) + }() + go func() { + logrus.WithField("binding", uiBinding).Info("automc: ui server listening") + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logrus.WithError(err).Error("automc ui server failed") + } + }() +} + +// RouteSnapshot is one row of the routes table the UI renders. Same shape as +// the upstream /routes JSON but flatter — the UI doesn't need both backend +// and scalingTarget shown separately. +type RouteSnapshot struct { + ServerAddress string `json:"server_address"` + Backend string `json:"backend"` +} + +func routesSnapshotHandler(w http.ResponseWriter, _ *http.Request) { + mappings := server.Routes.GetMappings() + out := make([]RouteSnapshot, 0, len(mappings)) + for addr, backend := range mappings { + out = append(out, RouteSnapshot{ServerAddress: addr, Backend: backend}) + } + sort.Slice(out, func(i, j int) bool { return out[i].ServerAddress < out[j].ServerAddress }) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "routes": out, + "at": time.Now(), + }) +} + +func sseLogsHandler(bus *LogBus) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + for _, e := range bus.Backlog() { + writeSSEEvent(w, e) + } + flusher.Flush() + + ch := bus.Subscribe() + defer bus.Unsubscribe(ch) + + hb := time.NewTicker(30 * time.Second) + defer hb.Stop() + for { + select { + case <-r.Context().Done(): + return + case e := <-ch: + writeSSEEvent(w, e) + flusher.Flush() + case <-hb.C: + _, _ = io.WriteString(w, ":hb\n\n") + flusher.Flush() + } + } + } +} + +func writeSSEEvent(w io.Writer, e LogEntry) { + fmt.Fprintf(w, "data: {\"time\":%q,\"level\":%q,\"msg\":%q,\"attrs\":%q}\n\n", + e.Time.Format(time.RFC3339Nano), e.Level, e.Msg, e.Attrs) +} diff --git a/internal/automc/wire.go b/internal/automc/wire.go index 82de371..b4ddee9 100644 --- a/internal/automc/wire.go +++ b/internal/automc/wire.go @@ -20,5 +20,14 @@ func Wire(ctx context.Context) error { s := newSyncer(dsn, waker) go s.run(ctx) logrus.Info("automc: pg route sync started") + + // Operator UI on a separate port — upstream's API_BINDING stays + // JSON-only and untouched. Enable by setting AUTOMC_UI_BINDING (e.g. + // ":8082"); leave unset to skip and behave exactly like upstream. + if uiBinding := os.Getenv("AUTOMC_UI_BINDING"); uiBinding != "" { + bus := NewLogBus(500) + logrus.AddHook(&logrusBusHook{bus: bus}) + startUI(ctx, uiBinding, bus) + } return nil }