Add connection webhook notifications (#392)
Also * Added decode of LoginStart message * Add metrics backend constants * Updated usage section * Documented MaxFrameLength
This commit is contained in:
+102
-15
@@ -1,8 +1,11 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
@@ -33,15 +36,34 @@ type ConnectorMetrics struct {
|
||||
ActiveConnections metrics.Gauge
|
||||
}
|
||||
|
||||
func NewConnector(metrics *ConnectorMetrics, sendProxyProto bool, receiveProxyProto bool, trustedProxyNets []*net.IPNet,
|
||||
clientFilter *ClientFilter) *Connector {
|
||||
type ClientInfo struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
}
|
||||
|
||||
func ClientInfoFromAddr(addr net.Addr) *ClientInfo {
|
||||
if addr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &ClientInfo{
|
||||
Host: addr.(*net.TCPAddr).IP.String(),
|
||||
Port: addr.(*net.TCPAddr).Port,
|
||||
}
|
||||
}
|
||||
|
||||
type PlayerInfo struct {
|
||||
Name string `json:"name"`
|
||||
Uuid uuid.UUID `json:"uuid"`
|
||||
}
|
||||
|
||||
func NewConnector(metrics *ConnectorMetrics, sendProxyProto bool, receiveProxyProto bool, trustedProxyNets []*net.IPNet) *Connector {
|
||||
return &Connector{
|
||||
metrics: metrics,
|
||||
sendProxyProto: sendProxyProto,
|
||||
connectionsCond: sync.NewCond(&sync.Mutex{}),
|
||||
receiveProxyProto: receiveProxyProto,
|
||||
trustedProxyNets: trustedProxyNets,
|
||||
clientFilter: clientFilter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +78,16 @@ type Connector struct {
|
||||
connectionsCond *sync.Cond
|
||||
ngrokToken string
|
||||
clientFilter *ClientFilter
|
||||
|
||||
connectionNotifier ConnectionNotifier
|
||||
}
|
||||
|
||||
func (c *Connector) SetConnectionNotifier(notifier ConnectionNotifier) {
|
||||
c.connectionNotifier = notifier
|
||||
}
|
||||
|
||||
func (c *Connector) SetClientFilter(filter *ClientFilter) {
|
||||
c.clientFilter = filter
|
||||
}
|
||||
|
||||
func (c *Connector) StartAcceptingConnections(ctx context.Context, listenAddress string, connRateLimit int) error {
|
||||
@@ -169,10 +201,12 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
clientAddr := frontendConn.RemoteAddr()
|
||||
|
||||
if tcpAddr, ok := clientAddr.(*net.TCPAddr); ok {
|
||||
allow := c.clientFilter.Allow(tcpAddr.AddrPort())
|
||||
if !allow {
|
||||
logrus.WithField("client", clientAddr).Debug("Client is blocked")
|
||||
return
|
||||
if c.clientFilter != nil {
|
||||
allow := c.clientFilter.Allow(tcpAddr.AddrPort())
|
||||
if !allow {
|
||||
logrus.WithField("client", clientAddr).Debug("Client is blocked")
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logrus.WithField("client", clientAddr).Warn("Remote address is not a TCP address, skipping filtering")
|
||||
@@ -183,10 +217,12 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
Info("Got connection")
|
||||
defer logrus.WithField("client", clientAddr).Debug("Closing frontend connection")
|
||||
|
||||
// Tee-off the inspected content to a buffer so that we can retransmit it to the backend connection
|
||||
inspectionBuffer := new(bytes.Buffer)
|
||||
|
||||
inspectionReader := io.TeeReader(frontendConn, inspectionBuffer)
|
||||
|
||||
bufferedReader := bufio.NewReader(inspectionReader)
|
||||
|
||||
if err := frontendConn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
|
||||
logrus.
|
||||
WithError(err).
|
||||
@@ -195,7 +231,7 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
c.metrics.Errors.With("type", "read_deadline").Add(1)
|
||||
return
|
||||
}
|
||||
packet, err := mcproto.ReadPacket(inspectionReader, clientAddr, c.state)
|
||||
packet, err := mcproto.ReadPacket(bufferedReader, clientAddr, c.state)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("clientAddr", clientAddr).Error("Failed to read packet")
|
||||
c.metrics.Errors.With("type", "read").Add(1)
|
||||
@@ -209,7 +245,7 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
Debug("Got packet")
|
||||
|
||||
if packet.PacketID == mcproto.PacketIdHandshake {
|
||||
handshake, err := mcproto.ReadHandshake(packet.Data)
|
||||
handshake, err := mcproto.DecodeHandshake(packet.Data)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("clientAddr", clientAddr).
|
||||
Error("Failed to read handshake")
|
||||
@@ -222,10 +258,25 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
WithField("handshake", handshake).
|
||||
Debug("Got handshake")
|
||||
|
||||
serverAddress := handshake.ServerAddress
|
||||
nextState := mcproto.State(handshake.NextState)
|
||||
var userInfo *PlayerInfo = nil
|
||||
if handshake.NextState == mcproto.StateLogin {
|
||||
userInfo, err = c.readUserInfo(bufferedReader, clientAddr, handshake.NextState)
|
||||
if err != nil {
|
||||
logrus.
|
||||
WithError(err).
|
||||
WithField("clientAddr", clientAddr).
|
||||
Error("Failed to read user info")
|
||||
c.metrics.Errors.With("type", "read").Add(1)
|
||||
return
|
||||
}
|
||||
logrus.
|
||||
WithField("client", clientAddr).
|
||||
WithField("userInfo", userInfo).
|
||||
Debug("Got user info")
|
||||
}
|
||||
|
||||
c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, handshake.ServerAddress, userInfo, handshake.NextState)
|
||||
|
||||
c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, serverAddress, nextState)
|
||||
} else if packet.PacketID == mcproto.PacketIdLegacyServerListPing {
|
||||
handshake, ok := packet.Data.(*mcproto.LegacyServerListPing)
|
||||
if !ok {
|
||||
@@ -244,7 +295,7 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
|
||||
serverAddress := handshake.ServerAddress
|
||||
|
||||
c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, serverAddress, mcproto.StateStatus)
|
||||
c.findAndConnectBackend(ctx, frontendConn, clientAddr, inspectionBuffer, serverAddress, nil, mcproto.StateStatus)
|
||||
} else {
|
||||
logrus.
|
||||
WithField("client", clientAddr).
|
||||
@@ -255,8 +306,28 @@ func (c *Connector) HandleConnection(ctx context.Context, frontendConn net.Conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connector) readUserInfo(bufferedReader *bufio.Reader, clientAddr net.Addr, state mcproto.State) (*PlayerInfo, error) {
|
||||
loginPacket, err := mcproto.ReadPacket(bufferedReader, clientAddr, state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read login packet: %w", err)
|
||||
}
|
||||
|
||||
if loginPacket.PacketID == mcproto.PacketIdLogin {
|
||||
loginStart, err := mcproto.DecodeLoginStart(loginPacket.Data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode login start: %w", err)
|
||||
}
|
||||
return &PlayerInfo{
|
||||
Name: loginStart.Name,
|
||||
Uuid: loginStart.PlayerUuid,
|
||||
}, nil
|
||||
} else {
|
||||
return nil, fmt.Errorf("expected login packet, got %d", loginPacket.PacketID)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net.Conn,
|
||||
clientAddr net.Addr, preReadContent io.Reader, serverAddress string, nextState mcproto.State) {
|
||||
clientAddr net.Addr, preReadContent io.Reader, serverAddress string, userInfo *PlayerInfo, nextState mcproto.State) {
|
||||
|
||||
backendHostPort, resolvedHost, waker := Routes.FindBackendForServerAddress(ctx, serverAddress)
|
||||
if waker != nil && nextState > mcproto.StateStatus {
|
||||
@@ -273,13 +344,20 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net.
|
||||
WithField("resolvedHost", resolvedHost).
|
||||
Warn("Unable to find registered backend")
|
||||
c.metrics.Errors.With("type", "missing_backend").Add(1)
|
||||
|
||||
if c.connectionNotifier != nil {
|
||||
c.connectionNotifier.NotifyMissingBackend(ctx, clientAddr, serverAddress, userInfo)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
logrus.
|
||||
WithField("client", clientAddr).
|
||||
WithField("server", serverAddress).
|
||||
WithField("backendHostPort", backendHostPort).
|
||||
Info("Connecting to backend")
|
||||
|
||||
backendConn, err := net.Dial("tcp", backendHostPort)
|
||||
if err != nil {
|
||||
logrus.
|
||||
@@ -289,9 +367,18 @@ func (c *Connector) findAndConnectBackend(ctx context.Context, frontendConn net.
|
||||
WithField("backend", backendHostPort).
|
||||
Warn("Unable to connect to backend")
|
||||
c.metrics.Errors.With("type", "backend_failed").Add(1)
|
||||
|
||||
if c.connectionNotifier != nil {
|
||||
c.connectionNotifier.NotifyFailedBackendConnection(ctx, clientAddr, serverAddress, userInfo, backendHostPort, err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if c.connectionNotifier != nil {
|
||||
c.connectionNotifier.NotifyConnected(ctx, clientAddr, serverAddress, userInfo, backendHostPort)
|
||||
}
|
||||
|
||||
c.metrics.ConnectionsBackend.With("host", resolvedHost).Add(1)
|
||||
|
||||
c.metrics.ActiveConnections.Set(float64(
|
||||
|
||||
Reference in New Issue
Block a user