Add/update/delete PrometheusRemoteEndpoint CR
[demo.git] / vnfs / DAaaS / microservices / remote-config-operator / pkg / controller / prometheusremoteendpoint / prometheusremoteendpoint_controller.go
diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go
new file mode 100644 (file)
index 0000000..2d78a43
--- /dev/null
@@ -0,0 +1,254 @@
+package prometheusremoteendpoint
+
+import (
+       "context"
+       "encoding/json"
+       "strconv"
+
+       onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
+
+       monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
+       logr "github.com/go-logr/logr"
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
+       remoteconfigutils "remote-config-operator/pkg/controller/utils"
+       "sigs.k8s.io/controller-runtime/pkg/client"
+       "sigs.k8s.io/controller-runtime/pkg/controller"
+       "sigs.k8s.io/controller-runtime/pkg/handler"
+       logf "sigs.k8s.io/controller-runtime/pkg/log"
+       "sigs.k8s.io/controller-runtime/pkg/manager"
+       "sigs.k8s.io/controller-runtime/pkg/reconcile"
+       "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+var log = logf.Log.WithName("controller_prometheusremoteendpoint")
+
+// Add creates a new PrometheusRemoteEndpoint Controller and adds it to the Manager. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func Add(mgr manager.Manager) error {
+       return add(mgr, newReconciler(mgr))
+}
+
+// newReconciler returns a new reconcile.Reconciler
+func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+       return &ReconcilePrometheusRemoteEndpoint{client: mgr.GetClient(), scheme: mgr.GetScheme()}
+}
+
+// add adds a new Controller to mgr with r as the reconcile.Reconciler
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+       // Create a new controller
+       c, err := controller.New("prometheusremoteendpoint-controller", mgr, controller.Options{Reconciler: r})
+       if err != nil {
+               return err
+       }
+
+       // Watch for changes to primary resource PrometheusRemoteEndpoint
+
+       log.V(1).Info("Add watcher for primary resource PrometheusRemoteEndpoint")
+       err = c.Watch(&source.Kind{Type: &onapv1alpha1.PrometheusRemoteEndpoint{}}, &handler.EnqueueRequestForObject{})
+       if err != nil {
+               return err
+       }
+
+       log.V(1).Info("Add watcher for secondary resource RemoteFilterAction")
+       // TODO(user): Modify this to be the types you create that are owned by the primary resource
+       // Watch for changes to secondary resource Pods and requeue the owner PrometheusRemoteEndpoint
+       err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
+               IsController: true,
+               OwnerType:    &onapv1alpha1.PrometheusRemoteEndpoint{},
+       })
+       if err != nil {
+               log.Error(err, "Error enqueuing requests due to remoteFilterAction changes")
+               return err
+       }
+
+       log.Info("Enqueued reconcile requests due to remoteFilterAction changes")
+       return nil
+}
+
+// blank assignment to verify that ReconcilePrometheusRemoteEndpoint implements reconcile.Reconciler
+var _ reconcile.Reconciler = &ReconcilePrometheusRemoteEndpoint{}
+
+// ReconcilePrometheusRemoteEndpoint reconciles a PrometheusRemoteEndpoint object
+type ReconcilePrometheusRemoteEndpoint struct {
+       // This client, initialized using mgr.Client() above, is a split client
+       // that reads objects from the cache and writes to the apiserver
+       client client.Client
+       scheme *runtime.Scheme
+}
+
+// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read
+// and what is in the PrometheusRemoteEndpoint.Spec
+
+// Note:
+// The Controller will requeue the Request to be processed again if the returned error is non-nil or
+// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
+func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+       reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
+       reqLogger.Info("Reconciling PrometheusRemoteEndpoint")
+
+       // Fetch the PrometheusRemoteEndpoint instance
+       instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
+       err := r.client.Get(context.TODO(), request.NamespacedName, instance)
+       if err != nil {
+               if errors.IsNotFound(err) {
+                       // Request object not found, could have been deleted after reconcile request.
+                       // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
+                       // Return and don't requeue
+                       reqLogger.Error(err, "PrometheusRemoteEndpoint object not found")
+                       return reconcile.Result{}, nil
+               }
+               // Error reading the object - requeue the request.
+               reqLogger.Error(err, "Error reading PrometheusRemoteEndpoint object, Requeing ")
+               return reconcile.Result{}, err
+       }
+
+       isBeingDeleted := checkDeletionTimestamp(reqLogger, instance)
+       if isBeingDeleted {
+               //Delete Remote write
+               if err := r.processDeletionRequest(reqLogger, instance); err != nil {
+                       reqLogger.Error(err, "Error processing deletion request")
+                       return reconcile.Result{}, err
+               }
+               return reconcile.Result{}, nil
+       }
+
+       //Add finalizer for the CR object
+       if !remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
+               reqLogger.Info("Adding finalizer for PrometheusRemoteEndpoint")
+               if err := addFinalizer(reqLogger, instance); err != nil {
+                       return reconcile.Result{}, err
+               }
+               err := r.client.Update(context.TODO(), instance)
+               if err != nil {
+                       reqLogger.Error(err, "Unable to update instance")
+                       return reconcile.Result{}, err
+               }
+               return reconcile.Result{}, nil
+       }
+
+       if err := r.processPatchRequest(reqLogger, instance); err != nil {
+               reqLogger.Error(err, "Error processing request")
+               return reconcile.Result{}, err
+       }
+       return reconcile.Result{}, nil
+}
+
+func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
+
+       prom := &monitoringv1.Prometheus{}
+       if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil {
+               reqLogger.Error(err1, "Error getting prometheus")
+               return err1
+       }
+       reqLogger.Info("Found prometheus to update")
+
+       var patch []byte
+
+       rws := prom.Spec.RemoteWrite
+       adapterURL := instance.Spec.AdapterUrl
+
+       isUpdate := false
+       for i, spec := range rws {
+               if spec.URL == instance.Spec.AdapterUrl {
+                       reqLogger.Info("Remote write already exists, updating it")
+                       patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger)
+                       isUpdate = true
+                       break
+               }
+       }
+
+       if !isUpdate {
+               reqLogger.Info("Remote write does not exist, creating one...")
+               // rwsLength := len(rws)
+               patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger)
+       }
+       patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
+       if patchErr != nil {
+               reqLogger.Error(patchErr, "Unable to process patch to prometheus")
+               return patchErr
+       }
+       reqLogger.Info("Patch merged")
+
+       return nil
+}
+
+func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool {
+       isMarkedForDeletion := instance.GetDeletionTimestamp() != nil
+       return isMarkedForDeletion
+}
+
+func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
+       prom := &monitoringv1.Prometheus{}
+       if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
+               reqLogger.Error(err, "Error getting prometheus")
+               return err
+       }
+       reqLogger.Info("Found prometheus to update")
+
+       var patch []byte
+       adapterURL := instance.Spec.AdapterUrl
+
+       rws := prom.Spec.RemoteWrite
+       for i, spec := range rws {
+               if spec.URL == instance.Spec.AdapterUrl {
+                       reqLogger.Info("Found remote write to be removed, removing it")
+                       patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger)
+                       break
+               }
+       }
+       patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
+       if patchErr != nil {
+               reqLogger.Error(patchErr, "Unable to process patch to prometheus")
+               return patchErr
+       }
+       reqLogger.Info("Patch merged, remote write removed")
+
+       //remove Finalizer after deletion
+       if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
+               if err := removeFinalizer(reqLogger, instance); err != nil {
+                       return err
+               }
+               err := r.client.Update(context.TODO(), instance)
+               if err != nil {
+                       reqLogger.Error(err, "Unable to update instance")
+                       return err
+               }
+       }
+       return nil
+}
+
+func addFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
+       reqlogger.Info("Adding finalizer for the PrometheusRemoteEndpoint")
+       instance.SetFinalizers(append(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
+       return nil
+}
+
+func removeFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
+       reqlogger.Info("Removing finalizer for the PrometheusRemoteEndpoint")
+       instance.SetFinalizers(remoteconfigutils.Remove(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
+       return nil
+}
+
+func formPatch(method string, index string, adapterURL string, instance *onapv1alpha1.PrometheusRemoteEndpoint, reqLogger logr.Logger) ([]byte, error) {
+       var err error
+       var mergePatch []byte
+       path := "/spec/remoteWrite/" + index
+       mergePatch, err = json.Marshal(map[string]interface{}{
+               "op":   method,
+               "path": path,
+               "value": map[string]interface{}{
+                       "url":           adapterURL,
+                       "remoteTimeout": instance.Spec.RemoteTimeout,
+               },
+       })
+       if err != nil {
+               reqLogger.Error(err, "Unable to form patch")
+               return nil, err
+       }
+       prependMergePatch := append([]byte{91}, mergePatch...)
+       finalMergePatch := append(prependMergePatch, 93)
+       return finalMergePatch, nil
+}