Update package path to use github
[multicloud/k8s.git] / src / monitor / pkg / controller / resourcebundlestate / pod_controller.go
1 package resourcebundlestate
2
3 import (
4         "context"
5         "log"
6
7         "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
8
9         corev1 "k8s.io/api/core/v1"
10         k8serrors "k8s.io/apimachinery/pkg/api/errors"
11         "k8s.io/apimachinery/pkg/types"
12         "sigs.k8s.io/controller-runtime/pkg/client"
13         "sigs.k8s.io/controller-runtime/pkg/controller"
14         "sigs.k8s.io/controller-runtime/pkg/handler"
15         "sigs.k8s.io/controller-runtime/pkg/manager"
16         "sigs.k8s.io/controller-runtime/pkg/reconcile"
17         "sigs.k8s.io/controller-runtime/pkg/source"
18 )
19
20 // AddPodController the new controller to the controller manager
21 func AddPodController(mgr manager.Manager) error {
22         return addPodController(mgr, newPodReconciler(mgr))
23 }
24
25 func addPodController(mgr manager.Manager, r *podReconciler) error {
26         // Create a new controller
27         c, err := controller.New("ResourceBundleState-controller", mgr, controller.Options{Reconciler: r})
28         if err != nil {
29                 return err
30         }
31
32         // Watch for changes to secondar resource Pods
33         // Predicate filters pods which don't have the k8splugin label
34         err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, &podPredicate{})
35         if err != nil {
36                 return err
37         }
38
39         return nil
40 }
41
42 func newPodReconciler(m manager.Manager) *podReconciler {
43         return &podReconciler{client: m.GetClient()}
44 }
45
46 type podReconciler struct {
47         client client.Client
48 }
49
50 // Reconcile implements the loop that will update the ResourceBundleState CR
51 // whenever we get any updates from all the pods we watch.
52 func (r *podReconciler) Reconcile(req reconcile.Request) (reconcile.Result, error) {
53         log.Printf("Updating ResourceBundleState for Pod: %+v\n", req)
54
55         pod := &corev1.Pod{}
56         err := r.client.Get(context.TODO(), req.NamespacedName, pod)
57         if err != nil {
58                 if k8serrors.IsNotFound(err) {
59                         log.Printf("Pod not found: %+v. Remove from CR if it is stored there.\n", req.NamespacedName)
60                         // Remove the Pod's status from StatusList
61                         // This can happen if we get the DeletionTimeStamp event
62                         // after the POD has been deleted.
63                         r.deletePodFromAllCRs(req.NamespacedName)
64                         return reconcile.Result{}, nil
65                 }
66                 log.Printf("Failed to get pod: %+v\n", req.NamespacedName)
67                 return reconcile.Result{}, err
68         }
69
70         // Find the CRs which track this pod via the labelselector
71         crSelector := returnLabel(pod.GetLabels())
72         if crSelector == nil {
73                 log.Println("We should not be here. The predicate should have filtered this Pod")
74         }
75
76         // Get the CRs which have this label and update them all
77         // Ideally, we will have only one CR, but there is nothing
78         // preventing the creation of multiple.
79         // TODO: Consider using an admission validating webook to prevent multiple
80         rbStatusList := &v1alpha1.ResourceBundleStateList{}
81         err = listResources(r.client, req.Namespace, crSelector, rbStatusList)
82         if err != nil || len(rbStatusList.Items) == 0 {
83                 log.Printf("Did not find any CRs tracking this resource\n")
84                 return reconcile.Result{}, nil
85         }
86
87         err = r.updateCRs(rbStatusList, pod)
88         if err != nil {
89                 // Requeue the update
90                 return reconcile.Result{}, err
91         }
92
93         return reconcile.Result{}, nil
94 }
95
96 // deletePodFromAllCRs deletes pod status from all the CRs when the POD itself has been deleted
97 // and we have not handled the updateCRs yet.
98 // Since, we don't have the pod's labels, we need to look at all the CRs in this namespace
99 func (r *podReconciler) deletePodFromAllCRs(namespacedName types.NamespacedName) error {
100
101         rbStatusList := &v1alpha1.ResourceBundleStateList{}
102         err := listResources(r.client, namespacedName.Namespace, nil, rbStatusList)
103         if err != nil || len(rbStatusList.Items) == 0 {
104                 log.Printf("Did not find any CRs tracking this resource\n")
105                 return nil
106         }
107         for _, cr := range rbStatusList.Items {
108                 r.deleteFromSingleCR(&cr, namespacedName.Name)
109         }
110
111         return nil
112 }
113
114 func (r *podReconciler) updateCRs(crl *v1alpha1.ResourceBundleStateList, pod *corev1.Pod) error {
115
116         for _, cr := range crl.Items {
117                 // Pod is not scheduled for deletion
118                 if pod.DeletionTimestamp == nil {
119                         err := r.updateSingleCR(&cr, pod)
120                         if err != nil {
121                                 return err
122                         }
123                 } else {
124                         // Pod is scheduled for deletion
125                         r.deleteFromSingleCR(&cr, pod.Name)
126                 }
127         }
128
129         return nil
130 }
131
132 func (r *podReconciler) deleteFromSingleCR(cr *v1alpha1.ResourceBundleState, name string) error {
133         cr.Status.ResourceCount--
134         length := len(cr.Status.PodStatuses)
135         for i, rstatus := range cr.Status.PodStatuses {
136                 if rstatus.Name == name {
137                         //Delete that status from the array
138                         cr.Status.PodStatuses[i] = cr.Status.PodStatuses[length-1]
139                         cr.Status.PodStatuses[length-1] = v1alpha1.PodStatus{}
140                         cr.Status.PodStatuses = cr.Status.PodStatuses[:length-1]
141                         return nil
142                 }
143         }
144
145         log.Println("Did not find a status for POD in CR")
146         return nil
147 }
148
149 func (r *podReconciler) updateSingleCR(cr *v1alpha1.ResourceBundleState, pod *corev1.Pod) error {
150
151         // Update status after searching for it in the list of resourceStatuses
152         for i, rstatus := range cr.Status.PodStatuses {
153                 // Look for the status if we already have it in the CR
154                 if rstatus.Name == pod.Name {
155                         pod.Status.DeepCopyInto(&cr.Status.PodStatuses[i].Status)
156                         err := r.client.Status().Update(context.TODO(), cr)
157                         if err != nil {
158                                 log.Printf("failed to update rbstate: %v\n", err)
159                                 return err
160                         }
161                         return nil
162                 }
163         }
164
165         // Exited for loop with no status found
166         // Increment the number of tracked resources
167         cr.Status.ResourceCount++
168
169         // Add it to CR
170         cr.Status.PodStatuses = append(cr.Status.PodStatuses, v1alpha1.PodStatus{
171                 ObjectMeta: pod.ObjectMeta,
172                 Ready:      false,
173                 Status:     pod.Status,
174         })
175
176         err := r.client.Status().Update(context.TODO(), cr)
177         if err != nil {
178                 log.Printf("failed to update rbstate: %v\n", err)
179                 return err
180         }
181
182         return nil
183 }