Added Daemonset Status predicate 51/96251/3
authorDileep Ranganathan <dileep.ranganathan@intel.com>
Wed, 25 Sep 2019 21:37:41 +0000 (14:37 -0700)
committerMarco Platania <platania@research.att.com>
Tue, 1 Oct 2019 12:47:15 +0000 (12:47 +0000)
Added Daemonset Status predicate to optimize watch on DS.
Minor fixes on status update for the controllers by coalescing status
and CR update together.

Issue-ID: ONAPARC-461
Signed-off-by: Dileep Ranganathan <dileep.ranganathan@intel.com>
Change-Id: I2a56f0b93c2d7a56b9e8149c41f8c6f22be86ef1

vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1/zz_generated.openapi.go
vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdglobal/collectdglobal_controller.go
vnfs/DAaaS/microservices/collectd-operator/pkg/controller/collectdplugin/collectdplugin_controller.go
vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/collectdutils.go
vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go [new file with mode: 0644]

index 0c80396..85e49f4 100644 (file)
@@ -11,12 +11,12 @@ import (
 
 func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
        return map[string]common.OpenAPIDefinition{
-               "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobal":       schema_pkg_apis_onap_v1alpha1_CollectdGlobal(ref),
-               "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalSpec":   schema_pkg_apis_onap_v1alpha1_CollectdGlobalSpec(ref),
-               "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalStatus": schema_pkg_apis_onap_v1alpha1_CollectdGlobalStatus(ref),
-               "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPlugin":       schema_pkg_apis_onap_v1alpha1_CollectdPlugin(ref),
-               "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginSpec":   schema_pkg_apis_onap_v1alpha1_CollectdPluginSpec(ref),
-               "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginStatus": schema_pkg_apis_onap_v1alpha1_CollectdPluginStatus(ref),
+               "./pkg/apis/onap/v1alpha1.CollectdGlobal":       schema_pkg_apis_onap_v1alpha1_CollectdGlobal(ref),
+               "./pkg/apis/onap/v1alpha1.CollectdGlobalSpec":   schema_pkg_apis_onap_v1alpha1_CollectdGlobalSpec(ref),
+               "./pkg/apis/onap/v1alpha1.CollectdGlobalStatus": schema_pkg_apis_onap_v1alpha1_CollectdGlobalStatus(ref),
+               "./pkg/apis/onap/v1alpha1.CollectdPlugin":       schema_pkg_apis_onap_v1alpha1_CollectdPlugin(ref),
+               "./pkg/apis/onap/v1alpha1.CollectdPluginSpec":   schema_pkg_apis_onap_v1alpha1_CollectdPluginSpec(ref),
+               "./pkg/apis/onap/v1alpha1.CollectdPluginStatus": schema_pkg_apis_onap_v1alpha1_CollectdPluginStatus(ref),
        }
 }
 
@@ -47,19 +47,19 @@ func schema_pkg_apis_onap_v1alpha1_CollectdGlobal(ref common.ReferenceCallback)
                                        },
                                        "spec": {
                                                SchemaProps: spec.SchemaProps{
-                                                       Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalSpec"),
+                                                       Ref: ref("./pkg/apis/onap/v1alpha1.CollectdGlobalSpec"),
                                                },
                                        },
                                        "status": {
                                                SchemaProps: spec.SchemaProps{
-                                                       Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalStatus"),
+                                                       Ref: ref("./pkg/apis/onap/v1alpha1.CollectdGlobalStatus"),
                                                },
                                        },
                                },
                        },
                },
                Dependencies: []string{
-                       "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalSpec", "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdGlobalStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
+                       "./pkg/apis/onap/v1alpha1.CollectdGlobalSpec", "./pkg/apis/onap/v1alpha1.CollectdGlobalStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
        }
 }
 
@@ -151,19 +151,19 @@ func schema_pkg_apis_onap_v1alpha1_CollectdPlugin(ref common.ReferenceCallback)
                                        },
                                        "spec": {
                                                SchemaProps: spec.SchemaProps{
-                                                       Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginSpec"),
+                                                       Ref: ref("./pkg/apis/onap/v1alpha1.CollectdPluginSpec"),
                                                },
                                        },
                                        "status": {
                                                SchemaProps: spec.SchemaProps{
-                                                       Ref: ref("demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginStatus"),
+                                                       Ref: ref("./pkg/apis/onap/v1alpha1.CollectdPluginStatus"),
                                                },
                                        },
                                },
                        },
                },
                Dependencies: []string{
-                       "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginSpec", "demo/vnfs/DAaaS/microservices/collectd-operator/pkg/apis/onap/v1alpha1.CollectdPluginStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
+                       "./pkg/apis/onap/v1alpha1.CollectdPluginSpec", "./pkg/apis/onap/v1alpha1.CollectdPluginStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"},
        }
 }
 
