HEX
Server: Apache/2.4.54 (Win64) OpenSSL/1.1.1p PHP/7.4.30
System: Windows NT website-api 10.0 build 20348 (Windows Server 2016) AMD64
User: SYSTEM (0)
PHP: 7.4.30
Disabled: NONE
Upload Files
File: C:/github_repos/casibase/object/cache_manager.go
// Copyright 2025 The Casibase Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package object

import (
	"context"
	"fmt"
	"sync"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
)

type CachedMetrics struct {
	TotalCPU         resource.Quantity `json:"totalCPU"`
	TotalMemory      resource.Quantity `json:"totalMemory"`
	PodCount         int               `json:"podCount"`
	CPUPercentage    float64           `json:"cpuPercentage"`
	MemoryPercentage float64           `json:"memoryPercentage"`
	LastUpdated      time.Time         `json:"lastUpdated"`
}

// EventCache represents namespace-level event cache
type EventCache struct {
	Namespace string               `json:"namespace"` // Namespace name
	Events    map[string]*v1.Event `json:"events"`    // Event name -> Event object
}

type CacheManager struct {
	factory      informers.SharedInformerFactory
	nsInformer   coreinformers.NamespaceInformer
	svcCache     map[string]map[string]*v1.Service           // namespace -> name -> service
	deployCache  map[string]map[string]*appsv1.Deployment    // namespace -> name -> deployment
	ingressCache map[string]map[string]*networkingv1.Ingress // namespace -> name -> ingress
	eventCaches  map[string]*EventCache                      // namespace -> EventCache
	nsCache      map[string]*v1.Namespace                    // name -> namespace
	nodeCache    map[string]*v1.Node                         // name -> node

	metricsCache map[string]*CachedMetrics // namespace -> metrics

	mu      sync.RWMutex
	stopCh  chan struct{}
	started bool

	// debug counters
	svcHits     int
	deployHits  int
	nodeHits    int
	nsHits      int
	ingressHits int
	eventHits   int
	metricsHits int
}

var (
	cacheManager  *CacheManager
	cachedK8sHost string
)

// initCacheManager initializes the cache manager
func initCacheManager() error {
	if cacheManager != nil {
		return nil
	}

	factory := informers.NewSharedInformerFactory(k8sClient.clientSet, 30*time.Second)

	mgr := &CacheManager{
		factory:      factory,
		nsInformer:   factory.Core().V1().Namespaces(),
		svcCache:     make(map[string]map[string]*v1.Service),
		deployCache:  make(map[string]map[string]*appsv1.Deployment),
		ingressCache: make(map[string]map[string]*networkingv1.Ingress),
		eventCaches:  make(map[string]*EventCache),
		nsCache:      make(map[string]*v1.Namespace),
		nodeCache:    make(map[string]*v1.Node),
		metricsCache: make(map[string]*CachedMetrics),
		stopCh:       make(chan struct{}),
	}

	if err := mgr.setupInformers(); err != nil {
		return fmt.Errorf("failed to setup informers: %v", err)
	}

	cacheManager = mgr
	return nil
}

// startCacheManager starts the cache manager if it exists
func startCacheManager(lang string) error {
	if cacheManager == nil {
		return fmt.Errorf("cache manager not initialized")
	}

	if cacheManager.started {
		return nil
	}

	if err := cacheManager.Start(lang); err != nil {
		return fmt.Errorf("failed to start cache manager: %v", err)
	}

	return nil
}

// setupInformers configures all required informers
func (cm *CacheManager) setupInformers() error {
	// Namespace informer
	cm.nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cm.onNamespaceAdd,
		UpdateFunc: cm.onNamespaceUpdate,
		DeleteFunc: cm.onNamespaceDelete,
	})

	// Service informer for all namespaces
	svcInformer := cm.factory.Core().V1().Services()
	svcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cm.onServiceAdd,
		UpdateFunc: cm.onServiceUpdate,
		DeleteFunc: cm.onServiceDelete,
	})

	// Deployment informer for all namespaces
	deployInformer := cm.factory.Apps().V1().Deployments()
	deployInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cm.onDeploymentAdd,
		UpdateFunc: cm.onDeploymentUpdate,
		DeleteFunc: cm.onDeploymentDelete,
	})

	// Node informer for external IP resolution
	nodeInformer := cm.factory.Core().V1().Nodes()
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cm.onNodeAdd,
		UpdateFunc: cm.onNodeUpdate,
		DeleteFunc: cm.onNodeDelete,
	})

	// Ingress informer for all namespaces
	ingressInformer := cm.factory.Networking().V1().Ingresses()
	ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cm.onIngressAdd,
		UpdateFunc: cm.onIngressUpdate,
		DeleteFunc: cm.onIngressDelete,
	})

	// Event informer
	eventInformer := cm.factory.Core().V1().Events()
	eventInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cm.onEventAdd,
		UpdateFunc: cm.onEventUpdate,
		DeleteFunc: cm.onEventDelete,
	})

	return nil
}

