Provide option for kubernetes to watch only a specific namespace (#433)

This commit is contained in:
Geoff Bourne
2025-07-20 12:59:14 -05:00
committed by GitHub
parent 9a457138ab
commit 7a4f83a30f
19 changed files with 663 additions and 142 deletions
+99 -73
View File
@@ -25,46 +25,60 @@ const (
AnnotationDefaultServer = "mc-router.itzg.me/defaultServer"
)
type IK8sWatcher interface {
StartWithConfig(ctx context.Context, kubeConfigFile string, autoScaleUp bool, autoScaleDown bool) error
StartInCluster(ctx context.Context, autoScaleUp bool, autoScaleDown bool) error
}
var K8sWatcher IK8sWatcher = &k8sWatcherImpl{}
type k8sWatcherImpl struct {
// K8sWatcher is a RouteFinder that can find routes from kubernetes services.
// It also watches for stateful sets to auto scale up/down, if enabled.
type K8sWatcher struct {
sync.RWMutex
config *rest.Config
autoScaleUp bool
autoScaleDown bool
namespace string
// The key in mappings is a Service, and the value the StatefulSet name
mappings map[string]string
clientset *kubernetes.Clientset
mappings map[string]string
routesHandler RoutesHandler
clientset *kubernetes.Clientset
}
func (w *k8sWatcherImpl) StartInCluster(ctx context.Context, autoScaleUp bool, autoScaleDown bool) error {
func NewK8sWatcherInCluster() (*K8sWatcher, error) {
config, err := rest.InClusterConfig()
if err != nil {
return errors.Wrap(err, "Unable to load in-cluster config")
return nil, errors.Wrap(err, "Unable to load in-cluster config")
}
return w.startWithLoadedConfig(ctx, config, autoScaleUp, autoScaleDown)
return &K8sWatcher{
config: config,
}, nil
}
func (w *k8sWatcherImpl) StartWithConfig(ctx context.Context, kubeConfigFile string, autoScaleUp bool, autoScaleDown bool) error {
func NewK8sWatcherWithConfig(kubeConfigFile string) (*K8sWatcher, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile)
if err != nil {
return errors.Wrap(err, "Could not load kube config file")
return nil, errors.Wrap(err, "Could not load kube config file")
}
return w.startWithLoadedConfig(ctx, config, autoScaleUp, autoScaleDown)
return &K8sWatcher{
config: config,
}, nil
}
func (w *k8sWatcherImpl) startWithLoadedConfig(ctx context.Context, config *rest.Config, autoScaleUp bool, autoScaleDown bool) error {
func (w *K8sWatcher) WithAutoScale(autoScaleUp bool, autoScaleDown bool) *K8sWatcher {
w.autoScaleUp = autoScaleUp
w.autoScaleDown = autoScaleDown
return w
}
clientset, err := kubernetes.NewForConfig(config)
func (w *K8sWatcher) WithNamespace(namespace string) *K8sWatcher {
w.namespace = namespace
return w
}
func (w *K8sWatcher) String() string {
return "k8s"
}
func (w *K8sWatcher) Start(ctx context.Context, handler RoutesHandler) error {
w.routesHandler = handler
clientset, err := kubernetes.NewForConfig(w.config)
if err != nil {
return errors.Wrap(err, "Could not create kube clientset")
}
@@ -74,7 +88,7 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(ctx context.Context, config *rest
ListerWatcher: cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
string(core.ResourceServices),
core.NamespaceAll,
w.namespace,
fields.Everything(),
),
ObjectType: &core.Service{},
@@ -87,51 +101,22 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(ctx context.Context, config *rest
go serviceController.RunWithContext(ctx)
w.mappings = make(map[string]string)
if autoScaleUp || autoScaleDown {
_, statefulSetController := cache.NewInformer(
cache.NewListWatchFromClient(
if w.autoScaleUp || w.autoScaleDown {
_, statefulSetController := cache.NewInformerWithOptions(cache.InformerOptions{
ListerWatcher: cache.NewListWatchFromClient(
clientset.AppsV1().RESTClient(),
"statefulSets",
core.NamespaceAll,
w.namespace,
fields.Everything(),
),
&apps.StatefulSet{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
w.mappings[statefulSet.Spec.ServiceName] = statefulSet.Name
},
DeleteFunc: func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, statefulSet.Spec.ServiceName)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldStatefulSet, ok := oldObj.(*apps.StatefulSet)
if !ok {
return
}
newStatefulSet, ok := newObj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, oldStatefulSet.Spec.ServiceName)
w.mappings[newStatefulSet.Spec.ServiceName] = newStatefulSet.Name
},
ObjectType: &apps.StatefulSet{},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: w.handleAddStatefulSet(),
DeleteFunc: w.handleDeleteStatefulSet(),
UpdateFunc: w.handleUpdateStatefulSet(),
},
)
})
go statefulSetController.RunWithContext(ctx)
}
@@ -139,14 +124,55 @@ func (w *k8sWatcherImpl) startWithLoadedConfig(ctx context.Context, config *rest
return nil
}
func (w *K8sWatcher) handleAddStatefulSet() func(obj interface{}) {
return func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
w.mappings[statefulSet.Spec.ServiceName] = statefulSet.Name
}
}
func (w *K8sWatcher) handleUpdateStatefulSet() func(oldObj interface{}, newObj interface{}) {
return func(oldObj, newObj interface{}) {
oldStatefulSet, ok := oldObj.(*apps.StatefulSet)
if !ok {
return
}
newStatefulSet, ok := newObj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, oldStatefulSet.Spec.ServiceName)
w.mappings[newStatefulSet.Spec.ServiceName] = newStatefulSet.Name
}
}
func (w *K8sWatcher) handleDeleteStatefulSet() func(obj interface{}) {
return func(obj interface{}) {
statefulSet, ok := obj.(*apps.StatefulSet)
if !ok {
return
}
w.RLock()
defer w.RUnlock()
delete(w.mappings, statefulSet.Spec.ServiceName)
}
}
// oldObj and newObj are expected to be *v1.Service
func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
func (w *K8sWatcher) handleUpdate(oldObj interface{}, newObj interface{}) {
for _, oldRoutableService := range w.extractRoutableServices(oldObj) {
logrus.WithFields(logrus.Fields{
"old": oldRoutableService,
}).Debug("UPDATE")
if oldRoutableService.externalServiceName != "" {
Routes.DeleteMapping(oldRoutableService.externalServiceName)
w.routesHandler.DeleteMapping(oldRoutableService.externalServiceName)
}
}
@@ -155,40 +181,40 @@ func (w *k8sWatcherImpl) handleUpdate(oldObj interface{}, newObj interface{}) {
"new": newRoutableService,
}).Debug("UPDATE")
if newRoutableService.externalServiceName != "" {
Routes.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown)
w.routesHandler.CreateMapping(newRoutableService.externalServiceName, newRoutableService.containerEndpoint, newRoutableService.autoScaleUp, newRoutableService.autoScaleDown)
} else {
Routes.SetDefaultRoute(newRoutableService.containerEndpoint)
w.routesHandler.SetDefaultRoute(newRoutableService.containerEndpoint)
}
}
}
// obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) handleDelete(obj interface{}) {
func (w *K8sWatcher) handleDelete(obj interface{}) {
routableServices := w.extractRoutableServices(obj)
for _, routableService := range routableServices {
if routableService != nil {
logrus.WithField("routableService", routableService).Debug("DELETE")
if routableService.externalServiceName != "" {
Routes.DeleteMapping(routableService.externalServiceName)
w.routesHandler.DeleteMapping(routableService.externalServiceName)
} else {
Routes.SetDefaultRoute("")
w.routesHandler.SetDefaultRoute("")
}
}
}
}
// obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) handleAdd(obj interface{}) {
func (w *K8sWatcher) handleAdd(obj interface{}) {
routableServices := w.extractRoutableServices(obj)
for _, routableService := range routableServices {
if routableService != nil {
logrus.WithField("routableService", routableService).Debug("ADD")
if routableService.externalServiceName != "" {
Routes.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.autoScaleUp, routableService.autoScaleDown)
w.routesHandler.CreateMapping(routableService.externalServiceName, routableService.containerEndpoint, routableService.autoScaleUp, routableService.autoScaleDown)
} else {
Routes.SetDefaultRoute(routableService.containerEndpoint)
w.routesHandler.SetDefaultRoute(routableService.containerEndpoint)
}
}
}
@@ -202,7 +228,7 @@ type routableService struct {
}
// obj is expected to be a *v1.Service
func (w *k8sWatcherImpl) extractRoutableServices(obj interface{}) []*routableService {
func (w *K8sWatcher) extractRoutableServices(obj interface{}) []*routableService {
service, ok := obj.(*core.Service)
if !ok {
return nil
@@ -222,7 +248,7 @@ func (w *k8sWatcherImpl) extractRoutableServices(obj interface{}) []*routableSer
return nil
}
func (w *k8sWatcherImpl) buildDetails(service *core.Service, externalServiceName string) *routableService {
func (w *K8sWatcher) buildDetails(service *core.Service, externalServiceName string) *routableService {
clusterIp := service.Spec.ClusterIP
if service.Spec.Type == core.ServiceTypeExternalName {
clusterIp = service.Spec.ExternalName
@@ -252,7 +278,7 @@ func (w *k8sWatcherImpl) buildDetails(service *core.Service, externalServiceName
return rs
}
func (w *k8sWatcherImpl) buildScaleFunction(service *core.Service, from int32, to int32) ScalerFunc {
func (w *K8sWatcher) buildScaleFunction(service *core.Service, from int32, to int32) ScalerFunc {
if from <= to && !w.autoScaleUp {
return nil
}