index 0d3e2bb..8c6023b 100644 (file)
@@ -12,6 +12,7 @@ import (
 
        onapv1alpha1 "collectd-operator/pkg/apis/onap/v1alpha1"
        collectdutils "collectd-operator/pkg/controller/utils"
+       dspredicate "collectd-operator/pkg/controller/utils"
        dsutils "collectd-operator/pkg/controller/utils"
 
        appsv1 "k8s.io/api/apps/v1"
@@ -69,22 +70,13 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                                        log.Error(err, "Failed to get watch labels, continuing with default label")
                                }
                                rcp := r.(*ReconcileCollectdGlobal)
-                               // Select the Daemonset with labelSelector (Defautl  is app=collectd)
+                               // Select the Daemonset with labelSelector (Default  is app=collectd)
                                if a.Meta.GetLabels()[labels[0]] == labels[1] {
                                        var requests []reconcile.Request
                                        cg, err := collectdutils.GetCollectdGlobal(rcp.client, a.Meta.GetNamespace())
                                        if err != nil || cg == nil {
                                                log.V(1).Info("No CollectdGlobal CR instance Exist")
-                                               cpList, err := collectdutils.GetCollectdPluginList(rcp.client, a.Meta.GetNamespace())
-                                               if err != nil || cpList == nil || cpList.Items == nil || len(cpList.Items) == 0 {
-                                                       log.V(1).Info("No CollectdPlugin CR instance Exist")
-                                                       return nil
-                                               }
-                                               for _, cp := range cpList.Items {
-                                                       requests = append(requests, reconcile.Request{
-                                                               NamespacedName: client.ObjectKey{Namespace: cp.Namespace, Name: cp.Name}})
-                                               }
-                                               return requests
+                                               return nil
                                        }
                                        requests = append(requests, reconcile.Request{
                                                NamespacedName: client.ObjectKey{Namespace: cg.Namespace, Name: cg.Name}})
@@ -92,7 +84,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                                }
                                return nil
                        }),
-               }, predicate.GenerationChangedPredicate{})
+               }, dspredicate.DaemonSetStatusChangedPredicate{})
        if err != nil {
                return err
        }
@@ -224,10 +216,13 @@ func (r *ReconcileCollectdGlobal) handleCollectdGlobal(reqLogger logr.Logger, cr
                panic(fmt.Errorf("Update failed: %v", retryErr))
        }
 
-       err := r.updateStatus(cr)
-       if err != nil {
-               reqLogger.Error(err, "Unable to update status")
+       retryErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+               err := r.updateStatus(cr)
                return err
+       })
+
+       if retryErr != nil {
+               panic(fmt.Errorf("Update failed: %v", retryErr))
        }
        // Reconcile success
        reqLogger.Info("Reconcile success!!")