// Event-related event handlers
func (cm *CacheManager) onEventAdd(obj interface{}) {
	event := obj.(*v1.Event)
	cm.updateEventCache(event)
}

func (cm *CacheManager) onEventUpdate(oldObj, newObj interface{}) {
	event := newObj.(*v1.Event)
	cm.updateEventCache(event)
}

func (cm *CacheManager) onEventDelete(obj interface{}) {
	event := obj.(*v1.Event)
	cm.deleteEventCache(event)
}

// updateEventCache updates the event cache
func (cm *CacheManager) updateEventCache(event *v1.Event) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	ns := event.Namespace

	if cm.eventCaches[ns] == nil {
		cm.eventCaches[ns] = &EventCache{
			Namespace: ns,
			Events:    make(map[string]*v1.Event),
		}
	}
	cm.eventCaches[ns].Events[event.Name] = event
}

// deleteEventCache deletes from event cache
func (cm *CacheManager) deleteEventCache(event *v1.Event) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if nsEventCache, exists := cm.eventCaches[event.Namespace]; exists {
		delete(nsEventCache.Events, event.Name)
		if len(nsEventCache.Events) == 0 {
			delete(cm.eventCaches, event.Namespace)
		}
	}
}

// getEvents retrieves all events for the specified namespace
func (cm *CacheManager) getEvents(namespace string) []*v1.Event {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.eventHits++

	var events []*v1.Event
	if nsEventCache, exists := cm.eventCaches[namespace]; exists {
		for _, event := range nsEventCache.Events {
			events = append(events, event)
		}
	}
	return events
}

// Start begins the cache manager
func (cm *CacheManager) Start(lang string) error {
	if cm.started {
		return nil
	}

	cm.factory.Start(cm.stopCh)

	// Wait for initial sync
	synced := cm.factory.WaitForCacheSync(cm.stopCh)
	for informer, synced := range synced {
		if !synced {
			return fmt.Errorf("failed to sync informer: %v", informer)
		}
	}

	if metricsClient != nil {
		go cm.startMetricsPoller(60*time.Second, lang)
	}

	cm.started = true
	return nil
}

// Stop stops the cache manager
func (cm *CacheManager) Stop() {
	if cm.started {
		close(cm.stopCh)
		cm.started = false
	}
}

func (cm *CacheManager) startMetricsPoller(interval time.Duration, lang string) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	cm.updateAllMetrics(lang)

	for {
		select {
		case <-ticker.C:
			cm.updateAllMetrics(lang)
		case <-cm.stopCh:
			return
		}
	}
}

func (cm *CacheManager) updateAllMetrics(lang string) {
	cm.mu.RLock()
	namespaces := make([]string, 0, len(cm.nsCache))
	for name := range cm.nsCache {
		namespaces = append(namespaces, name)
	}
	cm.mu.RUnlock()

	for _, namespace := range namespaces {
		cm.updateNamespaceMetrics(namespace, lang)
		time.Sleep(100 * time.Millisecond)
	}
}

// updateNamespaceMetrics updates cached metrics for a namespace
func (cm *CacheManager) updateNamespaceMetrics(namespace string, lang string) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	metrics, err := calculateNamespaceMetrics(ctx, metricsClient, namespace, cm.deployCache, &cm.mu, lang)
	if err != nil {
		return
	}

	cm.mu.Lock()
	cm.metricsCache[namespace] = metrics
	cm.mu.Unlock()
}

func (cm *CacheManager) getNamespaceMetricsFromCache(namespace string) (*CachedMetrics, bool) {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.metricsHits++

	metrics, found := cm.metricsCache[namespace]
	return metrics, found
}

// Namespace handlers
func (cm *CacheManager) onNamespaceAdd(obj interface{}) {
	ns := obj.(*v1.Namespace)
	cm.updateNamespaceCache(ns)
}

func (cm *CacheManager) onNamespaceUpdate(oldObj, newObj interface{}) {
	ns := newObj.(*v1.Namespace)
	cm.updateNamespaceCache(ns)
}

func (cm *CacheManager) onNamespaceDelete(obj interface{}) {
	ns := obj.(*v1.Namespace)
	cm.deleteNamespaceCache(ns)
}

func (cm *CacheManager) updateNamespaceCache(ns *v1.Namespace) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	cm.nsCache[ns.Name] = ns
}

func (cm *CacheManager) deleteNamespaceCache(ns *v1.Namespace) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	delete(cm.nsCache, ns.Name)
	delete(cm.metricsCache, ns.Name)
	delete(cm.eventCaches, ns.Name)
}

// Service event handlers
func (cm *CacheManager) onServiceAdd(obj interface{}) {
	svc := obj.(*v1.Service)
	cm.updateServiceCache(svc)
}

func (cm *CacheManager) onServiceUpdate(oldObj, newObj interface{}) {
	svc := newObj.(*v1.Service)
	cm.updateServiceCache(svc)
}

func (cm *CacheManager) onServiceDelete(obj interface{}) {
	svc := obj.(*v1.Service)
	cm.deleteServiceCache(svc)
}

