Allow mc-router to scale a backend StatefulSet while routing traffic to a proxy via proxyServerName (#512)
This commit is contained in:
+68
-19
@@ -11,10 +11,10 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
apps "k8s.io/api/apps/v1"
|
||||
autoscaling "k8s.io/api/autoscaling/v1"
|
||||
core "k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
@@ -26,6 +26,7 @@ const (
|
||||
AnnotationDefaultServer = "mc-router.itzg.me/defaultServer"
|
||||
AnnotationAutoScaleUp = "mc-router.itzg.me/autoScaleUp"
|
||||
AnnotationAutoScaleDown = "mc-router.itzg.me/autoScaleDown"
|
||||
AnnotationProxyServerName = "mc-router.itzg.me/proxyServerName"
|
||||
)
|
||||
|
||||
// K8sWatcher is a RouteFinder that can find routes from kubernetes services.
|
||||
@@ -184,9 +185,9 @@ func (w *K8sWatcher) handleUpdate(oldObj interface{}, newObj interface{}) {
|
||||
"new": newRoutableService,
|
||||
}).Debug("UPDATE")
|
||||
if newRoutableService.externalServiceName != "" {
|
||||
w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "")
|
||||
w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "")
|
||||
} else {
|
||||
w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "")
|
||||
w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint, newRoutableService.scalingTarget, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown, "")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -201,7 +202,7 @@ func (w *K8sWatcher) handleDelete(obj interface{}) {
|
||||
if routableService.externalServiceName != "" {
|
||||
w.routesHandler.DeleteMapping(routableService.externalServiceName)
|
||||
} else {
|
||||
w.routesHandler.SetDefaultRoute("", nil, nil, "")
|
||||
w.routesHandler.SetDefaultRoute("", "", nil, nil, "")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -215,9 +216,9 @@ func (w *K8sWatcher) handleAdd(obj interface{}) {
|
||||
logrus.WithField("routableService", routableService).Debug("ADD")
|
||||
|
||||
if routableService.externalServiceName != "" {
|
||||
w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.autoScaleUp, routableService.autoScaleDown, "")
|
||||
w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "")
|
||||
} else {
|
||||
w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.autoScaleUp, routableService.autoScaleDown, "")
|
||||
w.routesHandler.SetDefaultRoute(routableService.containerEndpoint, routableService.scalingTarget, routableService.autoScaleUp, routableService.autoScaleDown, "")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -226,6 +227,7 @@ func (w *K8sWatcher) handleAdd(obj interface{}) {
|
||||
type routableService struct {
|
||||
externalServiceName string
|
||||
containerEndpoint string
|
||||
scalingTarget string
|
||||
autoScaleUp WakerFunc
|
||||
autoScaleDown SleeperFunc
|
||||
}
|
||||
@@ -273,11 +275,25 @@ func (w *K8sWatcher) buildDetails(service *core.Service, externalServiceName str
|
||||
port = mcPort
|
||||
}
|
||||
endpoint := net.JoinHostPort(clusterIp, port)
|
||||
|
||||
routingEndpoint := endpoint
|
||||
scalingTarget := endpoint // Default to service endpoint for scaling
|
||||
|
||||
if proxyServerName, exists := service.Annotations[AnnotationProxyServerName]; exists && proxyServerName != "" {
|
||||
// Ensure the proxy address has a port
|
||||
if _, _, err := net.SplitHostPort(proxyServerName); err != nil {
|
||||
proxyServerName = net.JoinHostPort(proxyServerName, "25565")
|
||||
}
|
||||
routingEndpoint = proxyServerName
|
||||
// scalingTarget remains the service endpoint (already set above)
|
||||
}
|
||||
|
||||
wakerFunc := w.buildScaleFunction(service, 0, 1)
|
||||
rs := &routableService{
|
||||
externalServiceName: externalServiceName,
|
||||
containerEndpoint: endpoint,
|
||||
autoScaleUp: buildWakerFromSleeper(endpoint, wakerFunc),
|
||||
containerEndpoint: routingEndpoint,
|
||||
scalingTarget: scalingTarget,
|
||||
autoScaleUp: buildWakerFromSleeper(routingEndpoint, wakerFunc),
|
||||
autoScaleDown: w.buildScaleFunction(service, 1, 0),
|
||||
}
|
||||
return rs
|
||||
@@ -332,6 +348,7 @@ func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to in
|
||||
return func(ctx context.Context) error {
|
||||
serviceName := service.Name
|
||||
if statefulSetName, exists := w.mappings[serviceName]; exists {
|
||||
// Get current replicas to check if scaling is needed
|
||||
if scale, err := w.clientset.AppsV1().StatefulSets(service.Namespace).GetScale(ctx, statefulSetName, meta.GetOptions{}); err == nil {
|
||||
replicas := scale.Status.Replicas
|
||||
logrus.WithFields(logrus.Fields{
|
||||
@@ -339,25 +356,57 @@ func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to in
|
||||
"statefulSet": statefulSetName,
|
||||
"replicas": replicas,
|
||||
}).Debug("StatefulSet of Service Replicas")
|
||||
|
||||
if replicas == from {
|
||||
if _, err := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale(ctx, statefulSetName, &autoscaling.Scale{
|
||||
ObjectMeta: meta.ObjectMeta{
|
||||
Name: scale.Name,
|
||||
Namespace: scale.Namespace,
|
||||
UID: scale.UID,
|
||||
ResourceVersion: scale.ResourceVersion,
|
||||
},
|
||||
Spec: autoscaling.ScaleSpec{Replicas: to}}, meta.UpdateOptions{},
|
||||
); err == nil {
|
||||
// Use Patch instead of Update to avoid optimistic concurrency errors
|
||||
// This doesn't require resourceVersion and is atomic
|
||||
patchData := fmt.Sprintf(`{"spec":{"replicas":%d}}`, to)
|
||||
_, err := w.clientset.AppsV1().StatefulSets(service.Namespace).Patch(
|
||||
ctx,
|
||||
statefulSetName,
|
||||
types.StrategicMergePatchType,
|
||||
[]byte(patchData),
|
||||
meta.PatchOptions{},
|
||||
)
|
||||
if err == nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"service": serviceName,
|
||||
"statefulSet": statefulSetName,
|
||||
"replicas": replicas,
|
||||
}).Infof("StatefulSet Replicas Autoscaled from %d to %d", from, to)
|
||||
} else {
|
||||
return errors.Wrapf(err, "UpdateScale for Replicas=%d failed for StatefulSet: %s", to, statefulSetName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fallback to UpdateScale if Patch fails due to RBAC permissions
|
||||
// This maintains backward compatibility with existing RBAC configurations
|
||||
if strings.Contains(err.Error(), "forbidden") {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"service": serviceName,
|
||||
"statefulSet": statefulSetName,
|
||||
}).Warn("Patch operation forbidden - falling back to UpdateScale. Consider updating RBAC to allow 'patch' verb for better concurrency handling")
|
||||
|
||||
scale.Spec.Replicas = to
|
||||
if _, updateErr := w.clientset.AppsV1().StatefulSets(service.Namespace).UpdateScale(
|
||||
ctx,
|
||||
statefulSetName,
|
||||
scale,
|
||||
meta.UpdateOptions{},
|
||||
); updateErr == nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"service": serviceName,
|
||||
"statefulSet": statefulSetName,
|
||||
"replicas": replicas,
|
||||
}).Infof("StatefulSet Replicas Autoscaled from %d to %d (via UpdateScale fallback)", from, to)
|
||||
return nil
|
||||
} else {
|
||||
return errors.Wrapf(updateErr, "UpdateScale fallback for Replicas=%d failed for StatefulSet: %s", to, statefulSetName)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Wrapf(err, "Patch for Replicas=%d failed for StatefulSet: %s", to, statefulSetName)
|
||||
}
|
||||
// Replicas already at desired state
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("GetScale failed for StatefulSet %s: %w", statefulSetName, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user