@@ -266,22 +261,29 @@ func (r *ReconcileCollectdGlobal) handleDelete(reqLogger logr.Logger, cr *onapv1
 }
 
 func (r *ReconcileCollectdGlobal) updateStatus(cr *onapv1alpha1.CollectdGlobal) error {
-       switch cr.Status.Status {
+       // Fetch the CollectdGlobal instance
+       instance := &onapv1alpha1.CollectdGlobal{}
+       key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name}
+       err := r.client.Get(context.TODO(), key, instance)
+       if err != nil {
+               return err
+       }
+       switch instance.Status.Status {
        case onapv1alpha1.Initial:
-               cr.Status.Status = onapv1alpha1.Created
+               instance.Status.Status = onapv1alpha1.Created
        case onapv1alpha1.Created, onapv1alpha1.Enabled:
-               pods, err := collectdutils.GetPodList(r.client, cr.Namespace)
+               pods, err := collectdutils.GetPodList(r.client, instance.Namespace)
                if err != nil {
                        return err
                }
-               if !reflect.DeepEqual(pods, cr.Status.CollectdAgents) {
-                       cr.Status.CollectdAgents = pods
-                       cr.Status.Status = onapv1alpha1.Enabled
+               if !reflect.DeepEqual(pods, instance.Status.CollectdAgents) {
+                       instance.Status.CollectdAgents = pods
+                       instance.Status.Status = onapv1alpha1.Enabled
                }
        case onapv1alpha1.Deleting, onapv1alpha1.Deprecated:
                return nil
        }
-       err := r.client.Status().Update(context.TODO(), cr)
+       err = r.client.Status().Update(context.TODO(), instance)
        return err
 }
 
@@ -301,11 +303,12 @@ func (r *ReconcileCollectdGlobal) addFinalizer(reqLogger logr.Logger, cr *onapv1
        // Update status from Initial to Created
        // Since addFinalizer will be executed only once,
        // the status will be changed from Initial state to Created
-       updateErr := r.updateStatus(cr)
-       if updateErr != nil {
-               reqLogger.Error(updateErr, "Failed to update status from Initial state")
-       }
+       // updateErr := r.updateStatus(cr)
+       // if updateErr != nil {
+       //      reqLogger.Error(updateErr, "Failed to update status from Initial state")
+       // }
        // Update CR
+       cr.Status.Status = onapv1alpha1.Created
        err := r.client.Update(context.TODO(), cr)
        if err != nil {
                reqLogger.Error(err, "Failed to update CollectdGlobal with finalizer")
index 32775c5..e0e62b8 100644 (file)
@@ -4,17 +4,20 @@ import (
        "context"
        "fmt"
        "reflect"
+       "strings"
 
        "github.com/go-logr/logr"
        "github.com/operator-framework/operator-sdk/pkg/predicate"
 
        onapv1alpha1 "collectd-operator/pkg/apis/onap/v1alpha1"
        collectdutils "collectd-operator/pkg/controller/utils"
+       dspredicate "collectd-operator/pkg/controller/utils"
 
        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/util/retry"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller"
@@ -54,6 +57,35 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
                return err
        }
 
+       log.V(1).Info("Add watcher for secondary resource Collectd Daemonset")
+       err = c.Watch(
+               &source.Kind{Type: &appsv1.DaemonSet{}},
+               &handler.EnqueueRequestsFromMapFunc{
+                       ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
+                               labelSelector, err := collectdutils.GetWatchLabels()
+                               labels := strings.Split(labelSelector, "=")
+                               if err != nil {
+                                       log.Error(err, "Failed to get watch labels, continuing with default label")
+                               }
+                               rcp := r.(*ReconcileCollectdPlugin)
+                               // Select the Daemonset with labelSelector (Default  is app=collectd)
+                               if a.Meta.GetLabels()[labels[0]] == labels[1] {
+                                       var requests []reconcile.Request
+                                       cpList, err := collectdutils.GetCollectdPluginList(rcp.client, a.Meta.GetNamespace())
+                                       if err != nil || cpList == nil || cpList.Items == nil || len(cpList.Items) == 0 {
+                                               log.V(1).Info("No CollectdPlugin CR instance Exist")
+                                               return nil
+                                       }
+                                       for _, cp := range cpList.Items {
+                                               requests = append(requests, reconcile.Request{
+                                                       NamespacedName: client.ObjectKey{Namespace: cp.Namespace, Name: cp.Name}})
+                                       }
+                                       return requests
+                               }
+                               return nil
+                       }),
+               }, dspredicate.DaemonSetStatusChangedPredicate{})
+
        return nil
 }
 
@@ -178,10 +210,13 @@ func (r *ReconcileCollectdPlugin) handleCollectdPlugin(reqLogger logr.Logger, cr
                panic(fmt.Errorf("Update failed: %v", retryErr))
        }
 
-       err := r.updateStatus(cr)
-       if err != nil {
-               reqLogger.Error(err, "Unable to update status")
+       retryErr = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+               err := r.updateStatus(cr)
                return err
+       })
+
+       if retryErr != nil {
+               panic(fmt.Errorf("Update failed: %v", retryErr))
        }
        // Reconcile success
        reqLogger.Info("Reconcile success!!")