func (cm *CacheManager) updateServiceCache(svc *v1.Service) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	ns := svc.Namespace
	if cm.svcCache[ns] == nil {
		cm.svcCache[ns] = make(map[string]*v1.Service)
	}
	cm.svcCache[ns][svc.Name] = svc
}

func (cm *CacheManager) deleteServiceCache(svc *v1.Service) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if nsCache, exists := cm.svcCache[svc.Namespace]; exists {
		delete(nsCache, svc.Name)
	}
}

// Deployment event handlers
func (cm *CacheManager) onDeploymentAdd(obj interface{}) {
	deploy := obj.(*appsv1.Deployment)
	cm.updateDeploymentCache(deploy)
}

func (cm *CacheManager) onDeploymentUpdate(oldObj, newObj interface{}) {
	deploy := newObj.(*appsv1.Deployment)
	cm.updateDeploymentCache(deploy)
}

func (cm *CacheManager) onDeploymentDelete(obj interface{}) {
	deploy := obj.(*appsv1.Deployment)
	cm.deleteDeploymentCache(deploy)
}

func (cm *CacheManager) updateDeploymentCache(deploy *appsv1.Deployment) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	ns := deploy.Namespace
	if cm.deployCache[ns] == nil {
		cm.deployCache[ns] = make(map[string]*appsv1.Deployment)
	}
	cm.deployCache[ns][deploy.Name] = deploy
}

func (cm *CacheManager) deleteDeploymentCache(deploy *appsv1.Deployment) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if nsCache, exists := cm.deployCache[deploy.Namespace]; exists {
		delete(nsCache, deploy.Name)
	}
}

// Node event handlers
func (cm *CacheManager) onNodeAdd(obj interface{}) {
	node := obj.(*v1.Node)
	cm.updateNodeCache(node)
}

func (cm *CacheManager) onNodeUpdate(oldObj, newObj interface{}) {
	node := newObj.(*v1.Node)
	cm.updateNodeCache(node)
}

func (cm *CacheManager) onNodeDelete(obj interface{}) {
	node := obj.(*v1.Node)
	cm.deleteNodeCache(node)
}

func (cm *CacheManager) updateNodeCache(node *v1.Node) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	cm.nodeCache[node.Name] = node
}

func (cm *CacheManager) deleteNodeCache(node *v1.Node) {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	delete(cm.nodeCache, node.Name)
}

func (cm *CacheManager) onIngressAdd(obj interface{}) {
	ingress := obj.(*networkingv1.Ingress)
	cm.updateIngressCache(ingress)
}

func (cm *CacheManager) onIngressUpdate(oldObj, newObj interface{}) {
	ingress := newObj.(*networkingv1.Ingress)
	cm.updateIngressCache(ingress)
}

func (cm *CacheManager) onIngressDelete(obj interface{}) {
	ingress := obj.(*networkingv1.Ingress)
	cm.deleteIngressCache(ingress)
}

func (cm *CacheManager) updateIngressCache(ingress *networkingv1.Ingress) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	ns := ingress.Namespace
	if cm.ingressCache[ns] == nil {
		cm.ingressCache[ns] = make(map[string]*networkingv1.Ingress)
	}
	cm.ingressCache[ns][ingress.Name] = ingress
}

func (cm *CacheManager) deleteIngressCache(ingress *networkingv1.Ingress) {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	if nsCache, exists := cm.ingressCache[ingress.Namespace]; exists {
		delete(nsCache, ingress.Name)
	}
}

func (cm *CacheManager) getNamespace(name string) *v1.Namespace {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.nsHits++

	if ns, exists := cm.nsCache[name]; exists {
		return ns
	}
	return nil
}

// Cache access methods with hit counting
func (cm *CacheManager) getServices(namespace string) []*v1.Service {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.svcHits++

	var services []*v1.Service
	if nsCache, exists := cm.svcCache[namespace]; exists {
		for _, svc := range nsCache {
			if svc.Name != "kubernetes" { // Skip default service
				services = append(services, svc)
			}
		}
	}
	return services
}

func (cm *CacheManager) getDeployments(namespace string) []*appsv1.Deployment {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.deployHits++

	var deployments []*appsv1.Deployment
	if nsCache, exists := cm.deployCache[namespace]; exists {
		for _, deploy := range nsCache {
			deployments = append(deployments, deploy)
		}
	}
	return deployments
}

func (cm *CacheManager) getNodes() []*v1.Node {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.nodeHits++

	var nodes []*v1.Node
	for _, node := range cm.nodeCache {
		nodes = append(nodes, node)
	}
	return nodes
}

func (cm *CacheManager) getIngresses(namespace string) []*networkingv1.Ingress {
	cm.mu.RLock()
	defer cm.mu.RUnlock()

	cm.ingressHits++

	var ingresses []*networkingv1.Ingress
	if nsCache, exists := cm.ingressCache[namespace]; exists {
		for _, ingress := range nsCache {
			ingresses = append(ingresses, ingress)
		}
	}
	return ingresses
}