/*
Copyright 2019 Intel Corporation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package collectdglobal

import (
	"context"
	"fmt"
	"reflect"
	"strings"

	"github.com/go-logr/logr"
	"github.com/operator-framework/operator-sdk/pkg/predicate"

	onapv1alpha1 "collectd-operator/pkg/apis/onap/v1alpha1"
	collectdutils "collectd-operator/pkg/controller/utils"
	dspredicate "collectd-operator/pkg/controller/utils"
	dsutils "collectd-operator/pkg/controller/utils"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
	"sigs.k8s.io/controller-runtime/pkg/source"
)
46 var log = logf.Log.WithName("controller_collectdglobal")
48 // Add creates a new CollectdGlobal Controller and adds it to the Manager. The Manager will set fields on the Controller
49 // and Start it when the Manager is Started.
50 func Add(mgr manager.Manager) error {
51 return add(mgr, newReconciler(mgr))
54 // newReconciler returns a new reconcile.Reconciler
55 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
56 return &ReconcileCollectdGlobal{client: mgr.GetClient(), scheme: mgr.GetScheme()}
59 // add adds a new Controller to mgr with r as the reconcile.Reconciler
60 func add(mgr manager.Manager, r reconcile.Reconciler) error {
61 // Create a new controller
62 log.V(1).Info("Creating a new controller for CollectdGlobal")
63 c, err := controller.New("collectdglobal-controller", mgr, controller.Options{Reconciler: r})
68 // Watch for changes to primary resource CollectdGlobal
69 log.V(1).Info("Add watcher for primary resource CollectdGlobal")
70 err = c.Watch(&source.Kind{Type: &onapv1alpha1.CollectdGlobal{}}, &handler.EnqueueRequestForObject{}, predicate.GenerationChangedPredicate{})
75 log.V(1).Info("Add watcher for secondary resource Collectd Daemonset")
77 &source.Kind{Type: &appsv1.DaemonSet{}},
78 &handler.EnqueueRequestsFromMapFunc{
79 ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
80 labelSelector, err := collectdutils.GetWatchLabels()
81 labels := strings.Split(labelSelector, "=")
83 log.Error(err, "Failed to get watch labels, continuing with default label")
85 rcp := r.(*ReconcileCollectdGlobal)
86 // Select the Daemonset with labelSelector (Default is app=collectd)
87 if a.Meta.GetLabels()[labels[0]] == labels[1] {
88 var requests []reconcile.Request
89 cg, err := collectdutils.GetCollectdGlobal(rcp.client, a.Meta.GetNamespace())
90 if err != nil || cg == nil {
91 log.V(1).Info("No CollectdGlobal CR instance Exist")
94 requests = append(requests, reconcile.Request{
95 NamespacedName: client.ObjectKey{Namespace: cg.Namespace, Name: cg.Name}})
100 }, dspredicate.DaemonSetStatusChangedPredicate{})
108 // blank assignment to verify that ReconcileCollectdGlobal implements reconcile.Reconciler
109 var _ reconcile.Reconciler = &ReconcileCollectdGlobal{}
111 // ReconcileCollectdGlobal reconciles a CollectdGlobal object
112 type ReconcileCollectdGlobal struct {
113 // This client, initialized using mgr.Client() above, is a split client
114 // that reads objects from the cache and writes to the apiserver
116 scheme *runtime.Scheme
119 // Reconcile reads that state of the cluster for a CollectdGlobal object and makes changes based on the state read
120 // and what is in the CollectdGlobal.Spec
121 // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
122 // a Pod as an example
124 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
125 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
126 func (r *ReconcileCollectdGlobal) Reconcile(request reconcile.Request) (reconcile.Result, error) {
127 reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
128 reqLogger.Info("Reconciling CollectdGlobal")
130 // Fetch the CollectdGlobal instance
131 instance := &onapv1alpha1.CollectdGlobal{}
132 err := r.client.Get(context.TODO(), request.NamespacedName, instance)
134 if errors.IsNotFound(err) {
135 // Request object not found, could have been deleted after reconcile request.
136 // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
137 // Return and don't requeue
138 reqLogger.V(1).Info("CollectdGlobal object Not found")
139 return reconcile.Result{}, nil
141 // Error reading the object - requeue the request.
142 reqLogger.V(1).Info("Error reading the CollectdGlobal object, Requeuing")
143 return reconcile.Result{}, err
146 // Handle Delete CR for additional cleanup
147 isDelete, err := r.handleDelete(reqLogger, instance)
149 return reconcile.Result{}, err
152 // Add finalizer for this CR
153 if !collectdutils.Contains(instance.GetFinalizers(), collectdutils.CollectdFinalizer) {
154 if err := r.addFinalizer(reqLogger, instance); err != nil {
155 return reconcile.Result{}, err
157 //return reconcile.Result{}, nil
159 // Handle the reconciliation for CollectdGlobal.
160 // At this stage the Status of the CollectdGlobal should NOT be ""
161 err = r.handleCollectdGlobal(reqLogger, instance, false)
162 return reconcile.Result{}, err
165 // handleCollectdGlobal regenerates the collectd conf on CR Create, Update, Delete events
166 func (r *ReconcileCollectdGlobal) handleCollectdGlobal(reqLogger logr.Logger, cr *onapv1alpha1.CollectdGlobal, isDelete bool) error {
167 collectdutils.ReconcileLock.Lock()
168 defer collectdutils.ReconcileLock.Unlock()
170 retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
171 cm, err := collectdutils.GetConfigMap(r.client, reqLogger, cr.Namespace)
173 reqLogger.Info(":::: Skip current reconcile:::: ConfigMap not found. Cache might be stale. Requeue")
177 reqLogger.V(1).Info(":::: ConfigMap Info ::::", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
179 collectdConf, err := collectdutils.RebuildCollectdConf(r.client, cr.Namespace, isDelete, "")
181 reqLogger.Error(err, "Skip reconcile: Rebuild conf failed")
185 cm.SetAnnotations(map[string]string{
186 "daaas-random": collectdutils.ComputeSHA256([]byte(collectdConf)),
189 cm.Data["collectd.conf"] = collectdConf
190 // Update the ConfigMap with new Spec and reload DaemonSets
191 reqLogger.Info("Updating the ConfigMap", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
192 updateErr := r.client.Update(context.TODO(), cm)
193 if updateErr != nil {
194 reqLogger.Error(updateErr, "Update ConfigMap failed")
198 // Retrieve the latest version of Daemonset before attempting update
199 // RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
200 // Select DaemonSets with label
201 dsList := &appsv1.DaemonSetList{}
202 opts := &client.ListOptions{}
203 labelSelector, err := collectdutils.GetWatchLabels()
205 reqLogger.Error(err, "Failed to get watch labels, continuing with default label")
207 opts.SetLabelSelector(labelSelector)
208 opts.InNamespace(cr.Namespace)
209 err = r.client.List(context.TODO(), opts, dsList)
211 panic(fmt.Errorf("Failed to get latest version of DaemonSet: %v", err))
214 if dsList.Items == nil || len(dsList.Items) == 0 {
215 return errors.NewNotFound(corev1.Resource("daemonset"), "DaemonSet")
217 ds := &dsList.Items[0]
218 //Restart Collectd Pods
219 reqLogger.Info("Reloading the Daemonset", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
220 //Restart only if hash of conf has changed.
221 ds.Spec.Template.SetAnnotations(map[string]string{
222 "daaas-random": collectdutils.ComputeSHA256([]byte(collectdConf)),
224 r.handleTypesDB(reqLogger, cr, ds, isDelete)
225 updateErr = r.client.Update(context.TODO(), ds)
229 panic(fmt.Errorf("Update failed: %v", retryErr))
232 retryErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
233 err := r.updateStatus(cr)
238 panic(fmt.Errorf("Update failed: %v", retryErr))
241 reqLogger.Info("Reconcile success!!")
245 // Handle Delete CR event for additional cleanup
246 func (r *ReconcileCollectdGlobal) handleDelete(reqLogger logr.Logger, cr *onapv1alpha1.CollectdGlobal) (bool, error) {
247 // Check if the CollectdGlobal instance is marked to be deleted, which is
248 // indicated by the deletion timestamp being set.
249 isMarkedToBeDeleted := cr.GetDeletionTimestamp() != nil
250 if isMarkedToBeDeleted {
251 // Update status to Deleting state
252 cr.Status.Status = onapv1alpha1.Deleting
253 cr.Status.CollectdAgents = nil
254 _ = r.client.Status().Update(context.TODO(), cr)
256 if collectdutils.Contains(cr.GetFinalizers(), collectdutils.CollectdFinalizer) {
257 // Run finalization logic for CollectdFinalizer. If the
258 // finalization logic fails, don't remove the finalizer so
259 // that we can retry during the next reconciliation.
260 if err := r.finalizeCollectdGlobal(reqLogger, cr); err != nil {
261 return isMarkedToBeDeleted, err
264 // Remove CollectdFinalizer. Once all finalizers have been
265 // removed, the object will be deleted.
266 cr.SetFinalizers(collectdutils.Remove(cr.GetFinalizers(), collectdutils.CollectdFinalizer))
267 err := r.client.Update(context.TODO(), cr)
269 return isMarkedToBeDeleted, err
273 return isMarkedToBeDeleted, nil
276 func (r *ReconcileCollectdGlobal) updateStatus(cr *onapv1alpha1.CollectdGlobal) error {
277 // Fetch the CollectdGlobal instance
278 instance := &onapv1alpha1.CollectdGlobal{}
279 key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name}
280 err := r.client.Get(context.TODO(), key, instance)
284 switch instance.Status.Status {
285 case onapv1alpha1.Initial:
286 instance.Status.Status = onapv1alpha1.Created
287 case onapv1alpha1.Created, onapv1alpha1.Enabled:
288 pods, err := collectdutils.GetPodList(r.client, instance.Namespace)
292 if !reflect.DeepEqual(pods, instance.Status.CollectdAgents) {
293 instance.Status.CollectdAgents = pods
294 instance.Status.Status = onapv1alpha1.Enabled
296 case onapv1alpha1.Deleting, onapv1alpha1.Deprecated:
299 err = r.client.Status().Update(context.TODO(), instance)
303 func (r *ReconcileCollectdGlobal) finalizeCollectdGlobal(reqLogger logr.Logger, cr *onapv1alpha1.CollectdGlobal) error {
304 // Cleanup by regenerating new collectd conf and rolling update of DaemonSet
305 if err := r.handleCollectdGlobal(reqLogger, cr, true); err != nil {
306 reqLogger.Error(err, "Finalize CollectdGlobal failed!!")
309 reqLogger.Info("Successfully finalized CollectdGlobal!!")
313 func (r *ReconcileCollectdGlobal) addFinalizer(reqLogger logr.Logger, cr *onapv1alpha1.CollectdGlobal) error {
314 reqLogger.Info("Adding Finalizer for the CollectdGlobal")
315 cr.SetFinalizers(append(cr.GetFinalizers(), collectdutils.CollectdFinalizer))
316 // Update status from Initial to Created
317 // Since addFinalizer will be executed only once,
318 // the status will be changed from Initial state to Created
319 // updateErr := r.updateStatus(cr)
320 // if updateErr != nil {
321 // reqLogger.Error(updateErr, "Failed to update status from Initial state")
324 cr.Status.Status = onapv1alpha1.Created
325 err := r.client.Update(context.TODO(), cr)
327 reqLogger.Error(err, "Failed to update CollectdGlobal with finalizer")
333 func (r *ReconcileCollectdGlobal) handleTypesDB(reqLogger logr.Logger, cr *onapv1alpha1.CollectdGlobal, ds *appsv1.DaemonSet, isDelete bool) error {
334 if isDelete || cr.Spec.ConfigMap == "" {
335 dsutils.RemoveTypesDB(ds)
339 cm := &corev1.ConfigMap{}
340 key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Spec.ConfigMap}
341 err := r.client.Get(context.TODO(), key, cm)
343 reqLogger.Info("Error getting TypesDB")
346 dsutils.UpsertTypesDB(ds, cm, cr)