package server

import (
	"context"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	apps "k8s.io/api/apps/v1"
	core "k8s.io/api/core/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

// Annotations read from Kubernetes Services by K8sWatcher.
const (
	// AnnotationExternalServerName holds the external host name(s) to route to
	// the service. The value is split with SplitExternalHosts and one route is
	// created per resulting name.
	AnnotationExternalServerName = "mc-router.itzg.me/externalServerName"

	// AnnotationDefaultServer marks the service as the default route. Only its
	// presence is checked, and only when AnnotationExternalServerName is absent.
	AnnotationDefaultServer = "mc-router.itzg.me/defaultServer"

	// AnnotationAutoScaleUp is parsed with strconv.ParseBool and, when present,
	// takes precedence over the watcher-wide auto-scale-up flag for the service.
	// An unparsable value disables scale-up for the service.
	AnnotationAutoScaleUp = "mc-router.itzg.me/autoScaleUp"

	// AnnotationAutoScaleDown is parsed with strconv.ParseBool and, when present,
	// takes precedence over the watcher-wide auto-scale-down flag for the service.
	// An unparsable value disables scale-down for the service.
	AnnotationAutoScaleDown = "mc-router.itzg.me/autoScaleDown"

	// AnnotationProxyServerName overrides the endpoint connections are routed to.
	// Port 25565 is appended when the value has no port. The scaling target
	// remains the service's own endpoint.
	AnnotationProxyServerName = "mc-router.itzg.me/proxyServerName"

	// AnnotationAutoScaleAsleepMOTD is passed to the RoutesHandler as the
	// "asleep" MOTD for the route when non-empty.
	AnnotationAutoScaleAsleepMOTD = "mc-router.itzg.me/autoScaleAsleepMOTD"

	// AnnotationAutoScaleLoadingMOTD is passed to the RoutesHandler as the
	// "loading" MOTD for the route when non-empty.
	AnnotationAutoScaleLoadingMOTD = "mc-router.itzg.me/autoScaleLoadingMOTD"

	// AnnotationAutoScaleWaitTimeout is a time.ParseDuration string bounding how
	// long the waker waits for the backend to become reachable after scaling up.
	// Invalid or non-positive values fall back to 60s.
	AnnotationAutoScaleWaitTimeout = "mc-router.itzg.me/autoScaleWaitTimeout"
)

// K8sWatcher is a RouteFinder that can find routes from kubernetes services.
// It also watches for stateful sets to auto scale up/down, if enabled.
type K8sWatcher struct { sync.RWMutex config *rest.Config autoScaleUp bool autoScaleDown bool namespace string // The key in mappings is a Service, and the value the StatefulSet name mappings map[string]string routesHandler RoutesHandler clientset *kubernetes.Clientset } func NewK8sWatcherInCluster() (*K8sWatcher, error) { config, err := rest.InClusterConfig() if err != nil { return nil, errors.Wrap(err, "Unable to load in-cluster config") } return &K8sWatcher{ config: config, }, nil } func NewK8sWatcherWithConfig(kubeConfigFile string) (*K8sWatcher, error) { config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile) if err != nil { return nil, errors.Wrap(err, "Could not load kube config file") } return &K8sWatcher{ config: config, }, nil } func (w *K8sWatcher) WithAutoScale(autoScaleUp bool, autoScaleDown bool) *K8sWatcher { w.autoScaleUp = autoScaleUp w.autoScaleDown = autoScaleDown return w } func (w *K8sWatcher) WithNamespace(namespace string) *K8sWatcher { w.namespace = namespace return w } func (w *K8sWatcher) String() string { return "k8s" } func (w *K8sWatcher) Start(ctx context.Context, handler RoutesHandler) error { w.routesHandler = handler clientset, err := kubernetes.NewForConfig(w.config) if err != nil { return errors.Wrap(err, "Could not create kube clientset") } w.clientset = clientset _, serviceController := cache.NewInformerWithOptions(cache.InformerOptions{ ListerWatcher: cache.NewListWatchFromClient( clientset.CoreV1().RESTClient(), string(core.ResourceServices), w.namespace, fields.Everything(), ), ObjectType: &core.Service{}, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: w.handleAdd, DeleteFunc: w.handleDelete, UpdateFunc: w.handleUpdate, }, }) go serviceController.RunWithContext(ctx) w.mappings = make(map[string]string) if w.autoScaleUp || w.autoScaleDown { _, statefulSetController := cache.NewInformerWithOptions(cache.InformerOptions{ ListerWatcher: cache.NewListWatchFromClient( clientset.AppsV1().RESTClient(), "statefulSets", 
w.namespace, fields.Everything(), ), ObjectType: &apps.StatefulSet{}, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: w.handleAddStatefulSet(), DeleteFunc: w.handleDeleteStatefulSet(), UpdateFunc: w.handleUpdateStatefulSet(), }, }) go statefulSetController.RunWithContext(ctx) } logrus.Info("Monitoring Kubernetes for Minecraft services") return nil } func (w *K8sWatcher) handleAddStatefulSet() func(obj interface{}) { return func(obj interface{}) { statefulSet, ok := obj.(*apps.StatefulSet) if !ok { return } w.RLock() defer w.RUnlock() w.mappings[statefulSet.Spec.ServiceName] = statefulSet.Name } } func (w *K8sWatcher) handleUpdateStatefulSet() func(oldObj interface{}, newObj interface{}) { return func(oldObj, newObj interface{}) { oldStatefulSet, ok := oldObj.(*apps.StatefulSet) if !ok { return } newStatefulSet, ok := newObj.(*apps.StatefulSet) if !ok { return } w.RLock() defer w.RUnlock() delete(w.mappings, oldStatefulSet.Spec.ServiceName) w.mappings[newStatefulSet.Spec.ServiceName] = newStatefulSet.Name } } func (w *K8sWatcher) handleDeleteStatefulSet() func(obj interface{}) { return func(obj interface{}) { statefulSet, ok := obj.(*apps.StatefulSet) if !ok { return } w.RLock() defer w.RUnlock() delete(w.mappings, statefulSet.Spec.ServiceName) } } // oldObj and newObj are expected to be *v1.Service func (w *K8sWatcher) handleUpdate(oldObj interface{}, newObj interface{}) { for _, oldRoutableService := range w.extractRoutableServices(oldObj) { logrus.WithFields(logrus.Fields{ "old": oldRoutableService, }).Debug("UPDATE") if oldRoutableService.externalServiceName != "" { w.routesHandler.DeleteMapping(oldRoutableService.externalServiceName) } } for _, newRoutableService := range w.extractRoutableServices(newObj) { logrus.WithFields(logrus.Fields{ "new": newRoutableService, }).Debug("UPDATE") if newRoutableService.externalServiceName != "" { w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, 
newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, newRoutableService.autoScaleAsleepMOTD, newRoutableService.autoScaleLoadingMOTD) } else { w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, newRoutableService.autoScaleAsleepMOTD, newRoutableService.autoScaleLoadingMOTD) } } } // obj is expected to be a *v1.Service func (w *K8sWatcher) handleDelete(obj interface{}) { routableServices := w.extractRoutableServices(obj) for _, routableService := range routableServices { if routableService != nil { logrus.WithField("routableService", routableService).Debug("DELETE") if routableService.externalServiceName != "" { w.routesHandler.DeleteMapping(routableService.externalServiceName) } else { w.routesHandler.SetDefaultRoute("", "", nil, nil, "", "") } } } } // obj is expected to be a *v1.Service func (w *K8sWatcher) handleAdd(obj interface{}) { routableServices := w.extractRoutableServices(obj) for _, routableService := range routableServices { if routableService != nil { logrus.WithField("routableService", routableService).Debug("ADD") if routableService.externalServiceName != "" { w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, routableService.autoScaleAsleepMOTD, routableService.autoScaleLoadingMOTD) } else { w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, routableService.autoScaleAsleepMOTD, routableService.autoScaleLoadingMOTD) } } } } type routableService struct { externalServiceName string containerEndpoint string scalingTarget string autoScaleUp WakerFunc autoScaleDown SleeperFunc autoScaleAsleepMOTD string autoScaleLoadingMOTD string } // obj is 
expected to be a *v1.Service func (w *K8sWatcher) extractRoutableServices(obj interface{}) []*routableService { service, ok := obj.(*core.Service) if !ok { return nil } routableServices := make([]*routableService, 0) if externalServiceName, exists := service.Annotations[AnnotationExternalServerName]; exists { serviceNames := SplitExternalHosts(externalServiceName) for _, serviceName := range serviceNames { routableServices = append(routableServices, w.buildDetails(service, serviceName)) } return routableServices } else if _, exists := service.Annotations[AnnotationDefaultServer]; exists { return []*routableService{w.buildDetails(service, "")} } return nil } func (w *K8sWatcher) buildDetails(service *core.Service, externalServiceName string) *routableService { clusterIp := service.Spec.ClusterIP if service.Spec.Type == core.ServiceTypeExternalName { clusterIp = service.Spec.ExternalName } mcRouterPort := "" mcPort := "" for _, p := range service.Spec.Ports { if p.Name == "mc-router" { mcRouterPort = strconv.Itoa(int(p.Port)) } if p.Name == "minecraft" { mcPort = strconv.Itoa(int(p.Port)) } } port := "25565" if len(mcRouterPort) > 0 { port = mcRouterPort } else if len(mcPort) > 0 { port = mcPort } endpoint := net.JoinHostPort(clusterIp, port) routingEndpoint := endpoint scalingTarget := endpoint // Default to service endpoint for scaling if proxyServerName, exists := service.Annotations[AnnotationProxyServerName]; exists && proxyServerName != "" { // Ensure the proxy address has a port if _, _, err := net.SplitHostPort(proxyServerName); err != nil { proxyServerName = net.JoinHostPort(proxyServerName, "25565") } routingEndpoint = proxyServerName // scalingTarget remains the service endpoint (already set above) } autoScaleAsleepMOTD := "" autoScaleLoadingMOTD := "" waitTimeout := 60 * time.Second if asleepMOTD, exists := service.Annotations[AnnotationAutoScaleAsleepMOTD]; exists && asleepMOTD != "" { autoScaleAsleepMOTD = asleepMOTD } if loadingMOTD, exists := 
service.Annotations[AnnotationAutoScaleLoadingMOTD]; exists && loadingMOTD != "" { autoScaleLoadingMOTD = loadingMOTD } if timeoutStr, exists := service.Annotations[AnnotationAutoScaleWaitTimeout]; exists && timeoutStr != "" { if parsedTimeout, err := time.ParseDuration(timeoutStr); err == nil && parsedTimeout > 0 { waitTimeout = parsedTimeout } else { logrus.WithError(err).WithField("annotation", AnnotationAutoScaleWaitTimeout).WithField("value", timeoutStr).Warn("Invalid wait timeout annotation, falling back to default 60s") } } wakerFunc := w.buildScaleFunction(service, 0, 1) rs := &routableService{ externalServiceName: externalServiceName, containerEndpoint: routingEndpoint, scalingTarget: scalingTarget, autoScaleUp: buildK8sWaker(routingEndpoint, wakerFunc, waitTimeout), autoScaleDown: w.buildScaleFunction(service, 1, 0), autoScaleAsleepMOTD: autoScaleAsleepMOTD, autoScaleLoadingMOTD: autoScaleLoadingMOTD, } return rs } func buildK8sWaker(endpoint string, scaleUp SleeperFunc, waitTimeout time.Duration) WakerFunc { if scaleUp == nil { return nil } return func(ctx context.Context) (string, error) { if err := scaleUp(ctx); err != nil { return "", err } deadline := time.Now().Add(waitTimeout) for { conn, err := net.DialTimeout("tcp", endpoint, 1*time.Second) if err == nil { _ = conn.Close() break } if ctx.Err() != nil { return endpoint, ctx.Err() } if time.Now().After(deadline) { return endpoint, fmt.Errorf("timeout waiting for K8s backend to become reachable at %s", endpoint) } select { case <-ctx.Done(): return endpoint, ctx.Err() case <-time.After(500 * time.Millisecond): } } return endpoint, nil } } // buildScaleFunction generates a SleeperFunc to scale StatefulSets based on specified criteria and service annotations. // Will return nil if the service should not be auto-scaled due config or annotation. 
func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to int32) SleeperFunc { // Currently, annotations can only be used to opt-out of auto-scaling. // However, this logic is prepared also for opt-in, as it returns a `SleeperFunc` when flags are false but annotations are set to `enabled`. if from <= to { enabled, exists := service.Annotations[AnnotationAutoScaleUp] if exists { enabledBool, err := strconv.ParseBool(strings.TrimSpace(enabled)) if err != nil { logrus.WithFields(logrus.Fields{"service": service.Name}). WithError(err). Warnf("invalid value for %s annotation - disabling service auto-scale-up", AnnotationAutoScaleUp) return nil } if !enabledBool { return nil } } else { if !w.autoScaleUp { return nil } } } if from >= to { enabled, exists := service.Annotations[AnnotationAutoScaleDown] if exists { enabledBool, err := strconv.ParseBool(strings.TrimSpace(enabled)) if err != nil { logrus.WithFields(logrus.Fields{"service": service.Name}). WithError(err). Warnf("invalid value for %s annotation - disabling service auto-scale-down", AnnotationAutoScaleDown) return nil } if !enabledBool { return nil } } else { if !w.autoScaleDown { return nil } } } return func(ctx context.Context) error { serviceName := service.Name if statefulSetName, exists := w.mappings[serviceName]; exists { // Get current replicas to check if scaling is needed if scale, err := w.clientset.AppsV1().StatefulSets(service.Namespace).GetScale(ctx, statefulSetName, meta.GetOptions{}); err == nil { replicas := scale.Status.Replicas logrus.WithFields(logrus.Fields{ "service": serviceName, "statefulSet": statefulSetName, "replicas": replicas, }).Debug("StatefulSet of Service Replicas") if replicas == from { // Use Patch instead of Update to avoid optimistic concurrency errors // This doesn't require resourceVersion and is atomic patchData := fmt.Sprintf(`{"spec":{"replicas":%d}}`, to) _, err := w.clientset.AppsV1().StatefulSets(service.Namespace).Patch( ctx, statefulSetName, 
types.StrategicMergePatchType, []byte(patchData), meta.PatchOptions{}, ) if err == nil { logrus.WithFields(logrus.Fields{ "service": serviceName, "statefulSet": statefulSetName, "replicas": replicas, }).Infof("StatefulSet Replicas Autoscaled from %d to %d", from, to) return nil } // Fallback to UpdateScale if Patch fails due to RBAC permissions // This maintains backward compatibility with existing RBAC configurations if strings.Contains(err.Error(), "forbidden") { logrus.WithFields(logrus.Fields{ "service": serviceName, "statefulSet": statefulSetName, }).Warn("Patch operation forbidden - falling back to UpdateScale. Consider updating RBAC to allow 'patch' verb for better concurrency handling") scale.Spec.Replicas = to if _, updateErr := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale( ctx, statefulSetName, scale, meta.UpdateOptions{}, ); updateErr == nil { logrus.WithFields(logrus.Fields{ "service": serviceName, "statefulSet": statefulSetName, "replicas": replicas, }).Infof("StatefulSet Replicas Autoscaled from %d to %d (via UpdateScale fallback)", from, to) return nil } else { return errors.Wrapf(updateErr, "UpdateScale fallback for Replicas=%d failed for StatefulSet: %s", to, statefulSetName) } } return errors.Wrapf(err, "Patch for Replicas=%d failed for StatefulSet: %s", to, statefulSetName) } // Replicas already at desired state return nil } else { return fmt.Errorf("GetScale failed for StatefulSet %s: %w", statefulSetName, err) } } return nil } }