7 "github.com/go-logr/logr"
10 onapv1alpha1 "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1"
12 corev1 "k8s.io/api/core/v1"
13 extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
14 "k8s.io/apimachinery/pkg/api/errors"
15 "k8s.io/apimachinery/pkg/runtime"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 "sigs.k8s.io/controller-runtime/pkg/controller"
18 "sigs.k8s.io/controller-runtime/pkg/handler"
19 "sigs.k8s.io/controller-runtime/pkg/manager"
20 "sigs.k8s.io/controller-runtime/pkg/reconcile"
21 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
22 "sigs.k8s.io/controller-runtime/pkg/source"
25 var log = logf.Log.WithName("controller_collectdplugin")
27 // ResourceMap to hold objects to update/reload
28 type ResourceMap struct {
29 configMap *corev1.ConfigMap
30 daemonSet *extensionsv1beta1.DaemonSet
31 collectdPlugins *[]onapv1alpha1.CollectdPlugin
35 * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
36 * business logic. Delete these comments after modifying this file.*
39 // Add creates a new CollectdPlugin Controller and adds it to the Manager. The Manager will set fields on the Controller
40 // and Start it when the Manager is Started.
41 func Add(mgr manager.Manager) error {
42 return add(mgr, newReconciler(mgr))
45 // newReconciler returns a new reconcile.Reconciler
46 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
47 return &ReconcileCollectdPlugin{client: mgr.GetClient(), scheme: mgr.GetScheme()}
50 // add adds a new Controller to mgr with r as the reconcile.Reconciler
51 func add(mgr manager.Manager, r reconcile.Reconciler) error {
52 // Create a new controller
53 log.V(1).Info("Creating a new controller for CollectdPlugin")
54 c, err := controller.New("collectdplugin-controller", mgr, controller.Options{Reconciler: r})
59 // Watch for changes to primary resource CollectdPlugin
60 log.V(1).Info("Add watcher for primary resource CollectdPlugin")
61 err = c.Watch(&source.Kind{Type: &onapv1alpha1.CollectdPlugin{}}, &handler.EnqueueRequestForObject{})
69 // blank assignment to verify that ReconcileCollectdPlugin implements reconcile.Reconciler
70 var _ reconcile.Reconciler = &ReconcileCollectdPlugin{}
72 // ReconcileCollectdPlugin reconciles a CollectdPlugin object
73 type ReconcileCollectdPlugin struct {
74 // This client, initialized using mgr.Client() above, is a split client
75 // that reads objects from the cache and writes to the apiserver
77 scheme *runtime.Scheme
80 // Define the collectdPlugin finalizer for handling deletion
82 defaultWatchLabel = "app=collectd"
83 collectdPluginFinalizer = "finalizer.collectdplugin.onap.org"
85 // WatchLabelsEnvVar is the constant for env variable WATCH_LABELS
86 // which is the labels where the watch activity happens.
87 // this value is empty if the operator is running with clusterScope.
88 WatchLabelsEnvVar = "WATCH_LABELS"
91 // Reconcile reads that state of the cluster for a CollectdPlugin object and makes changes based on the state read
92 // and what is in the CollectdPlugin.Spec
93 // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
94 // a Pod as an example
96 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
97 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
98 func (r *ReconcileCollectdPlugin) Reconcile(request reconcile.Request) (reconcile.Result, error) {
99 reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
100 reqLogger.Info("Reconciling CollectdPlugin")
102 // Fetch the CollectdPlugin instance
103 instance := &onapv1alpha1.CollectdPlugin{}
104 err := r.client.Get(context.TODO(), request.NamespacedName, instance)
106 if errors.IsNotFound(err) {
107 // Request object not found, could have been deleted after reconcile request.
108 // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
109 // Return and don't requeue
110 reqLogger.V(1).Info("CollectdPlugin object Not found")
111 return reconcile.Result{}, nil
113 // Error reading the object - requeue the request.
114 reqLogger.V(1).Info("Error reading the CollectdPlugin object, Requeuing")
115 return reconcile.Result{}, err
118 // Handle Delete CR for additional cleanup
119 isDelete, err := r.handleDelete(reqLogger, instance)
121 return reconcile.Result{}, err
124 // Add finalizer for this CR
125 if !contains(instance.GetFinalizers(), collectdPluginFinalizer) {
126 if err := r.addFinalizer(reqLogger, instance); err != nil {
127 return reconcile.Result{}, err
129 return reconcile.Result{}, nil
131 err = r.handleCollectdPlugin(reqLogger, instance, false)
132 return reconcile.Result{}, err
135 // handleCollectdPlugin regenerates the collectd conf on CR Create, Update, Delete events
136 func (r *ReconcileCollectdPlugin) handleCollectdPlugin(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin, isDelete bool) error {
138 rmap, err := r.findResourceMapForCR(reqLogger, cr)
140 reqLogger.Error(err, "Skip reconcile: Resources not found")
146 collectPlugins := rmap.collectdPlugins
147 reqLogger.V(1).Info("Found ResourceMap")
148 reqLogger.V(1).Info(":::: ConfigMap Info ::::", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
149 reqLogger.V(1).Info(":::: DaemonSet Info ::::", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
151 collectdConf, err := rebuildCollectdConf(cr, collectPlugins, isDelete)
153 //Restart Collectd Pods
154 //Restart only if hash of configmap has changed.
155 ds.Spec.Template.SetAnnotations(map[string]string{
156 "daaas-random": ComputeSHA256([]byte(collectdConf)),
158 cm.SetAnnotations(map[string]string{
159 "daaas-random": ComputeSHA256([]byte(collectdConf)),
162 cm.Data["node-collectd.conf"] = collectdConf
164 // Update the ConfigMap with new Spec and reload DaemonSets
165 reqLogger.Info("Updating the ConfigMap", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
166 log.V(1).Info("ConfigMap Data", "Map: ", cm.Data)
167 err = r.client.Update(context.TODO(), cm)
169 reqLogger.Error(err, "Update the ConfigMap failed", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
173 reqLogger.Info("Reloading the Daemonset", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
174 err = r.client.Update(context.TODO(), ds)
176 reqLogger.Error(err, "Update the DaemonSet failed", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
181 reqLogger.Info("Reconcile success!!")
185 // ComputeSHA256 returns hash of data as string
186 func ComputeSHA256(data []byte) string {
187 hash := sha256.Sum256(data)
188 return fmt.Sprintf("%x", hash)
191 // findResourceMapForCR returns the configMap, collectd Daemonset and list of Collectd Plugins
192 func (r *ReconcileCollectdPlugin) findResourceMapForCR(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) (ResourceMap, error) {
193 cmList := &corev1.ConfigMapList{}
194 opts := &client.ListOptions{}
195 rmap := ResourceMap{}
197 // Select ConfigMaps with label
198 labelSelector, err := getWatchLabels()
200 reqLogger.Error(err, "Failed to get watch labels, continuing with default label")
202 opts.SetLabelSelector(labelSelector)
203 opts.InNamespace(cr.Namespace)
205 err = r.client.List(context.TODO(), opts, cmList)
210 if cmList.Items == nil || len(cmList.Items) == 0 {
211 return rmap, errors.NewNotFound(corev1.Resource("configmap"), "ConfigMap")
214 // Select DaemonSets with label
215 dsList := &extensionsv1beta1.DaemonSetList{}
216 err = r.client.List(context.TODO(), opts, dsList)
221 if dsList.Items == nil || len(dsList.Items) == 0 {
222 return rmap, errors.NewNotFound(corev1.Resource("daemonset"), "DaemonSet")
225 // Get all collectd plugins in the current namespace to rebuild conf.
226 collectdPlugins := &onapv1alpha1.CollectdPluginList{}
227 cpOpts := &client.ListOptions{}
228 cpOpts.InNamespace(cr.Namespace)
229 err = r.client.List(context.TODO(), cpOpts, collectdPlugins)
234 rmap.configMap = &cmList.Items[0]
235 rmap.daemonSet = &dsList.Items[0]
236 rmap.collectdPlugins = &collectdPlugins.Items //will be nil if no plugins exist
240 // Get all collectd plugins and reconstruct, compute Hash and check for changes
241 func rebuildCollectdConf(cr *onapv1alpha1.CollectdPlugin, cpList *[]onapv1alpha1.CollectdPlugin, isDelete bool) (string, error) {
242 var collectdConf string
243 if *cpList == nil || len(*cpList) == 0 {
244 return "", errors.NewNotFound(corev1.Resource("collectdplugin"), "CollectdPlugin")
246 loadPlugin := make(map[string]string)
247 for _, cp := range *cpList {
248 if cp.Spec.PluginName == "global" {
249 collectdConf += cp.Spec.PluginConf + "\n"
251 loadPlugin[cp.Spec.PluginName] = cp.Spec.PluginConf
256 delete(loadPlugin, cr.Spec.PluginName)
259 log.V(1).Info("::::::: Plugins Map ::::::: ", "PluginMap ", loadPlugin)
261 for cpName, cpConf := range loadPlugin {
262 collectdConf += "LoadPlugin" + " " + cpName + "\n"
263 collectdConf += cpConf + "\n"
266 collectdConf += "#Last line (collectd requires ā\\nā at the last line)\n"
268 return collectdConf, nil
271 // Handle Delete CR event for additional cleanup
272 func (r *ReconcileCollectdPlugin) handleDelete(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) (bool, error) {
273 // Check if the CollectdPlugin instance is marked to be deleted, which is
274 // indicated by the deletion timestamp being set.
275 isMarkedToBeDeleted := cr.GetDeletionTimestamp() != nil
276 if isMarkedToBeDeleted {
277 if contains(cr.GetFinalizers(), collectdPluginFinalizer) {
278 // Run finalization logic for collectdPluginFinalizer. If the
279 // finalization logic fails, don't remove the finalizer so
280 // that we can retry during the next reconciliation.
281 if err := r.finalizeCollectdPlugin(reqLogger, cr); err != nil {
282 return isMarkedToBeDeleted, err
285 // Remove collectdPluginFinalizer. Once all finalizers have been
286 // removed, the object will be deleted.
287 cr.SetFinalizers(remove(cr.GetFinalizers(), collectdPluginFinalizer))
288 err := r.client.Update(context.TODO(), cr)
290 return isMarkedToBeDeleted, err
294 return isMarkedToBeDeleted, nil
297 func (r *ReconcileCollectdPlugin) updateStatus(cr *onapv1alpha1.CollectdPlugin) error {
298 podList := &corev1.PodList{}
299 opts := &client.ListOptions{}
300 // Select ConfigMaps with label
301 labelSelector, _ := getWatchLabels()
302 opts.SetLabelSelector(labelSelector)
304 opts.InNamespace(cr.Namespace)
305 err := r.client.List(context.TODO(), opts, podList)
310 if podList.Items == nil || len(podList.Items) == 0 {
314 for _, pod := range podList.Items {
315 pods = append(pods, pod.Name)
317 cr.Status.CollectdAgents = pods
318 err = r.client.Status().Update(context.TODO(), cr)
322 func (r *ReconcileCollectdPlugin) finalizeCollectdPlugin(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) error {
323 // Cleanup by regenerating new collectd conf and rolling update of DaemonSet
324 if err := r.handleCollectdPlugin(reqLogger, cr, true); err != nil {
325 reqLogger.Error(err, "Finalize CollectdPlugin failed!!")
328 reqLogger.Info("Successfully finalized CollectdPlugin!!")
332 func (r *ReconcileCollectdPlugin) addFinalizer(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) error {
333 reqLogger.Info("Adding Finalizer for the CollectdPlugin")
334 cr.SetFinalizers(append(cr.GetFinalizers(), collectdPluginFinalizer))
337 err := r.client.Update(context.TODO(), cr)
339 reqLogger.Error(err, "Failed to update CollectdPlugin with finalizer")
345 func contains(list []string, s string) bool {
346 for _, v := range list {
354 func remove(list []string, s string) []string {
355 for i, v := range list {
357 list = append(list[:i], list[i+1:]...)
363 // getWatchLabels returns the labels the operator should be watching for changes
364 func getWatchLabels() (string, error) {
365 labelSelector, found := os.LookupEnv(WatchLabelsEnvVar)
367 return defaultWatchLabel, fmt.Errorf("%s must be set", WatchLabelsEnvVar)
369 return labelSelector, nil