Fixed Label selector for Collectd Operator
[demo.git] / vnfs / DAaaS / microservices / collectd-operator / pkg / controller / collectdplugin / collectdplugin_controller.go
1 package collectdplugin
2
3 import (
4         "context"
5         "crypto/sha256"
6         "fmt"
7         "github.com/go-logr/logr"
8         "os"
9
10         onapv1alpha1 "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1"
11
12         corev1 "k8s.io/api/core/v1"
13         extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
14         "k8s.io/apimachinery/pkg/api/errors"
15         "k8s.io/apimachinery/pkg/runtime"
16         "sigs.k8s.io/controller-runtime/pkg/client"
17         "sigs.k8s.io/controller-runtime/pkg/controller"
18         "sigs.k8s.io/controller-runtime/pkg/handler"
19         "sigs.k8s.io/controller-runtime/pkg/manager"
20         "sigs.k8s.io/controller-runtime/pkg/reconcile"
21         logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
22         "sigs.k8s.io/controller-runtime/pkg/source"
23 )
24
25 var log = logf.Log.WithName("controller_collectdplugin")
26
27 // ResourceMap to hold objects to update/reload
28 type ResourceMap struct {
29         configMap       *corev1.ConfigMap
30         daemonSet       *extensionsv1beta1.DaemonSet
31         collectdPlugins *[]onapv1alpha1.CollectdPlugin
32 }
33
34 /**
35 * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
36 * business logic.  Delete these comments after modifying this file.*
37  */
38
39 // Add creates a new CollectdPlugin Controller and adds it to the Manager. The Manager will set fields on the Controller
40 // and Start it when the Manager is Started.
41 func Add(mgr manager.Manager) error {
42         return add(mgr, newReconciler(mgr))
43 }
44
45 // newReconciler returns a new reconcile.Reconciler
46 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
47         return &ReconcileCollectdPlugin{client: mgr.GetClient(), scheme: mgr.GetScheme()}
48 }
49
50 // add adds a new Controller to mgr with r as the reconcile.Reconciler
51 func add(mgr manager.Manager, r reconcile.Reconciler) error {
52         // Create a new controller
53         log.V(1).Info("Creating a new controller for CollectdPlugin")
54         c, err := controller.New("collectdplugin-controller", mgr, controller.Options{Reconciler: r})
55         if err != nil {
56                 return err
57         }
58
59         // Watch for changes to primary resource CollectdPlugin
60         log.V(1).Info("Add watcher for primary resource CollectdPlugin")
61         err = c.Watch(&source.Kind{Type: &onapv1alpha1.CollectdPlugin{}}, &handler.EnqueueRequestForObject{})
62         if err != nil {
63                 return err
64         }
65
66         return nil
67 }
68
69 // blank assignment to verify that ReconcileCollectdPlugin implements reconcile.Reconciler
70 var _ reconcile.Reconciler = &ReconcileCollectdPlugin{}
71
72 // ReconcileCollectdPlugin reconciles a CollectdPlugin object
73 type ReconcileCollectdPlugin struct {
74         // This client, initialized using mgr.Client() above, is a split client
75         // that reads objects from the cache and writes to the apiserver
76         client client.Client
77         scheme *runtime.Scheme
78 }
79
80 // Define the collectdPlugin finalizer for handling deletion
81 const (
82         defaultWatchLabel       = "app=collectd"
83         collectdPluginFinalizer = "finalizer.collectdplugin.onap.org"
84
85         // WatchLabelsEnvVar is the constant for env variable WATCH_LABELS
86         // which is the labels where the watch activity happens.
87         // this value is empty if the operator is running with clusterScope.
88         WatchLabelsEnvVar = "WATCH_LABELS"
89 )
90
91 // Reconcile reads that state of the cluster for a CollectdPlugin object and makes changes based on the state read
92 // and what is in the CollectdPlugin.Spec
93 // TODO(user): Modify this Reconcile function to implement your Controller logic.  This example creates
94 // a Pod as an example
95 // Note:
96 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
97 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
98 func (r *ReconcileCollectdPlugin) Reconcile(request reconcile.Request) (reconcile.Result, error) {
99         reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
100         reqLogger.Info("Reconciling CollectdPlugin")
101
102         // Fetch the CollectdPlugin instance
103         instance := &onapv1alpha1.CollectdPlugin{}
104         err := r.client.Get(context.TODO(), request.NamespacedName, instance)
105         if err != nil {
106                 if errors.IsNotFound(err) {
107                         // Request object not found, could have been deleted after reconcile request.
108                         // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
109                         // Return and don't requeue
110                         reqLogger.V(1).Info("CollectdPlugin object Not found")
111                         return reconcile.Result{}, nil
112                 }
113                 // Error reading the object - requeue the request.
114                 reqLogger.V(1).Info("Error reading the CollectdPlugin object, Requeuing")
115                 return reconcile.Result{}, err
116         }
117
118         // Handle Delete CR for additional cleanup
119         isDelete, err := r.handleDelete(reqLogger, instance)
120         if isDelete {
121                 return reconcile.Result{}, err
122         }
123
124         // Add finalizer for this CR
125         if !contains(instance.GetFinalizers(), collectdPluginFinalizer) {
126                 if err := r.addFinalizer(reqLogger, instance); err != nil {
127                         return reconcile.Result{}, err
128                 }
129                 return reconcile.Result{}, nil
130         }
131         err = r.handleCollectdPlugin(reqLogger, instance, false)
132         return reconcile.Result{}, err
133 }
134
135 // handleCollectdPlugin regenerates the collectd conf on CR Create, Update, Delete events
136 func (r *ReconcileCollectdPlugin) handleCollectdPlugin(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin, isDelete bool) error {
137
138         rmap, err := r.findResourceMapForCR(reqLogger, cr)
139         if err != nil {
140                 reqLogger.Error(err, "Skip reconcile: Resources not found")
141                 return err
142         }
143
144         cm := rmap.configMap
145         ds := rmap.daemonSet
146         collectPlugins := rmap.collectdPlugins
147         reqLogger.V(1).Info("Found ResourceMap")
148         reqLogger.V(1).Info(":::: ConfigMap Info ::::", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
149         reqLogger.V(1).Info(":::: DaemonSet Info ::::", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
150
151         collectdConf, err := rebuildCollectdConf(cr, collectPlugins, isDelete)
152
153         //Restart Collectd Pods
154         //Restart only if hash of configmap has changed.
155         ds.Spec.Template.SetAnnotations(map[string]string{
156                 "daaas-random": ComputeSHA256([]byte(collectdConf)),
157         })
158         cm.SetAnnotations(map[string]string{
159                 "daaas-random": ComputeSHA256([]byte(collectdConf)),
160         })
161
162         cm.Data["node-collectd.conf"] = collectdConf
163
164         // Update the ConfigMap with new Spec and reload DaemonSets
165         reqLogger.Info("Updating the ConfigMap", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
166         log.V(1).Info("ConfigMap Data", "Map: ", cm.Data)
167         err = r.client.Update(context.TODO(), cm)
168         if err != nil {
169                 reqLogger.Error(err, "Update the ConfigMap failed", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
170                 return err
171         }
172
173         reqLogger.Info("Reloading the Daemonset", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
174         err = r.client.Update(context.TODO(), ds)
175         if err != nil {
176                 reqLogger.Error(err, "Update the DaemonSet failed", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
177                 return err
178         }
179         r.updateStatus(cr)
180         // Reconcile success
181         reqLogger.Info("Reconcile success!!")
182         return nil
183 }
184
185 // ComputeSHA256  returns hash of data as string
186 func ComputeSHA256(data []byte) string {
187         hash := sha256.Sum256(data)
188         return fmt.Sprintf("%x", hash)
189 }
190
191 // findResourceMapForCR returns the configMap, collectd Daemonset and list of Collectd Plugins
192 func (r *ReconcileCollectdPlugin) findResourceMapForCR(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) (ResourceMap, error) {
193         cmList := &corev1.ConfigMapList{}
194         opts := &client.ListOptions{}
195         rmap := ResourceMap{}
196
197         // Select ConfigMaps with label
198         labelSelector, err := getWatchLabels()
199         if err != nil {
200                 reqLogger.Error(err, "Failed to get watch labels, continuing with default label")
201         }
202         opts.SetLabelSelector(labelSelector)
203         opts.InNamespace(cr.Namespace)
204
205         err = r.client.List(context.TODO(), opts, cmList)
206         if err != nil {
207                 return rmap, err
208         }
209
210         if cmList.Items == nil || len(cmList.Items) == 0 {
211                 return rmap, errors.NewNotFound(corev1.Resource("configmap"), "ConfigMap")
212         }
213
214         // Select DaemonSets with label
215         dsList := &extensionsv1beta1.DaemonSetList{}
216         err = r.client.List(context.TODO(), opts, dsList)
217         if err != nil {
218                 return rmap, err
219         }
220
221         if dsList.Items == nil || len(dsList.Items) == 0 {
222                 return rmap, errors.NewNotFound(corev1.Resource("daemonset"), "DaemonSet")
223         }
224
225         // Get all collectd plugins in the current namespace to rebuild conf.
226         collectdPlugins := &onapv1alpha1.CollectdPluginList{}
227         cpOpts := &client.ListOptions{}
228         cpOpts.InNamespace(cr.Namespace)
229         err = r.client.List(context.TODO(), cpOpts, collectdPlugins)
230         if err != nil {
231                 return rmap, err
232         }
233
234         rmap.configMap = &cmList.Items[0]
235         rmap.daemonSet = &dsList.Items[0]
236         rmap.collectdPlugins = &collectdPlugins.Items //will be nil if no plugins exist
237         return rmap, err
238 }
239
240 // Get all collectd plugins and reconstruct, compute Hash and check for changes
241 func rebuildCollectdConf(cr *onapv1alpha1.CollectdPlugin, cpList *[]onapv1alpha1.CollectdPlugin, isDelete bool) (string, error) {
242         var collectdConf string
243         if *cpList == nil || len(*cpList) == 0 {
244                 return "", errors.NewNotFound(corev1.Resource("collectdplugin"), "CollectdPlugin")
245         }
246         loadPlugin := make(map[string]string)
247         for _, cp := range *cpList {
248                 if cp.Spec.PluginName == "global" {
249                         collectdConf += cp.Spec.PluginConf + "\n"
250                 } else {
251                         loadPlugin[cp.Spec.PluginName] = cp.Spec.PluginConf
252                 }
253         }
254
255         if isDelete {
256                 delete(loadPlugin, cr.Spec.PluginName)
257         }
258
259         log.V(1).Info("::::::: Plugins Map ::::::: ", "PluginMap ", loadPlugin)
260
261         for cpName, cpConf := range loadPlugin {
262                 collectdConf += "LoadPlugin" + " " + cpName + "\n"
263                 collectdConf += cpConf + "\n"
264         }
265
266         collectdConf += "#Last line (collectd requires ā€˜\\nā€™ at the last line)\n"
267
268         return collectdConf, nil
269 }
270
271 // Handle Delete CR event for additional cleanup
272 func (r *ReconcileCollectdPlugin) handleDelete(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) (bool, error) {
273         // Check if the CollectdPlugin instance is marked to be deleted, which is
274         // indicated by the deletion timestamp being set.
275         isMarkedToBeDeleted := cr.GetDeletionTimestamp() != nil
276         if isMarkedToBeDeleted {
277                 if contains(cr.GetFinalizers(), collectdPluginFinalizer) {
278                         // Run finalization logic for collectdPluginFinalizer. If the
279                         // finalization logic fails, don't remove the finalizer so
280                         // that we can retry during the next reconciliation.
281                         if err := r.finalizeCollectdPlugin(reqLogger, cr); err != nil {
282                                 return isMarkedToBeDeleted, err
283                         }
284
285                         // Remove collectdPluginFinalizer. Once all finalizers have been
286                         // removed, the object will be deleted.
287                         cr.SetFinalizers(remove(cr.GetFinalizers(), collectdPluginFinalizer))
288                         err := r.client.Update(context.TODO(), cr)
289                         if err != nil {
290                                 return isMarkedToBeDeleted, err
291                         }
292                 }
293         }
294         return isMarkedToBeDeleted, nil
295 }
296
297 func (r *ReconcileCollectdPlugin) updateStatus(cr *onapv1alpha1.CollectdPlugin) error {
298         podList := &corev1.PodList{}
299         opts := &client.ListOptions{}
300         // Select ConfigMaps with label
301         labelSelector, _ := getWatchLabels()
302         opts.SetLabelSelector(labelSelector)
303         var pods []string
304         opts.InNamespace(cr.Namespace)
305         err := r.client.List(context.TODO(), opts, podList)
306         if err != nil {
307                 return err
308         }
309
310         if podList.Items == nil || len(podList.Items) == 0 {
311                 return err
312         }
313
314         for _, pod := range podList.Items {
315                 pods = append(pods, pod.Name)
316         }
317         cr.Status.CollectdAgents = pods
318         err = r.client.Status().Update(context.TODO(), cr)
319         return err
320 }
321
322 func (r *ReconcileCollectdPlugin) finalizeCollectdPlugin(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) error {
323         // Cleanup by regenerating new collectd conf and rolling update of DaemonSet
324         if err := r.handleCollectdPlugin(reqLogger, cr, true); err != nil {
325                 reqLogger.Error(err, "Finalize CollectdPlugin failed!!")
326                 return err
327         }
328         reqLogger.Info("Successfully finalized CollectdPlugin!!")
329         return nil
330 }
331
332 func (r *ReconcileCollectdPlugin) addFinalizer(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) error {
333         reqLogger.Info("Adding Finalizer for the CollectdPlugin")
334         cr.SetFinalizers(append(cr.GetFinalizers(), collectdPluginFinalizer))
335
336         // Update CR
337         err := r.client.Update(context.TODO(), cr)
338         if err != nil {
339                 reqLogger.Error(err, "Failed to update CollectdPlugin with finalizer")
340                 return err
341         }
342         return nil
343 }
344
345 func contains(list []string, s string) bool {
346         for _, v := range list {
347                 if v == s {
348                         return true
349                 }
350         }
351         return false
352 }
353
354 func remove(list []string, s string) []string {
355         for i, v := range list {
356                 if v == s {
357                         list = append(list[:i], list[i+1:]...)
358                 }
359         }
360         return list
361 }
362
363 // getWatchLabels returns the labels the operator should be watching for changes
364 func getWatchLabels() (string, error) {
365         labelSelector, found := os.LookupEnv(WatchLabelsEnvVar)
366         if !found {
367                 return defaultWatchLabel, fmt.Errorf("%s must be set", WatchLabelsEnvVar)
368         }
369         return labelSelector, nil
370 }