e2ce0453fa
Adds a separate HTTP server (not the upstream API on :25590) for the operator dashboard. Single-page UI with two panes: * routes table — current pg-synced mappings, polled every 2s * logs — SSE stream backed by a logrus hook + 500-entry ring buffer Opt-in via AUTOMC_UI_BINDING (e.g. ":8082"); unset = no-op, behaves exactly like upstream. Designed to live behind server-manager's /infra/mc-router/* reverse-proxy. Patch is internal/automc-only, same fork philosophy as the rest — upstream files stay verbatim. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
117 lines
2.3 KiB
Go
117 lines
2.3 KiB
Go
package automc
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// LogEntry is the structured shape pushed to UI subscribers via SSE.
|
|
type LogEntry struct {
|
|
Time time.Time `json:"time"`
|
|
Level string `json:"level"`
|
|
Msg string `json:"msg"`
|
|
Attrs string `json:"attrs,omitempty"`
|
|
}
|
|
|
|
// LogBus is a fan-out buffer for logrus entries: a ring of the last N
|
|
// entries (replayed on connect) + live broadcast to current subscribers.
|
|
// Identical model to the one in svc-proxy/internal/httpsrv — kept local to
|
|
// avoid a cross-repo dep on the fork.
|
|
type LogBus struct {
|
|
cap int
|
|
|
|
mu sync.RWMutex
|
|
ring []LogEntry
|
|
next int
|
|
full bool
|
|
listeners map[chan LogEntry]struct{}
|
|
}
|
|
|
|
func NewLogBus(capacity int) *LogBus {
|
|
if capacity <= 0 {
|
|
capacity = 500
|
|
}
|
|
return &LogBus{
|
|
cap: capacity,
|
|
ring: make([]LogEntry, capacity),
|
|
listeners: map[chan LogEntry]struct{}{},
|
|
}
|
|
}
|
|
|
|
func (b *LogBus) Push(e LogEntry) {
|
|
b.mu.Lock()
|
|
b.ring[b.next] = e
|
|
b.next = (b.next + 1) % b.cap
|
|
if b.next == 0 {
|
|
b.full = true
|
|
}
|
|
subs := make([]chan LogEntry, 0, len(b.listeners))
|
|
for ch := range b.listeners {
|
|
subs = append(subs, ch)
|
|
}
|
|
b.mu.Unlock()
|
|
for _, ch := range subs {
|
|
select {
|
|
case ch <- e:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *LogBus) Backlog() []LogEntry {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
if !b.full {
|
|
out := make([]LogEntry, b.next)
|
|
copy(out, b.ring[:b.next])
|
|
return out
|
|
}
|
|
out := make([]LogEntry, 0, b.cap)
|
|
out = append(out, b.ring[b.next:]...)
|
|
out = append(out, b.ring[:b.next]...)
|
|
return out
|
|
}
|
|
|
|
func (b *LogBus) Subscribe() chan LogEntry {
|
|
ch := make(chan LogEntry, 32)
|
|
b.mu.Lock()
|
|
b.listeners[ch] = struct{}{}
|
|
b.mu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
func (b *LogBus) Unsubscribe(ch chan LogEntry) {
|
|
b.mu.Lock()
|
|
delete(b.listeners, ch)
|
|
b.mu.Unlock()
|
|
close(ch)
|
|
}
|
|
|
|
// logrusBusHook adapts the LogBus to logrus's Hook interface. Registered
|
|
// globally from Wire() so every upstream log emission is captured.
|
|
type logrusBusHook struct{ bus *LogBus }
|
|
|
|
func (h *logrusBusHook) Levels() []logrus.Level {
|
|
return logrus.AllLevels
|
|
}
|
|
|
|
func (h *logrusBusHook) Fire(e *logrus.Entry) error {
|
|
var attrs string
|
|
for k, v := range e.Data {
|
|
if attrs != "" {
|
|
attrs += " "
|
|
}
|
|
attrs += fmt.Sprintf("%s=%v", k, v)
|
|
}
|
|
h.bus.Push(LogEntry{
|
|
Time: e.Time,
|
|
Level: e.Level.String(),
|
|
Msg: e.Message,
|
|
Attrs: attrs,
|
|
})
|
|
return nil
|
|
}
|