2 Copyright 2019 Intel Corporation.
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 http://www.apache.org/licenses/LICENSE-2.0
7 Unless required by applicable law or agreed to in writing, software
8 distributed under the License is distributed on an "AS IS" BASIS,
9 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 See the License for the specific language governing permissions and
11 limitations under the License.
14 package collectdplugin
22 "github.com/go-logr/logr"
23 "github.com/operator-framework/operator-sdk/pkg/predicate"
25 onapv1alpha1 "collectd-operator/pkg/apis/onap/v1alpha1"
26 collectdutils "collectd-operator/pkg/controller/utils"
27 dspredicate "collectd-operator/pkg/controller/utils"
29 appsv1 "k8s.io/api/apps/v1"
30 corev1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/errors"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/types"
34 "k8s.io/client-go/util/retry"
35 "sigs.k8s.io/controller-runtime/pkg/client"
36 "sigs.k8s.io/controller-runtime/pkg/controller"
37 "sigs.k8s.io/controller-runtime/pkg/handler"
38 "sigs.k8s.io/controller-runtime/pkg/manager"
39 "sigs.k8s.io/controller-runtime/pkg/reconcile"
40 logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
41 "sigs.k8s.io/controller-runtime/pkg/source"
// log is the package-level logger; every entry it emits is named
// "controller_collectdplugin".
var log = logf.Log.WithName("controller_collectdplugin")
46 // Add creates a new CollectdPlugin Controller and adds it to the Manager. The Manager will set fields on the Controller
47 // and Start it when the Manager is Started.
48 func Add(mgr manager.Manager) error {
49 return add(mgr, newReconciler(mgr))
52 // newReconciler returns a new reconcile.Reconciler
53 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
54 return &ReconcileCollectdPlugin{client: mgr.GetClient(), scheme: mgr.GetScheme()}
57 // add adds a new Controller to mgr with r as the reconcile.Reconciler
58 func add(mgr manager.Manager, r reconcile.Reconciler) error {
59 // Create a new controller
60 log.V(1).Info("Creating a new controller for CollectdPlugin")
61 c, err := controller.New("collectdplugin-controller", mgr, controller.Options{Reconciler: r})
66 // Watch for changes to primary resource CollectdPlugin
67 log.V(1).Info("Add watcher for primary resource CollectdPlugin")
68 err = c.Watch(&source.Kind{Type: &onapv1alpha1.CollectdPlugin{}}, &handler.EnqueueRequestForObject{}, predicate.GenerationChangedPredicate{})
73 log.V(1).Info("Add watcher for secondary resource Collectd Daemonset")
75 &source.Kind{Type: &appsv1.DaemonSet{}},
76 &handler.EnqueueRequestsFromMapFunc{
77 ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
78 labelSelector, err := collectdutils.GetWatchLabels()
79 labels := strings.Split(labelSelector, "=")
81 log.Error(err, "Failed to get watch labels, continuing with default label")
83 rcp := r.(*ReconcileCollectdPlugin)
84 // Select the Daemonset with labelSelector (Default is app=collectd)
85 if a.Meta.GetLabels()[labels[0]] == labels[1] {
86 var requests []reconcile.Request
87 cpList, err := collectdutils.GetCollectdPluginList(rcp.client, a.Meta.GetNamespace())
88 if err != nil || cpList == nil || cpList.Items == nil || len(cpList.Items) == 0 {
89 log.V(1).Info("No CollectdPlugin CR instance Exist")
92 for _, cp := range cpList.Items {
93 requests = append(requests, reconcile.Request{
94 NamespacedName: client.ObjectKey{Namespace: cp.Namespace, Name: cp.Name}})
100 }, dspredicate.DaemonSetStatusChangedPredicate{})
105 // blank assignment to verify that ReconcileCollectdPlugin implements reconcile.Reconciler
106 var _ reconcile.Reconciler = &ReconcileCollectdPlugin{}
108 // ReconcileCollectdPlugin reconciles a CollectdPlugin object
109 type ReconcileCollectdPlugin struct {
110 // This client, initialized using mgr.Client() above, is a split client
111 // that reads objects from the cache and writes to the apiserver
113 scheme *runtime.Scheme
116 // Reconcile reads that state of the cluster for a CollectdPlugin object and makes changes based on the state read
117 // and what is in the CollectdPlugin.Spec
118 // TODO(user): Modify this Reconcile function to implement your Controller logic. This example creates
119 // a Pod as an example
121 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
122 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
123 func (r *ReconcileCollectdPlugin) Reconcile(request reconcile.Request) (reconcile.Result, error) {
124 reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
125 reqLogger.Info("Reconciling CollectdPlugin")
127 // Fetch the CollectdPlugin instance
128 instance := &onapv1alpha1.CollectdPlugin{}
129 err := r.client.Get(context.TODO(), request.NamespacedName, instance)
131 if errors.IsNotFound(err) {
132 // Request object not found, could have been deleted after reconcile request.
133 // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
134 // Return and don't requeue
135 reqLogger.V(1).Info("CollectdPlugin object Not found")
136 return reconcile.Result{}, nil
138 // Error reading the object - requeue the request.
139 reqLogger.V(1).Info("Error reading the CollectdPlugin object, Requeuing")
140 return reconcile.Result{}, err
143 // Handle Delete CR for additional cleanup
144 isDelete, err := r.handleDelete(reqLogger, instance)
146 return reconcile.Result{}, err
149 // Add finalizer for this CR
150 if !collectdutils.Contains(instance.GetFinalizers(), collectdutils.CollectdFinalizer) {
151 if err := r.addFinalizer(reqLogger, instance); err != nil {
152 return reconcile.Result{}, err
154 //return reconcile.Result{}, nil
156 // Handle the reconciliation for CollectdPlugin.
157 // At this stage the Status of the CollectdPlugin should NOT be ""
158 err = r.handleCollectdPlugin(reqLogger, instance, false)
159 return reconcile.Result{}, err
162 // handleCollectdPlugin regenerates the collectd conf on CR Create, Update, Delete events
163 func (r *ReconcileCollectdPlugin) handleCollectdPlugin(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin, isDelete bool) error {
164 collectdutils.ReconcileLock.Lock()
165 defer collectdutils.ReconcileLock.Unlock()
166 retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
167 cm, err := collectdutils.GetConfigMap(r.client, reqLogger, cr.Namespace)
169 reqLogger.Error(err, "Skip reconcile: ConfigMap not found")
172 reqLogger.V(1).Info(":::: ConfigMap Info ::::", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
174 collectdConf, err := collectdutils.RebuildCollectdConf(r.client, cr.Namespace, isDelete, cr.Spec.PluginName)
176 reqLogger.Error(err, "Skip reconcile: Rebuild conf failed")
180 cm.SetAnnotations(map[string]string{
181 "daaas-random": collectdutils.ComputeSHA256([]byte(collectdConf)),
183 cm.Data["collectd.conf"] = collectdConf
185 // Update the ConfigMap with new Spec and reload DaemonSets
186 reqLogger.Info("Updating the ConfigMap", "ConfigMap.Namespace", cm.Namespace, "ConfigMap.Name", cm.Name)
187 updateErr := r.client.Update(context.TODO(), cm)
188 if updateErr != nil {
189 reqLogger.Error(updateErr, "Update ConfigMap failed")
193 // Retrieve the latest version of Daemonset before attempting update
194 // RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
195 // Select DaemonSets with label
196 dsList := &appsv1.DaemonSetList{}
197 opts := &client.ListOptions{}
198 labelSelector, err := collectdutils.GetWatchLabels()
200 reqLogger.Error(err, "Failed to get watch labels, continuing with default label")
202 opts.SetLabelSelector(labelSelector)
203 opts.InNamespace(cr.Namespace)
204 err = r.client.List(context.TODO(), opts, dsList)
206 panic(fmt.Errorf("Failed to get latest version of DaemonSet: %v", err))
209 if dsList.Items == nil || len(dsList.Items) == 0 {
210 return errors.NewNotFound(corev1.Resource("daemonset"), "DaemonSet")
212 ds := &dsList.Items[0]
213 //Restart Collectd Pods
214 reqLogger.Info("Reloading the Daemonset", "DaemonSet.Namespace", ds.Namespace, "DaemonSet.Name", ds.Name)
215 //Restart only if hash of conf has changed.
216 ds.Spec.Template.SetAnnotations(map[string]string{
217 "daaas-random": collectdutils.ComputeSHA256([]byte(collectdConf)),
219 updateErr = r.client.Update(context.TODO(), ds)
223 panic(fmt.Errorf("Update failed: %v", retryErr))
226 retryErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
227 err := r.updateStatus(cr)
232 panic(fmt.Errorf("Update failed: %v", retryErr))
235 reqLogger.Info("Reconcile success!!")
239 // Handle Delete CR event for additional cleanup
240 func (r *ReconcileCollectdPlugin) handleDelete(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) (bool, error) {
241 // Check if the CollectdPlugin instance is marked to be deleted, which is
242 // indicated by the deletion timestamp being set.
243 isMarkedToBeDeleted := cr.GetDeletionTimestamp() != nil
244 if isMarkedToBeDeleted {
245 // Update status to Deleting state
246 cr.Status.Status = onapv1alpha1.Deleting
247 cr.Status.CollectdAgents = nil
248 _ = r.client.Status().Update(context.TODO(), cr)
250 if collectdutils.Contains(cr.GetFinalizers(), collectdutils.CollectdFinalizer) {
251 // Run finalization logic for collectdPluginFinalizer. If the
252 // finalization logic fails, don't remove the finalizer so
253 // that we can retry during the next reconciliation.
254 if err := r.finalizeCollectdPlugin(reqLogger, cr); err != nil {
255 return isMarkedToBeDeleted, err
258 // Remove collectdPluginFinalizer. Once all finalizers have been
259 // removed, the object will be deleted.
260 cr.SetFinalizers(collectdutils.Remove(cr.GetFinalizers(), collectdutils.CollectdFinalizer))
261 err := r.client.Update(context.TODO(), cr)
263 return isMarkedToBeDeleted, err
267 return isMarkedToBeDeleted, nil
270 func (r *ReconcileCollectdPlugin) updateStatus(cr *onapv1alpha1.CollectdPlugin) error {
271 // Fetch the CollectdPlugin instance
272 instance := &onapv1alpha1.CollectdPlugin{}
273 key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name}
274 err := r.client.Get(context.TODO(), key, instance)
278 switch instance.Status.Status {
279 case onapv1alpha1.Initial:
280 instance.Status.Status = onapv1alpha1.Created
281 case onapv1alpha1.Created, onapv1alpha1.Enabled:
282 pods, err := collectdutils.GetPodList(r.client, instance.Namespace)
286 if !reflect.DeepEqual(pods, instance.Status.CollectdAgents) {
287 instance.Status.CollectdAgents = pods
288 instance.Status.Status = onapv1alpha1.Enabled
290 case onapv1alpha1.Deleting, onapv1alpha1.Deprecated:
293 err = r.client.Status().Update(context.TODO(), instance)
297 func (r *ReconcileCollectdPlugin) finalizeCollectdPlugin(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) error {
298 // Cleanup by regenerating new collectd conf and rolling update of DaemonSet
299 if err := r.handleCollectdPlugin(reqLogger, cr, true); err != nil {
300 reqLogger.Error(err, "Finalize CollectdPlugin failed!!")
303 reqLogger.Info("Successfully finalized CollectdPlugin!!")
307 func (r *ReconcileCollectdPlugin) addFinalizer(reqLogger logr.Logger, cr *onapv1alpha1.CollectdPlugin) error {
308 reqLogger.Info("Adding Finalizer for the CollectdPlugin")
309 cr.SetFinalizers(append(cr.GetFinalizers(), collectdutils.CollectdFinalizer))
310 // Update status from Initial to Created
311 // Since addFinalizer will be executed only once,
312 // the status will be changed from Initial state to Created
313 // updateErr := r.updateStatus(cr)
314 // if updateErr != nil {
315 // reqLogger.Error(updateErr, "Failed to update status from Initial state")
318 cr.Status.Status = onapv1alpha1.Created
319 err := r.client.Update(context.TODO(), cr)
321 reqLogger.Error(err, "Failed to update CollectdPlugin with finalizer")