1 package resourcebundlestate
7 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
9 corev1 "k8s.io/api/core/v1"
10 k8serrors "k8s.io/apimachinery/pkg/api/errors"
11 "k8s.io/apimachinery/pkg/types"
12 "sigs.k8s.io/controller-runtime/pkg/client"
13 "sigs.k8s.io/controller-runtime/pkg/controller"
14 "sigs.k8s.io/controller-runtime/pkg/handler"
15 "sigs.k8s.io/controller-runtime/pkg/manager"
16 "sigs.k8s.io/controller-runtime/pkg/reconcile"
17 "sigs.k8s.io/controller-runtime/pkg/source"
20 // AddPodController the new controller to the controller manager
21 func AddPodController(mgr manager.Manager) error {
22 return addPodController(mgr, newPodReconciler(mgr))
25 func addPodController(mgr manager.Manager, r *podReconciler) error {
26 // Create a new controller
27 c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
32 // Watch for changes to secondar resource Pods
33 // Predicate filters pods which don't have the k8splugin label
34 err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, &podPredicate{})
42 func newPodReconciler(m manager.Manager) *podReconciler {
43 return &podReconciler{client: m.GetClient()}
46 type podReconciler struct {
50 // Reconcile implements the loop that will update the ResourceBundleState CR
51 // whenever we get any updates from all the pods we watch.
52 func (r *podReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
53 log.Printf("Updating ResourceBundleState for Pod: %+v\n", req)
56 err := r.client.Get(context.TODO(), req.NamespacedName, pod)
58 if k8serrors.IsNotFound(err) {
59 log.Printf("Pod not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
60 // Remove the Pod's status from StatusList
61 // This can happen if we get the DeletionTimeStamp event
62 // after the POD has been deleted.
63 r.deletePodFromAllCRs(req.NamespacedName)
64 return reconcile.Result{}, nil
66 log.Printf("Failed to get pod: %+v\n", req.NamespacedName)
67 return reconcile.Result{}, err
70 // Find the CRs which track this pod via the labelselector
71 crSelector := returnLabel(pod.GetLabels())
72 if crSelector == nil {
73 log.Println("We should not be here. The predicate should have filtered this Pod")
76 // Get the CRs which have this label and update them all
77 // Ideally, we will have only one CR, but there is nothing
78 // preventing the creation of multiple.
79 // TODO: Consider using an admission validating webook to prevent multiple
80 rbStatusList := &v1alpha1.ResourceBundleStateList{}
81 err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
82 if err != nil || len(rbStatusList.Items) == 0 {
83 log.Printf("Did not find any CRs tracking this resource\n")
84 return reconcile.Result{}, nil
87 err = r.updateCRs(rbStatusList, pod)
90 return reconcile.Result{}, err
93 return reconcile.Result{}, nil
96 // deletePodFromAllCRs deletes pod status from all the CRs when the POD itself has been deleted
97 // and we have not handled the updateCRs yet.
98 // Since, we don't have the pod's labels, we need to look at all the CRs in this namespace
99 func (r *podReconciler) deletePodFromAllCRs(namespacedName types.NamespacedName) error {
101 rbStatusList := &v1alpha1.ResourceBundleStateList{}
102 err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
103 if err != nil || len(rbStatusList.Items) == 0 {
104 log.Printf("Did not find any CRs tracking this resource\n")
107 for _, cr := range rbStatusList.Items {
108 r.deleteFromSingleCR(&cr, namespacedName.Name)
114 func (r *podReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, pod *corev1.Pod) error {
116 for _, cr := range crl.Items {
117 // Pod is not scheduled for deletion
118 if pod.DeletionTimestamp == nil {
119 err := r.updateSingleCR(&cr, pod)
124 // Pod is scheduled for deletion
125 r.deleteFromSingleCR(&cr, pod.Name)
132 func (r *podReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
133 cr.Status.ResourceCount--
134 length := len(cr.Status.PodStatuses)
135 for i, rstatus := range cr.Status.PodStatuses {
136 if rstatus.Name == name {
137 //Delete that status from the array
138 cr.Status.PodStatuses[i] = cr.Status.PodStatuses[length-1]
139 cr.Status.PodStatuses[length-1] = v1alpha1.PodStatus{}
140 cr.Status.PodStatuses = cr.Status.PodStatuses[:length-1]
145 log.Println("Did not find a status for POD in CR")
149 func (r *podReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, pod *corev1.Pod) error {
151 // Update status after searching for it in the list of resourceStatuses
152 for i, rstatus := range cr.Status.PodStatuses {
153 // Look for the status if we already have it in the CR
154 if rstatus.Name == pod.Name {
155 pod.Status.DeepCopyInto(&cr.Status.PodStatuses[i].Status)
156 err := r.client.Status().Update(context.TODO(), cr)
158 log.Printf("failed to update rbstate: %v\n", err)
165 // Exited for loop with no status found
166 // Increment the number of tracked resources
167 cr.Status.ResourceCount++
170 cr.Status.PodStatuses = append(cr.Status.PodStatuses, v1alpha1.PodStatus{
171 ObjectMeta: pod.ObjectMeta,
176 err := r.client.Status().Update(context.TODO(), cr)
178 log.Printf("failed to update rbstate: %v\n", err)