@@ -220,22 +255,29 @@ func (r *ReconcileCollectdPlugin) handleDelete(reqLogger logr.Logger, cr *onapv1
 }
 
 func (r *ReconcileCollectdPlugin) updateStatus(cr *onapv1alpha1.CollectdPlugin) error {
-       switch cr.Status.Status {
+       // Fetch the CollectdGlobal instance
+       instance := &onapv1alpha1.CollectdPlugin{}
+       key := types.NamespacedName{Namespace: cr.Namespace, Name: cr.Name}
+       err := r.client.Get(context.TODO(), key, instance)
+       if err != nil {
+               return err
+       }
+       switch instance.Status.Status {
        case onapv1alpha1.Initial:
-               cr.Status.Status = onapv1alpha1.Created
+               instance.Status.Status = onapv1alpha1.Created
        case onapv1alpha1.Created, onapv1alpha1.Enabled:
-               pods, err := collectdutils.GetPodList(r.client, cr.Namespace)
+               pods, err := collectdutils.GetPodList(r.client, instance.Namespace)
                if err != nil {
                        return err
                }
-               if !reflect.DeepEqual(pods, cr.Status.CollectdAgents) {
-                       cr.Status.CollectdAgents = pods
-                       cr.Status.Status = onapv1alpha1.Enabled
+               if !reflect.DeepEqual(pods, instance.Status.CollectdAgents) {
+                       instance.Status.CollectdAgents = pods
+                       instance.Status.Status = onapv1alpha1.Enabled
                }
        case onapv1alpha1.Deleting, onapv1alpha1.Deprecated:
                return nil
        }
-       err := r.client.Status().Update(context.TODO(), cr)
+       err = r.client.Status().Update(context.TODO(), instance)
        return err
 }
 
@@ -255,11 +297,12 @@ func (r *ReconcileCollectdPlugin) addFinalizer(reqLogger logr.Logger, cr *onapv1
        // Update status from Initial to Created
        // Since addFinalizer will be executed only once,
        // the status will be changed from Initial state to Created
-       updateErr := r.updateStatus(cr)
-       if updateErr != nil {
-               reqLogger.Error(updateErr, "Failed to update status from Initial state")
-       }
+       // updateErr := r.updateStatus(cr)
+       // if updateErr != nil {
+       //      reqLogger.Error(updateErr, "Failed to update status from Initial state")
+       // }
        // Update CR
+       cr.Status.Status = onapv1alpha1.Created
        err := r.client.Update(context.TODO(), cr)
        if err != nil {
                reqLogger.Error(err, "Failed to update CollectdPlugin with finalizer")
index 17cad0e..b379d91 100644 (file)
@@ -211,7 +211,7 @@ func RebuildCollectdConf(rc client.Client, ns string, isDelete bool, delPlugin s
                collectdConf += cpConf + "\n"
        }
 
-       collectdConf += "#Last line (collectd requires ā€˜\\nā€™ at the last line)\n"
+       collectdConf += "#Last line (collectd requires '\\n' at the last line)\n"
 
        return collectdConf, nil
 }
diff --git a/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go b/vnfs/DAaaS/microservices/collectd-operator/pkg/controller/utils/predicate.go
new file mode 100644 (file)
index 0000000..a9ec1dc
--- /dev/null
@@ -0,0 +1,65 @@
+// Copyright 2018 The Operator-SDK Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package utils
+
+import (
+       appsv1 "k8s.io/api/apps/v1"
+
+       "sigs.k8s.io/controller-runtime/pkg/event"
+       "sigs.k8s.io/controller-runtime/pkg/predicate"
+       logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+)
+
+var plog = logf.Log.WithName("predicate").WithName("eventFilters")
+
+// DaemonSetStatusChangedPredicate implements a default update predicate function on status change for Daemonsets
+// (adapted from sigs.k8s.io/controller-runtime/pkg/predicate/predicate.GenerationChangedPredicate)
+type DaemonSetStatusChangedPredicate struct {
+       predicate.Funcs
+}
+
+// Update implements default UpdateEvent filter for validating generation change
+func (DaemonSetStatusChangedPredicate) Update(e event.UpdateEvent) bool {
+       newDS := e.ObjectNew.DeepCopyObject().(*appsv1.DaemonSet)
+       oldDS := e.ObjectOld.DeepCopyObject().(*appsv1.DaemonSet)
+       plog.V(2).Info("newDS", "nUNS:=", newDS.Status.UpdatedNumberScheduled, "oUNS:=", oldDS.Status.UpdatedNumberScheduled, "nDNS:=", newDS.Status.DesiredNumberScheduled, "nNR:=", newDS.Status.NumberReady, "nNA:=", newDS.Status.NumberAvailable)
+       if newDS.Status.UpdatedNumberScheduled >= oldDS.Status.UpdatedNumberScheduled {
+               if (newDS.Status.UpdatedNumberScheduled == newDS.Status.NumberReady) &&
+                       (newDS.Status.UpdatedNumberScheduled == newDS.Status.NumberAvailable) {
+                       return true
+               }
+       }
+       if e.MetaOld == nil {
+               plog.Error(nil, "Update event has no old metadata", "event", e)
+               return false
+       }
+       if e.ObjectOld == nil {
+               plog.Error(nil, "Update event has no old runtime object to update", "event", e)
+               return false
+       }
+       if e.ObjectNew == nil {
+               plog.Error(nil, "Update event has no new runtime object for update", "event", e)
+               return false
+       }
+       if e.MetaNew == nil {
+               plog.Error(nil, "Update event has no new metadata", "event", e)
+               return false
+       }
+       if e.MetaNew.GetGeneration() == e.MetaOld.GetGeneration() {
+               return false
+       }
+
+       return true
+}