Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cf3ed60bce | |||
| 39d7d4586b | |||
| b995e012be | |||
| e2ce0453fa | |||
| 004ee6dbfd | |||
| 7884fb1c5f | |||
| 657fca325e |
@@ -0,0 +1,54 @@
|
||||
# CI for the automc soft fork: validate every push / PR against the automc
# branch, and build + push a container image on pushes (branch or release tag).
name: CI

on:
  push:
    branches: [automc]
    tags: ["automc-v*"]
  pull_request:
    branches: [automc]

jobs:
  validate:
    runs-on: ubuntu-latest
    timeout-minutes: 15
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version-file: go.mod
      - name: Build
        run: go build ./...
      - name: Vet
        run: go vet ./...
      # Run the fork-specific package first so its failures surface before the
      # (longer) full suite.
      - name: Test automc package
        run: go test ./internal/automc/...
      - name: Test full suite
        run: go test ./...

  docker:
    runs-on: ubuntu-latest
    timeout-minutes: 15
    needs: validate
    # Images are only published for pushes, never for pull requests.
    if: github.event_name == 'push'
    steps:
      - uses: actions/checkout@v4

      # Tag pushes publish the tag name; branch pushes publish the moving
      # "automc" tag. Both also publish an immutable sha-<7 hex> tag.
      # NOTE(review): relies on the runner exporting GITEA_REF / GITEA_SHA /
      # GITEA_OUTPUT — confirm against the runner version in use.
      - name: Compute tags
        id: meta
        run: |
          if [[ "$GITEA_REF" == refs/tags/automc-v* ]]; then
            echo "tag=${GITEA_REF#refs/tags/}" >> "$GITEA_OUTPUT"
          else
            echo "tag=automc" >> "$GITEA_OUTPUT"
          fi
          echo "sha=sha-$(echo "$GITEA_SHA" | cut -c1-7)" >> "$GITEA_OUTPUT"

      # Secrets are handed over as environment variables instead of being
      # expanded into the script text, so quotes or `$` in a credential cannot
      # break or inject into the shell command.
      - name: Login to registry
        env:
          REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
          REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
        run: printf '%s' "$REGISTRY_PASSWORD" | docker login git.timemachine.center -u "$REGISTRY_USERNAME" --password-stdin

      # Same reasoning for the ref-derived tag values: keep them out of the
      # script source and reference them as quoted shell variables.
      - name: Build and push
        env:
          REPO_NAME: ${{ gitea.event.repository.name }}
          IMAGE_TAG: ${{ steps.meta.outputs.tag }}
          IMAGE_SHA_TAG: ${{ steps.meta.outputs.sha }}
        run: |
          IMG="git.timemachine.center/timemachine/$REPO_NAME"
          docker build -t "$IMG:$IMAGE_TAG" -t "$IMG:$IMAGE_SHA_TAG" .
          docker push "$IMG:$IMAGE_TAG"
          docker push "$IMG:$IMAGE_SHA_TAG"
|
||||
@@ -0,0 +1,51 @@
|
||||
# Fork status — Timemachine/mc-router
|
||||
|
||||
This is a **soft fork** of [`itzg/mc-router`](https://github.com/itzg/mc-router) maintained for the [automc](https://git.timemachine.center/Timemachine/automc) Minecraft management platform.
|
||||
|
||||
## Branch model
|
||||
|
||||
| Branch | Tracks | Contents |
|
||||
|---|---|---|
|
||||
| `main` | `upstream/main` (github.com/itzg/mc-router) | Verbatim mirror. **Do not commit here.** |
|
||||
| `automc` | branched from `v1.42.1` | Soft fork; carries the automc-specific patch. |
|
||||
|
||||
## Patch surface
|
||||
|
||||
Minimal — designed for low-friction rebases.
|
||||
|
||||
| Path | Type | Why |
|
||||
|---|---|---|
|
||||
| `cmd/mc-router/main.go` | edit | 1 import line + 4-line `automc.Wire(ctx)` call. Only upstream file modified. |
|
||||
| `internal/automc/` | new dir | Self-contained extension package — no upstream conflicts possible. |
|
||||
| `docs/AUTOMC.md` | new | Operator doc for the extensions. See [`docs/AUTOMC.md`](docs/AUTOMC.md). |
|
||||
| `Makefile` | edit | Appended `sync-upstream` + `automc-build` targets. |
|
||||
| `go.mod` / `go.sum` | edit | Added `github.com/jackc/pgx/v5 v5.8.0` for LISTEN/NOTIFY. |
|
||||
| `FORK.md` | new | This file. |
|
||||
|
||||
Everything else stays untouched. No edits to `server/`, `mcproto/`, `cmd/mc-router/` beyond the wire-call.
|
||||
|
||||
## What the fork adds
|
||||
|
||||
1. **Postgres LISTEN/NOTIFY route source** — drop-in alternative to `--routes-config`, `--api-binding`, K8s, and Docker route sources. Pulled in via `AUTOMC_DSN` env var.
|
||||
2. **HTTP waker integration** — registers a `WakerFunc` per route that POSTs to a control-plane (`AUTOMC_WAKER_URL`) and polls until `state=running`.
|
||||
|
||||
Both are opt-in. With `AUTOMC_DSN` unset, the binary behaves exactly like upstream `itzg/mc-router`.
|
||||
|
||||
See [`docs/AUTOMC.md`](docs/AUTOMC.md) for env vars, schema, troubleshooting.
|
||||
|
||||
## Sync from upstream
|
||||
|
||||
```
|
||||
make sync-upstream
|
||||
```
|
||||
|
||||
Rebases `automc` onto `upstream/main`, runs `go build ./...` and `go test ./internal/automc/...`. If `server.Routes.CreateMapping` signature changes, only [`internal/automc/pgsync.go`](internal/automc/pgsync.go) needs adjustment.
|
||||
|
||||
## Reporting issues
|
||||
|
||||
| Where it goes | What |
|
||||
|---|---|
|
||||
| [`Timemachine/mc-router`](https://git.timemachine.center/Timemachine/mc-router/issues) | automc-specific bugs (pg sync, waker, fork mechanics) |
|
||||
| [`itzg/mc-router`](https://github.com/itzg/mc-router/issues) | Upstream core bugs (routing, mcproto, REST API) |
|
||||
|
||||
If unsure, file upstream first — most reports turn out to be upstream-side.
|
||||
@@ -5,3 +5,15 @@ test:
|
||||
.PHONY: release
|
||||
release:
|
||||
curl -sL https://git.io/goreleaser | bash
|
||||
|
||||
.PHONY: sync-upstream automc-build

# Rebase the automc branch onto upstream/main, then build everything and run
# the automc package tests to confirm the patch still applies cleanly.
sync-upstream:
	git fetch upstream
	git checkout automc
	git rebase upstream/main
	go build ./...
	go test ./internal/automc/...

# Build the mc-router binary into the current directory.
automc-build:
	go build -o mc-router ./cmd/mc-router
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"syscall"
|
||||
|
||||
"github.com/itzg/go-flagsfiller"
|
||||
"github.com/itzg/mc-router/internal/automc"
|
||||
"github.com/itzg/mc-router/server"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -65,6 +66,10 @@ func main() {
|
||||
logrus.WithError(err).Fatal("Could not setup server")
|
||||
}
|
||||
|
||||
if err := automc.Wire(ctx); err != nil {
|
||||
logrus.WithError(err).Fatal("automc Wire failed")
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Go(s.Run)
|
||||
|
||||
|
||||
+206
@@ -0,0 +1,206 @@
|
||||
# automc extensions
|
||||
|
||||
Soft fork of `itzg/mc-router` that adds Postgres-driven route management and an HTTP waker, without touching upstream behavior by default.
|
||||
|
||||
The `internal/automc` package is opt-in via env vars: with `AUTOMC_DSN` unset, the binary behaves exactly like upstream.
|
||||
|
||||
## Environment variables
|
||||
|
||||
| Var | Required | Purpose |
|
||||
|---|---|---|
|
||||
| `AUTOMC_DSN` | yes (to enable) | Postgres DSN, e.g. `postgres://user:pass@host:5432/automc?sslmode=disable`. When unset, automc is a no-op. |
|
||||
| `AUTOMC_WAKER_URL` | no | Base URL of the waker control plane (server-manager). When set, stopped backends are auto-started on login. |
|
||||
| `AUTOMC_WAKER_TOKEN` | no | Sent as `X-API-Key` header on every waker request. |
|
||||
|
||||
Recommended companion upstream flags:
|
||||
|
||||
- `--use-asleep-motd` / `--use-loading-motd` — supplies friendly MOTD to clients during the wake window. Already implemented upstream; automc does not duplicate this.
|
||||
|
||||
## Postgres schema
|
||||
|
||||
Apply this once to the database referenced by `AUTOMC_DSN`. mc-router only reads; the trigger is what tells it to re-read.
|
||||
|
||||
```sql
|
||||
CREATE TABLE IF NOT EXISTS servers (
|
||||
name TEXT PRIMARY KEY,
|
||||
domain TEXT NOT NULL,
|
||||
address TEXT NOT NULL,
|
||||
state TEXT NOT NULL DEFAULT 'stopped',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
|
||||
UNIQUE(domain)
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION automc_notify_routes_changed() RETURNS trigger AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify('automc_routes_changed', '');
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
DROP TRIGGER IF EXISTS automc_servers_route_notify ON servers;
|
||||
CREATE TRIGGER automc_servers_route_notify
|
||||
AFTER INSERT OR UPDATE OF domain, address, state, enabled OR DELETE ON servers
|
||||
FOR EACH ROW EXECUTE FUNCTION automc_notify_routes_changed();
|
||||
```
|
||||
|
||||
The trigger fires on every mutation to a route-relevant column. mc-router holds a persistent `LISTEN automc_routes_changed` and, on each notification, re-runs `SELECT name, domain, address FROM servers WHERE domain IS NOT NULL AND domain != '' AND address != '' AND enabled IS NOT FALSE`, diffing the result against its in-memory map. Added or changed rows call `server.Routes.CreateMapping`; removed rows call `server.Routes.DeleteMapping` — no file I/O.
|
||||
|
||||
State column is not read by mc-router; it exists to drive the trigger and for the waker's own ready-check.
|
||||
|
||||
### Sync lifecycle
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Wire as automc.Wire
|
||||
participant Sync as syncer
|
||||
participant PG as Postgres
|
||||
participant R as server.Routes
|
||||
|
||||
Wire->>Sync: newSyncer(dsn, waker)
|
||||
Wire-)Sync: go run(ctx)
|
||||
Note over Sync: backoff = 1s
|
||||
|
||||
loop reconnect (until ctx cancelled)
|
||||
Sync->>PG: pgx.Connect(dsn)
|
||||
alt connect ok
|
||||
Sync->>PG: LISTEN automc_routes_changed
|
||||
Sync->>PG: SELECT name, domain, address FROM servers
|
||||
PG-->>Sync: initial rows
|
||||
Sync->>Sync: diff vs current (empty on first run)
|
||||
loop for each add
|
||||
Sync->>R: CreateMapping(host, addr, waker)
|
||||
end
|
||||
loop for each del
|
||||
Sync->>R: DeleteMapping(host)
|
||||
end
|
||||
Note over Sync: backoff reset
|
||||
|
||||
loop while connection healthy
|
||||
Sync->>PG: WaitForNotification(ctx)
|
||||
PG-->>Sync: NOTIFY automc_routes_changed
|
||||
Sync->>PG: SELECT all servers
|
||||
PG-->>Sync: fresh rows
|
||||
Sync->>R: CreateMapping / DeleteMapping (diff only)
|
||||
end
|
||||
else connect or notify error
|
||||
Note over Sync: warn log<br/>sleep backoff<br/>backoff = min(backoff*2, 30s)
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
## Waker contract
|
||||
|
||||
When `AUTOMC_WAKER_URL` is set, every route is registered with a `WakerFunc` that the upstream connector calls only when a client tries to LOGIN (not on status pings — those are answered locally via `--use-asleep-motd`).
|
||||
|
||||
The waker:
|
||||
|
||||
1. `POST {AUTOMC_WAKER_URL}/servers/{name}/start` — fire-and-forget start signal. `409 Conflict` is treated as success (already starting).
|
||||
2. Polls `GET {AUTOMC_WAKER_URL}/servers/{name}` every 2 s, expecting JSON `{"state":"running","address":"host:port"}`.
|
||||
3. Returns the polled `address` once `state == "running"`, or errors after 90 s.
|
||||
|
||||
The polled address overrides the route's static address for that connection only — useful when the backend's IP is allocated lazily.
|
||||
|
||||
### Waker dispatch on stopped backend
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
actor Client as MC Client
|
||||
participant MR as mc-router
|
||||
participant SM as server-manager (waker URL)
|
||||
participant MC as MC backend
|
||||
|
||||
Client->>MR: TCP + Handshake (next_state=login)
|
||||
Note over MR: Routes.FindBackendForServerAddress<br/>returns backend + WakerFunc
|
||||
MR->>MC: dial backend address
|
||||
MC--xMR: connection refused
|
||||
Note over MR: WakerFunc != nil →<br/>invoke it before kicking client
|
||||
|
||||
MR->>SM: POST /servers/test1/start<br/>X-API-Key: ...
|
||||
alt 202 Accepted
|
||||
SM-->>MR: 202
|
||||
else 409 Conflict (already starting)
|
||||
SM-->>MR: 409 — treat as success
|
||||
else 5xx
|
||||
SM-->>MR: error → WakerFunc returns err
|
||||
MR->>Client: kick (connection refused)
|
||||
end
|
||||
|
||||
loop every 2s, up to 90s
|
||||
MR->>SM: GET /servers/test1
|
||||
alt running
|
||||
SM-->>MR: 200 state=running address=10.0.0.5:25565
|
||||
else still starting
|
||||
SM-->>MR: 200 state=starting
|
||||
Note over MR: continue polling
|
||||
end
|
||||
end
|
||||
|
||||
Note over MR: WakerFunc returns address
|
||||
MR->>MC: dial polled address
|
||||
MC-->>MR: ok
|
||||
MR->>Client: splice handshake-and-onward
|
||||
```
|
||||
|
||||
## Upstream sync
|
||||
|
||||
```
|
||||
make sync-upstream
|
||||
```
|
||||
|
||||
Fetches `upstream/main`, rebases the `automc` branch onto it, builds, and runs the automc tests. The patch surface is intentionally tiny so rebase conflicts are rare:
|
||||
|
||||
```
|
||||
cmd/mc-router/main.go — 1 import line + 4-line Wire call
|
||||
internal/automc/ — new directory; no upstream conflicts possible
|
||||
docs/AUTOMC.md — new doc; no upstream conflicts
|
||||
Makefile — appended targets only
|
||||
go.mod / go.sum — pgx dep added; mergeable
|
||||
```
|
||||
|
||||
If upstream renames `server.Routes.CreateMapping` or changes its signature, only `pgsync.go:apply` needs adjustment.
|
||||
|
||||
## Quick start
|
||||
|
||||
```bash
|
||||
# 1. Start a postgres reachable from mc-router
|
||||
podman run -d --name automc-pg \
|
||||
-e POSTGRES_PASSWORD=test -e POSTGRES_DB=automc \
|
||||
-p 127.0.0.1:5432:5432 docker.io/postgres:16-alpine
|
||||
|
||||
# 2. Apply the schema + trigger (see "Postgres schema" above)
|
||||
podman exec -i automc-pg psql -U postgres -d automc < schema.sql
|
||||
|
||||
# 3. Run mc-router with automc enabled
|
||||
AUTOMC_DSN="postgres://postgres:test@127.0.0.1:5432/automc?sslmode=disable" \
|
||||
AUTOMC_WAKER_URL="http://server-manager:8080" \
|
||||
AUTOMC_WAKER_TOKEN="$SM_API_KEY" \
|
||||
mc-router --port 25565 --api-binding 127.0.0.1:25590 --use-asleep-motd
|
||||
|
||||
# 4. Add a route
|
||||
podman exec automc-pg psql -U postgres -d automc -c \
|
||||
"INSERT INTO servers (name, domain, address, state) \
|
||||
VALUES ('test1', 'test1.example.com', '127.0.0.1:25001', 'running');"
|
||||
|
||||
# Log should show: automc route +: test1.example.com → 127.0.0.1:25001 (test1)
|
||||
# REST API should show it: curl http://127.0.0.1:25590/routes
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Symptom | Likely cause | Check |
|
||||
|---|---|---|
|
||||
| Binary starts but no `automc: pg route sync started` log | `AUTOMC_DSN` empty or unset | `env \| grep AUTOMC_DSN` |
|
||||
| `automc pgsync disconnected; reconnecting in 1s` repeating | pg unreachable / wrong DSN | Test with `psql "$AUTOMC_DSN" -c "SELECT 1"` |
|
||||
| INSERT into `servers` doesn't create a route | Trigger missing or not on the right columns | `\d+ servers` in psql — confirm `automc_servers_route_notify` trigger exists |
|
||||
| Routes appear in REST `/routes` but client connect times out | Backend address wrong / unreachable from mc-router | `podman exec mc-router nc -zv <address>` |
|
||||
| WakerFunc never called for stopped backends | `AUTOMC_WAKER_URL` empty | Set it; without it stopped backends get connection refused on login |
|
||||
| `waker timeout for X after 1m30s` | Backend takes longer than 90 s to come up | Tune `wakerPollTimeout` (currently a const in `waker.go:18`); planned env var |
|
||||
|
||||
## Verification
|
||||
|
||||
```
|
||||
go build ./...
|
||||
go test ./internal/automc/...
|
||||
go vet ./...
|
||||
```
|
||||
|
||||
End-to-end smoke recipe verified 2026-05-27 on local podman: INSERT/UPDATE/DELETE in `servers` table propagated to `/routes` REST API within ~1 s; postgres restart triggered exponential backoff reconnect (1 s → 30 s cap) and full route re-sync on reconnect.
|
||||
@@ -8,6 +8,7 @@ require (
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
|
||||
github.com/itzg/go-flagsfiller v1.17.0
|
||||
github.com/jackc/pgx/v5 v5.8.0
|
||||
github.com/juju/ratelimit v1.0.2
|
||||
github.com/pires/go-proxyproto v0.11.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
@@ -30,6 +31,9 @@ exclude google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/containerd/errdefs v1.0.0 // indirect
|
||||
github.com/containerd/errdefs/pkg v0.3.0 // indirect
|
||||
|
||||
@@ -88,6 +88,14 @@ github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSH
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
|
||||
github.com/itzg/go-flagsfiller v1.17.0 h1:Zkg+qsbB24Msu78l+1aqzXAIEKEeLRzAiK7DN40Fdkk=
|
||||
github.com/itzg/go-flagsfiller v1.17.0/go.mod h1:ub1t7dNqIj57TWKUtEqfopXg0xKbBgd9JVuCLmelwNo=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||
github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
|
||||
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
|
||||
@@ -163,6 +171,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
||||
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
@@ -263,6 +272,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
package automc
|
||||
|
||||
import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)
|
||||
|
||||
// LogEntry is the structured shape pushed to UI subscribers via SSE.
type LogEntry struct {
	Time  time.Time `json:"time"`
	Level string    `json:"level"`
	Msg   string    `json:"msg"`
	Attrs string    `json:"attrs,omitempty"`
}

// LogBus is a fan-out buffer for log entries: a ring of the last N entries
// (replayed on connect) + live broadcast to current subscribers.
// Identical model to the one in svc-proxy/internal/httpsrv — kept local to
// avoid a cross-repo dep on the fork.
//
// All methods are safe for concurrent use; mu guards every field below it.
type LogBus struct {
	cap int

	mu        sync.RWMutex
	ring      []LogEntry
	next      int  // index the next Push writes to
	full      bool // true once the ring has wrapped at least once
	listeners map[chan LogEntry]struct{}
}

// NewLogBus returns a LogBus that retains the most recent capacity entries.
// A non-positive capacity falls back to 500.
func NewLogBus(capacity int) *LogBus {
	if capacity <= 0 {
		capacity = 500
	}
	return &LogBus{
		cap:       capacity,
		ring:      make([]LogEntry, capacity),
		listeners: map[chan LogEntry]struct{}{},
	}
}

// Push records e in the ring (overwriting the oldest entry once full) and
// offers it to every current subscriber. A subscriber whose channel buffer is
// full misses the entry; Push never blocks on a slow reader.
func (b *LogBus) Push(e LogEntry) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.ring[b.next] = e
	b.next = (b.next + 1) % b.cap
	if b.next == 0 {
		b.full = true
	}

	// Deliver while still holding the lock. Unsubscribe removes and closes a
	// channel under the same lock, so a channel found in listeners here cannot
	// be closed mid-send. The sends are non-blocking, so the lock is held only
	// briefly.
	for ch := range b.listeners {
		select {
		case ch <- e:
		default:
		}
	}
}

// Backlog returns a copy of the retained entries, oldest first.
func (b *LogBus) Backlog() []LogEntry {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if !b.full {
		out := make([]LogEntry, b.next)
		copy(out, b.ring[:b.next])
		return out
	}
	// Once wrapped, the oldest entry sits at next.
	out := make([]LogEntry, 0, b.cap)
	out = append(out, b.ring[b.next:]...)
	out = append(out, b.ring[:b.next]...)
	return out
}

// Subscribe registers and returns a new channel (buffered, 32 entries) that
// receives every entry pushed from now on. Release it with Unsubscribe.
func (b *LogBus) Subscribe() chan LogEntry {
	ch := make(chan LogEntry, 32)
	b.mu.Lock()
	b.listeners[ch] = struct{}{}
	b.mu.Unlock()
	return ch
}

// Unsubscribe removes ch from the bus and closes it. Calling it again for the
// same channel, or for a channel that was never subscribed, is a no-op.
func (b *LogBus) Unsubscribe(ch chan LogEntry) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.listeners[ch]; !ok {
		return // already removed; closing twice would panic
	}
	delete(b.listeners, ch)
	// Closed under the lock so no Push can still be sending on it.
	close(ch)
}
|
||||
|
||||
// logrusBusHook adapts the LogBus to logrus's Hook interface. Registered
|
||||
// globally from Wire() so every upstream log emission is captured.
|
||||
type logrusBusHook struct{ bus *LogBus }
|
||||
|
||||
func (h *logrusBusHook) Levels() []logrus.Level {
|
||||
return logrus.AllLevels
|
||||
}
|
||||
|
||||
func (h *logrusBusHook) Fire(e *logrus.Entry) error {
|
||||
var attrs string
|
||||
for k, v := range e.Data {
|
||||
if attrs != "" {
|
||||
attrs += " "
|
||||
}
|
||||
attrs += fmt.Sprintf("%s=%v", k, v)
|
||||
}
|
||||
h.bus.Push(LogEntry{
|
||||
Time: e.Time,
|
||||
Level: e.Level.String(),
|
||||
Msg: e.Message,
|
||||
Attrs: attrs,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,133 @@
|
||||
package automc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/itzg/mc-router/server"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
notifyChannel = "automc_routes_changed"
|
||||
reconnectMin = 1 * time.Second
|
||||
reconnectMax = 30 * time.Second
|
||||
)
|
||||
|
||||
type route struct {
|
||||
name string
|
||||
domain string
|
||||
address string
|
||||
}
|
||||
|
||||
type syncer struct {
|
||||
dsn string
|
||||
waker *wakerConfig
|
||||
current map[string]route
|
||||
}
|
||||
|
||||
func newSyncer(dsn string, w *wakerConfig) *syncer {
|
||||
return &syncer{dsn: dsn, waker: w, current: map[string]route{}}
|
||||
}
|
||||
|
||||
func (s *syncer) run(ctx context.Context) {
|
||||
backoff := reconnectMin
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
err := s.connectAndLoop(ctx)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
logrus.WithError(err).Warnf("automc pgsync disconnected; reconnecting in %s", backoff)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(backoff):
|
||||
}
|
||||
backoff *= 2
|
||||
if backoff > reconnectMax {
|
||||
backoff = reconnectMax
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncer) connectAndLoop(ctx context.Context) error {
|
||||
conn, err := pgx.Connect(ctx, s.dsn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pgx connect: %w", err)
|
||||
}
|
||||
defer conn.Close(context.Background())
|
||||
|
||||
if _, err := conn.Exec(ctx, "LISTEN "+notifyChannel); err != nil {
|
||||
return fmt.Errorf("LISTEN: %w", err)
|
||||
}
|
||||
logrus.Infof("automc pgsync connected; LISTEN %s", notifyChannel)
|
||||
|
||||
if err := s.refresh(ctx, conn); err != nil {
|
||||
return fmt.Errorf("initial refresh: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
if _, err := conn.WaitForNotification(ctx); err != nil {
|
||||
return fmt.Errorf("wait notification: %w", err)
|
||||
}
|
||||
if err := s.refresh(ctx, conn); err != nil {
|
||||
return fmt.Errorf("refresh: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncer) refresh(ctx context.Context, conn *pgx.Conn) error {
|
||||
rows, err := conn.Query(ctx, `SELECT name, domain, address FROM servers WHERE domain IS NOT NULL AND domain != '' AND address != '' AND enabled IS NOT FALSE`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
desired := map[string]route{}
|
||||
for rows.Next() {
|
||||
var r route
|
||||
if err := rows.Scan(&r.name, &r.domain, &r.address); err != nil {
|
||||
return err
|
||||
}
|
||||
desired[r.domain] = r
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
add, del := diff(s.current, desired)
|
||||
s.apply(add, del)
|
||||
s.current = desired
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *syncer) apply(add []route, del []string) {
|
||||
for _, host := range del {
|
||||
if server.Routes.DeleteMapping(host) {
|
||||
logrus.Infof("automc route -: %s", host)
|
||||
}
|
||||
}
|
||||
for _, r := range add {
|
||||
server.Routes.CreateMapping(r.domain, r.address, "", s.waker.wakerFor(r.name), nil, "", "")
|
||||
logrus.Infof("automc route +: %s → %s (%s)", r.domain, r.address, r.name)
|
||||
}
|
||||
}
|
||||
|
||||
func diff(prev, next map[string]route) (add []route, del []string) {
|
||||
for host, r := range next {
|
||||
if p, ok := prev[host]; !ok || p.address != r.address || p.name != r.name {
|
||||
add = append(add, r)
|
||||
}
|
||||
}
|
||||
for host := range prev {
|
||||
if _, ok := next[host]; !ok {
|
||||
del = append(del, host)
|
||||
}
|
||||
}
|
||||
return add, del
|
||||
}
|
||||
@@ -0,0 +1,103 @@
|
||||
package automc
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDiff(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
prev, next map[string]route
|
||||
wantAddHost []string
|
||||
wantDel []string
|
||||
}{
|
||||
{
|
||||
name: "empty to empty",
|
||||
prev: map[string]route{},
|
||||
next: map[string]route{},
|
||||
wantAddHost: nil,
|
||||
wantDel: nil,
|
||||
},
|
||||
{
|
||||
name: "add one",
|
||||
prev: map[string]route{},
|
||||
next: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
wantAddHost: []string{"a.example.com"},
|
||||
wantDel: nil,
|
||||
},
|
||||
{
|
||||
name: "delete one",
|
||||
prev: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
next: map[string]route{},
|
||||
wantAddHost: nil,
|
||||
wantDel: []string{"a.example.com"},
|
||||
},
|
||||
{
|
||||
name: "address change",
|
||||
prev: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
next: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.2:25565"}},
|
||||
wantAddHost: []string{"a.example.com"},
|
||||
wantDel: nil,
|
||||
},
|
||||
{
|
||||
name: "name change with same address triggers re-register (waker rebind)",
|
||||
prev: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
next: map[string]route{"a.example.com": {name: "b", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
wantAddHost: []string{"a.example.com"},
|
||||
wantDel: nil,
|
||||
},
|
||||
{
|
||||
name: "no change",
|
||||
prev: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
next: map[string]route{"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"}},
|
||||
wantAddHost: nil,
|
||||
wantDel: nil,
|
||||
},
|
||||
{
|
||||
name: "mixed add + delete",
|
||||
prev: map[string]route{
|
||||
"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"},
|
||||
"b.example.com": {name: "b", domain: "b.example.com", address: "10.0.0.2:25565"},
|
||||
},
|
||||
next: map[string]route{
|
||||
"a.example.com": {name: "a", domain: "a.example.com", address: "10.0.0.1:25565"},
|
||||
"c.example.com": {name: "c", domain: "c.example.com", address: "10.0.0.3:25565"},
|
||||
},
|
||||
wantAddHost: []string{"c.example.com"},
|
||||
wantDel: []string{"b.example.com"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
add, del := diff(tc.prev, tc.next)
|
||||
gotAdd := make([]string, 0, len(add))
|
||||
for _, r := range add {
|
||||
gotAdd = append(gotAdd, r.domain)
|
||||
}
|
||||
sort.Strings(gotAdd)
|
||||
sort.Strings(del)
|
||||
sort.Strings(tc.wantAddHost)
|
||||
sort.Strings(tc.wantDel)
|
||||
if !equalSlice(gotAdd, tc.wantAddHost) {
|
||||
t.Errorf("add: got %v want %v", gotAdd, tc.wantAddHost)
|
||||
}
|
||||
if !equalSlice(del, tc.wantDel) {
|
||||
t.Errorf("del: got %v want %v", del, tc.wantDel)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// equalSlice reports whether a and b have the same length and the same
// strings at every index. A nil slice and an empty slice compare equal.
func equalSlice(a, b []string) bool {
	n := len(a)
	if n != len(b) {
		return false
	}
	for i, v := range a {
		if v != b[i] {
			return false
		}
	}
	return true
}
|
||||
@@ -0,0 +1,134 @@
|
||||
<!doctype html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<title>mc-router</title>
|
||||
<style>
|
||||
html, body { margin: 0; padding: 0; height: 100%; background: #0e0e0e; color: #eee;
|
||||
font-family: ui-monospace, "SF Mono", Menlo, monospace; font-size: 13px; line-height: 1.4; }
|
||||
body { display: flex; flex-direction: column; }
|
||||
header { padding: 8px 12px; background: #1a1a1a; border-bottom: 1px solid #333;
|
||||
display: flex; justify-content: space-between; align-items: center; flex: 0 0 auto; }
|
||||
header h1 { margin: 0; font-size: 14px; font-weight: 600; }
|
||||
header h1 .meta { color: #888; font-weight: normal; margin-left: 10px; }
|
||||
#status { font-size: 12px; color: #6f6; }
|
||||
#status.disconnected { color: #f66; }
|
||||
#status.connecting { color: #fc6; }
|
||||
|
||||
main { flex: 1; display: grid; grid-template-rows: 1fr 1fr; min-height: 0; }
|
||||
section { padding: 12px 16px; overflow: auto; min-height: 0; }
|
||||
section + section { border-top: 1px solid #333; }
|
||||
|
||||
h2 { margin: 0 0 8px; font-size: 12px; font-weight: 600; color: #888;
|
||||
text-transform: uppercase; letter-spacing: 0.6px;
|
||||
display: flex; justify-content: space-between; align-items: center; }
|
||||
h2 .count { color: #888; font-weight: normal; margin-left: 6px; }
|
||||
h2 button { background: transparent; color: #888; border: 1px solid #333;
|
||||
padding: 2px 8px; cursor: pointer; font: inherit; font-size: 11px; border-radius: 3px; }
|
||||
h2 button:hover { color: #eee; border-color: #6cf; }
|
||||
|
||||
table { width: 100%; border-collapse: collapse; font-variant-numeric: tabular-nums; }
|
||||
th, td { text-align: left; padding: 4px 10px; border-bottom: 1px solid #1f1f1f; }
|
||||
th { color: #888; font-weight: 500; font-size: 11px; text-transform: uppercase; letter-spacing: 0.5px; }
|
||||
td.num { text-align: right; }
|
||||
.empty { color: #888; padding: 16px 0; }
|
||||
|
||||
pre.logbox { margin: 0; white-space: pre-wrap; word-break: break-word; }
|
||||
.log-line { padding: 0; }
|
||||
.log-line .ts { color: #888; margin-right: 8px; }
|
||||
.log-line .lvl { margin-right: 6px; font-weight: 600; }
|
||||
.log-line.lvl-debug .lvl { color: #888; }
|
||||
.log-line.lvl-info .lvl { color: #6cf; }
|
||||
.log-line.lvl-warning .lvl, .log-line.lvl-warn .lvl { color: #fc6; }
|
||||
.log-line.lvl-error .lvl { color: #f66; }
|
||||
.log-line .attrs { color: #888; margin-left: 8px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
<h1>mc-router <span class="meta" id="meta">— connecting…</span></h1>
|
||||
<span id="status" class="connecting">log stream: connecting…</span>
|
||||
</header>
|
||||
<main>
|
||||
<section>
|
||||
<h2><span>Routes <span class="count" id="route-count"></span></span></h2>
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Server address (host)</th>
|
||||
<th>Backend</th>
|
||||
<th class="num">Active</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="route-rows"></tbody>
|
||||
</table>
|
||||
<div class="empty" id="route-empty">no routes registered</div>
|
||||
</section>
|
||||
<section>
|
||||
<h2><span>Logs</span><button onclick="document.getElementById('logbox').innerHTML=''">clear</button></h2>
|
||||
<pre class="logbox" id="logbox"></pre>
|
||||
</section>
|
||||
</main>
|
||||
|
||||
<script>
|
||||
/**
 * Fetches the route snapshot from ./api/routes and re-renders the routes
 * table, the "(n)" route counter and the header summary line.
 *
 * All values coming from the API are written with textContent, never as
 * markup, so a route address or backend containing HTML cannot inject
 * elements or script into the dashboard. Any failure (network error,
 * non-2xx status, invalid JSON) is reported as "api error" in the header.
 */
async function refreshRoutes() {
  const meta = document.getElementById('meta');
  try {
    const resp = await fetch('./api/routes');
    if (!resp.ok) throw new Error('HTTP ' + resp.status);
    const snapshot = await resp.json();
    // Tolerate a missing/null "routes" field instead of throwing later.
    const routes = Array.isArray(snapshot.routes) ? snapshot.routes : [];

    const rows = document.getElementById('route-rows');
    const empty = document.getElementById('route-empty');
    const count = document.getElementById('route-count');

    // Build the new rows off-DOM and swap them in at once.
    const frag = document.createDocumentFragment();
    for (const route of routes) {
      const tr = document.createElement('tr');
      const cells = [route.server_address, route.backend, route.active_connections];
      cells.forEach((value, idx) => {
        const td = document.createElement('td');
        if (idx === 2) td.className = 'num';
        td.textContent = value == null ? '' : String(value);
        tr.appendChild(td);
      });
      frag.appendChild(tr);
    }
    rows.textContent = '';
    rows.appendChild(frag);

    if (routes.length === 0) {
      empty.style.display = '';
      count.textContent = '';
    } else {
      empty.style.display = 'none';
      count.textContent = '(' + routes.length + ')';
    }

    meta.textContent =
      '— ' + routes.length + ' routes · ' + (snapshot.total_connections || 0) + ' active conn';
  } catch (err) {
    meta.textContent = '— api error';
  }
}
|
||||
setInterval(refreshRoutes, 2000);
|
||||
refreshRoutes();
|
||||
|
||||
/**
 * Opens the server-sent-events log feed at ./api/logs and appends each
 * entry to the #logbox element, keeping at most 1000 lines.
 *
 * Each event's data is expected to be a JSON object with the fields
 * time, level, msg and attrs. Log text can contain client-supplied data
 * (e.g. requested hostnames), so every field is inserted as text, not as
 * HTML, and the level is reduced to [a-z0-9_-] before being used in a
 * class name. The view follows the tail only when the user was already
 * near the bottom before the new line was added.
 */
function startLogStream() {
  const status = document.getElementById('status');
  const box = document.getElementById('logbox');
  const es = new EventSource('./api/logs');

  es.onopen = () => { status.textContent = 'log stream: live'; status.className = ''; };
  es.onerror = () => { status.textContent = 'log stream: reconnecting…'; status.className = 'disconnected'; };

  es.onmessage = ev => {
    let entry;
    try { entry = JSON.parse(ev.data); } catch { return; }
    if (!entry || typeof entry !== 'object') return;

    const level = String(entry.level || 'info');
    // Pull HH:MM:SS out of an RFC 3339 timestamp; tolerate anything else.
    const timeMatch = /T(\d{2}:\d{2}:\d{2})/.exec(String(entry.time || ''));
    const ts = timeMatch ? timeMatch[1] : '';

    const line = document.createElement('div');
    line.className = 'log-line lvl-' + level.toLowerCase().replace(/[^a-z0-9_-]/g, '');

    const addSpan = (cls, text) => {
      const span = document.createElement('span');
      span.className = cls;
      span.textContent = text;
      line.appendChild(span);
    };
    addSpan('ts', ts);
    addSpan('lvl', level);
    line.appendChild(document.createTextNode(String(entry.msg || '')));
    if (entry.attrs) addSpan('attrs', String(entry.attrs));

    // Decide whether to follow the tail BEFORE the append changes scrollHeight.
    const scroller = box.parentElement;
    const pinned = scroller.scrollHeight - scroller.scrollTop - scroller.clientHeight < 60;

    box.appendChild(line);
    while (box.children.length > 1000) box.removeChild(box.firstElementChild);

    if (pinned) {
      scroller.scrollTop = scroller.scrollHeight;
    }
  };
}
|
||||
startLogStream();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
@@ -0,0 +1,174 @@
|
||||
package automc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/itzg/mc-router/server"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
//go:embed static/*
|
||||
var staticFS embed.FS
|
||||
|
||||
// startUI starts a separate HTTP server on uiBinding serving the operator
|
||||
// dashboard (embedded index.html), an SSE log feed, and a JSON snapshot of
|
||||
// the current route table. The upstream JSON API on API_BINDING is left
|
||||
// untouched so existing tooling keeps working.
|
||||
func startUI(ctx context.Context, uiBinding string, bus *LogBus) {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
sub, err := fs.Sub(staticFS, "static")
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("automc ui: embed misconfigured")
|
||||
return
|
||||
}
|
||||
mux.Handle("GET /", http.FileServer(http.FS(sub)))
|
||||
mux.HandleFunc("GET /api/routes", routesSnapshotHandler)
|
||||
mux.HandleFunc("GET /api/logs", sseLogsHandler(bus))
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: uiBinding,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
_ = srv.Shutdown(shutCtx)
|
||||
}()
|
||||
go func() {
|
||||
logrus.WithField("binding", uiBinding).Info("automc: ui server listening")
|
||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
logrus.WithError(err).Error("automc ui server failed")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// RouteSnapshot is one row of the routes table the UI renders. Same shape as
// the upstream /routes JSON but flatter — the UI doesn't need both backend
// and scalingTarget shown separately.
//
// ActiveConnections is the live gauge value scraped from the Prometheus
// registry. routesSnapshotHandler reports 0 (not a sentinel) for a route
// that has no per-route gauge.
type RouteSnapshot struct {
	// ServerAddress is the route's server address (host), the table key.
	ServerAddress string `json:"server_address"`
	// Backend is the backend the route maps to.
	Backend string `json:"backend"`
	// ActiveConnections is the number of active connections for the route.
	ActiveConnections int `json:"active_connections"`
}
|
||||
|
||||
func routesSnapshotHandler(w http.ResponseWriter, _ *http.Request) {
|
||||
mappings := server.Routes.GetMappings()
|
||||
perRoute, total := scrapeActiveConnections()
|
||||
|
||||
out := make([]RouteSnapshot, 0, len(mappings))
|
||||
for addr, backend := range mappings {
|
||||
conns, ok := perRoute[addr]
|
||||
if !ok {
|
||||
conns = 0
|
||||
}
|
||||
out = append(out, RouteSnapshot{ServerAddress: addr, Backend: backend, ActiveConnections: conns})
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].ServerAddress < out[j].ServerAddress })
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"routes": out,
|
||||
"total_connections": total,
|
||||
"at": time.Now(),
|
||||
})
|
||||
}
|
||||
|
||||
// scrapeActiveConnections walks the Prometheus default registry to extract
|
||||
// two gauges that mc-router exposes:
|
||||
//
|
||||
// mc_router_active_connections (no labels)
|
||||
// mc_router_server_active_connections{server_address=…} (per route)
|
||||
//
|
||||
// Returns (perServerAddress, total). On any gathering error returns zero
|
||||
// values silently — the UI shows 0 rather than blocking on a metrics issue.
|
||||
func scrapeActiveConnections() (map[string]int, int) {
|
||||
per := map[string]int{}
|
||||
total := 0
|
||||
families, err := prometheus.DefaultGatherer.Gather()
|
||||
if err != nil {
|
||||
return per, 0
|
||||
}
|
||||
for _, mf := range families {
|
||||
switch mf.GetName() {
|
||||
case "mc_router_active_connections":
|
||||
for _, m := range mf.GetMetric() {
|
||||
if g := m.GetGauge(); g != nil {
|
||||
total = int(g.GetValue())
|
||||
}
|
||||
}
|
||||
case "mc_router_server_active_connections":
|
||||
for _, m := range mf.GetMetric() {
|
||||
var addr string
|
||||
for _, lp := range m.GetLabel() {
|
||||
if lp.GetName() == "server_address" {
|
||||
addr = lp.GetValue()
|
||||
break
|
||||
}
|
||||
}
|
||||
if addr == "" {
|
||||
continue
|
||||
}
|
||||
if g := m.GetGauge(); g != nil {
|
||||
per[addr] = int(g.GetValue())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return per, total
|
||||
}
|
||||
|
||||
func sseLogsHandler(bus *LogBus) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
for _, e := range bus.Backlog() {
|
||||
writeSSEEvent(w, e)
|
||||
}
|
||||
flusher.Flush()
|
||||
|
||||
ch := bus.Subscribe()
|
||||
defer bus.Unsubscribe(ch)
|
||||
|
||||
hb := time.NewTicker(30 * time.Second)
|
||||
defer hb.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case e := <-ch:
|
||||
writeSSEEvent(w, e)
|
||||
flusher.Flush()
|
||||
case <-hb.C:
|
||||
_, _ = io.WriteString(w, ":hb\n\n")
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeSSEEvent(w io.Writer, e LogEntry) {
|
||||
fmt.Fprintf(w, "data: {\"time\":%q,\"level\":%q,\"msg\":%q,\"attrs\":%q}\n\n",
|
||||
e.Time.Format(time.RFC3339Nano), e.Level, e.Msg, e.Attrs)
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
package automc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/itzg/mc-router/server"
|
||||
)
|
||||
|
||||
// wakerPollInterval is the default delay between two state queries while
// waiting for a server to come up.
const wakerPollInterval = 2 * time.Second

// wakerPollTimeout is how long pollUntilRunning keeps polling before it
// gives up with a timeout error.
const wakerPollTimeout = 90 * time.Second

// wakerPollIntervalForTest is the interval pollUntilRunning actually uses.
// It defaults to wakerPollInterval and is a variable so tests can shorten it.
var wakerPollIntervalForTest = wakerPollInterval
|
||||
|
||||
// wakerConfig holds what is needed to call the waker HTTP API: where it
// lives, how to authenticate, and the client used for every request.
type wakerConfig struct {
	// baseURL is the API root; newWakerConfig stores it without trailing slashes.
	baseURL string
	// token is sent as the X-API-Key header when non-empty.
	token string
	// client performs the start and state requests.
	client *http.Client
}
|
||||
|
||||
func newWakerConfig(baseURL, token string) *wakerConfig {
|
||||
if baseURL == "" {
|
||||
return nil
|
||||
}
|
||||
return &wakerConfig{
|
||||
baseURL: strings.TrimRight(baseURL, "/"),
|
||||
token: token,
|
||||
client: &http.Client{Timeout: 10 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wakerConfig) wakerFor(serverName string) server.WakerFunc {
|
||||
if w == nil {
|
||||
return nil
|
||||
}
|
||||
return func(ctx context.Context) (string, error) {
|
||||
if err := w.start(ctx, serverName); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return w.pollUntilRunning(ctx, serverName)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wakerConfig) start(ctx context.Context, name string) error {
|
||||
u := fmt.Sprintf("%s/servers/%s/start", w.baseURL, url.PathEscape(name))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.setAuth(req)
|
||||
resp, err := w.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("waker start %s: %w", name, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 && resp.StatusCode != http.StatusConflict {
|
||||
// 409 = already starting/running, treat as success
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return fmt.Errorf("waker start %s: %s — %s", name, resp.Status, strings.TrimSpace(string(body)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *wakerConfig) pollUntilRunning(ctx context.Context, name string) (string, error) {
|
||||
deadline := time.Now().Add(wakerPollTimeout)
|
||||
ticker := time.NewTicker(wakerPollIntervalForTest)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
state, addr, err := w.queryState(ctx, name)
|
||||
if err == nil && state == "running" && addr != "" {
|
||||
return addr, nil
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return "", fmt.Errorf("waker timeout for %s after %s (last state=%q err=%v)", name, wakerPollTimeout, state, err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *wakerConfig) queryState(ctx context.Context, name string) (string, string, error) {
|
||||
u := fmt.Sprintf("%s/servers/%s", w.baseURL, url.PathEscape(name))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
w.setAuth(req)
|
||||
resp, err := w.client.Do(req)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return "", "", fmt.Errorf("query state %s: %s", name, resp.Status)
|
||||
}
|
||||
var body struct {
|
||||
State string `json:"state"`
|
||||
Address string `json:"address"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
return body.State, body.Address, nil
|
||||
}
|
||||
|
||||
func (w *wakerConfig) setAuth(req *http.Request) {
|
||||
if w.token != "" {
|
||||
req.Header.Set("X-API-Key", w.token)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
package automc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestWakerNilWhenURLEmpty(t *testing.T) {
|
||||
w := newWakerConfig("", "")
|
||||
if w != nil {
|
||||
t.Fatalf("expected nil waker config when URL empty, got %+v", w)
|
||||
}
|
||||
if w.wakerFor("foo") != nil {
|
||||
t.Fatalf("expected nil WakerFunc from nil config")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWakerStartThenPoll(t *testing.T) {
|
||||
var startCalls int32
|
||||
var pollCalls int32
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("X-API-Key") != "secret" {
|
||||
http.Error(w, "no auth", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
switch {
|
||||
case r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/start"):
|
||||
atomic.AddInt32(&startCalls, 1)
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
case r.Method == http.MethodGet:
|
||||
n := atomic.AddInt32(&pollCalls, 1)
|
||||
state := "starting"
|
||||
addr := ""
|
||||
if n >= 2 {
|
||||
state = "running"
|
||||
addr = "10.0.0.5:25565"
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{
|
||||
"state": state,
|
||||
"address": addr,
|
||||
})
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
wc := newWakerConfig(srv.URL, "secret")
|
||||
wc.client.Timeout = 2 * time.Second
|
||||
|
||||
// Tighten poll interval for the test only.
|
||||
saved := wakerPollIntervalForTest
|
||||
wakerPollIntervalForTest = 10 * time.Millisecond
|
||||
t.Cleanup(func() { wakerPollIntervalForTest = saved })
|
||||
|
||||
fn := wc.wakerFor("test1")
|
||||
if fn == nil {
|
||||
t.Fatal("expected non-nil WakerFunc")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
addr, err := fn(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("wake failed: %v", err)
|
||||
}
|
||||
if addr != "10.0.0.5:25565" {
|
||||
t.Errorf("addr: got %q want 10.0.0.5:25565", addr)
|
||||
}
|
||||
if atomic.LoadInt32(&startCalls) != 1 {
|
||||
t.Errorf("expected 1 start call, got %d", startCalls)
|
||||
}
|
||||
if got := atomic.LoadInt32(&pollCalls); got < 2 {
|
||||
t.Errorf("expected >=2 polls, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWakerStartHandles409(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == http.MethodPost && strings.HasSuffix(r.URL.Path, "/start") {
|
||||
http.Error(w, "already running", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
_ = json.NewEncoder(w).Encode(map[string]string{
|
||||
"state": "running",
|
||||
"address": "10.0.0.6:25565",
|
||||
})
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
wc := newWakerConfig(srv.URL, "")
|
||||
wakerPollIntervalForTest = 10 * time.Millisecond
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
addr, err := wc.wakerFor("x")(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("expected 409 to be treated as success, got err: %v", err)
|
||||
}
|
||||
if addr != "10.0.0.6:25565" {
|
||||
t.Errorf("addr: got %q", addr)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
// Package automc wires automc-specific extensions onto upstream mc-router.
|
||||
//
|
||||
// All behavior is opt-in via env vars; when AUTOMC_DSN is unset, Wire is a no-op
|
||||
// and the binary behaves exactly like upstream itzg/mc-router.
|
||||
package automc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func Wire(ctx context.Context) error {
|
||||
dsn := os.Getenv("AUTOMC_DSN")
|
||||
if dsn == "" {
|
||||
return nil
|
||||
}
|
||||
waker := newWakerConfig(os.Getenv("AUTOMC_WAKER_URL"), os.Getenv("AUTOMC_WAKER_TOKEN"))
|
||||
s := newSyncer(dsn, waker)
|
||||
go s.run(ctx)
|
||||
logrus.Info("automc: pg route sync started")
|
||||
|
||||
// Operator UI on a separate port — upstream's API_BINDING stays
|
||||
// JSON-only and untouched. Enable by setting AUTOMC_UI_BINDING (e.g.
|
||||
// ":8082"); leave unset to skip and behave exactly like upstream.
|
||||
if uiBinding := os.Getenv("AUTOMC_UI_BINDING"); uiBinding != "" {
|
||||
bus := NewLogBus(500)
|
||||
logrus.AddHook(&logrusBusHook{bus: bus})
|
||||
startUI(ctx, uiBinding, bus)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
+8
-2
@@ -659,12 +659,18 @@ func (c *Connector) findAndConnectBackend(frontendConn net.Conn,
|
||||
}
|
||||
}
|
||||
|
||||
if waker != nil && nextState == mcproto.StateStatus {
|
||||
if nextState == mcproto.StateStatus {
|
||||
// Previously gated on `waker != nil` so only auto-scale routes
|
||||
// got a predefined response. For static fleets (no waker) clients
|
||||
// just saw a closed connection on backend-down. Now any known
|
||||
// route whose backend dial fails returns the configured asleep/
|
||||
// loading MOTD — falls back to the global AUTO_SCALE_ASLEEP_MOTD
|
||||
// when no per-route override is set.
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"client": clientAddr,
|
||||
"server": serverAddress,
|
||||
"isLegacy": isLegacy,
|
||||
}).Debug("Scalable backend unreachable: serving predefined status response")
|
||||
}).Debug("Backend unreachable: serving predefined status response")
|
||||
|
||||
br := bufio.NewReader(frontendConn)
|
||||
if isLegacy {
|
||||
|
||||
Reference in New Issue
Block a user