Remote Write Config and Filter operator
[demo.git] / vnfs / DAaaS / microservices / remote-config-operator / pkg / controller / prometheusremoteendpoint / prometheusremoteendpoint_controller.go
index 2d78a43..914cb37 100644 (file)
@@ -1,9 +1,12 @@
 package prometheusremoteendpoint
 
 import (
+       "bytes"
        "context"
        "encoding/json"
+       "net/http"
        "strconv"
+       "strings"
 
        onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
 
@@ -79,9 +82,8 @@ type ReconcilePrometheusRemoteEndpoint struct {
        scheme *runtime.Scheme
 }
 
-// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read
-// and what is in the PrometheusRemoteEndpoint.Spec
-
+// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object
+// and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec
 // Note:
 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -105,8 +107,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request)
                return reconcile.Result{}, err
        }
 
-       isBeingDeleted := checkDeletionTimestamp(reqLogger, instance)
-       if isBeingDeleted {
+       // Check if CR is being Deleted
+       if instance.GetDeletionTimestamp() != nil {
                //Delete Remote write
                if err := r.processDeletionRequest(reqLogger, instance); err != nil {
                        reqLogger.Error(err, "Error processing deletion request")
@@ -139,7 +141,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request)
 func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
 
        prom := &monitoringv1.Prometheus{}
-       if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil {
+       pName := instance.ObjectMeta.Labels["app"]
+       if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil {
                reqLogger.Error(err1, "Error getting prometheus")
                return err1
        }
@@ -148,13 +151,20 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L
        var patch []byte
 
        rws := prom.Spec.RemoteWrite
-       adapterURL := instance.Spec.AdapterUrl
+       remoteURL, id, err := getAdapterInfo(instance)
+       instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}
+       if err != nil {
+               reqLogger.Error(err, "Unable to get adapter url")
+               return err
+       }
 
        isUpdate := false
        for i, spec := range rws {
-               if spec.URL == instance.Spec.AdapterUrl {
+               // Update event - check the prometheus remote write Url against remoteURL in the Status
+               // to consider the case when URL itself is updated.
+               if spec.URL == instance.Status.RemoteURL {
                        reqLogger.Info("Remote write already exists, updating it")
-                       patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger)
+                       patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger)
                        isUpdate = true
                        break
                }
@@ -163,23 +173,21 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L
        if !isUpdate {
                reqLogger.Info("Remote write does not exist, creating one...")
                // rwsLength := len(rws)
-               patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger)
+               patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger)
        }
        patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
        if patchErr != nil {
                reqLogger.Error(patchErr, "Unable to process patch to prometheus")
+               cleanUpExternalResources(instance)
+               r.updateStatus("Error", instanceKey, "", "", "")
                return patchErr
        }
-       reqLogger.Info("Patch merged")
+       r.updateStatus("Enabled", instanceKey, pName, remoteURL, id)
+       reqLogger.V(1).Info("Patch merged")
 
        return nil
 }
 
-func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool {
-       isMarkedForDeletion := instance.GetDeletionTimestamp() != nil
-       return isMarkedForDeletion
-}
-
 func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
        prom := &monitoringv1.Prometheus{}
        if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
@@ -189,13 +197,17 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log
        reqLogger.Info("Found prometheus to update")
 
        var patch []byte
-       adapterURL := instance.Spec.AdapterUrl
+       remoteURL, _, err := getAdapterInfo(instance)
+       if err != nil {
+               reqLogger.Error(err, "Unable to get adapter info")
+               return err
+       }
 
        rws := prom.Spec.RemoteWrite
        for i, spec := range rws {
-               if spec.URL == instance.Spec.AdapterUrl {
+               if spec.URL == remoteURL {
                        reqLogger.Info("Found remote write to be removed, removing it")
-                       patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger)
+                       patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger)
                        break
                }
        }
@@ -204,8 +216,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log
                reqLogger.Error(patchErr, "Unable to process patch to prometheus")
                return patchErr
        }
-       reqLogger.Info("Patch merged, remote write removed")
-
+       reqLogger.V(1).Info("Patch merged, remote write removed")
+       cleanUpExternalResources(instance)
        //remove Finalizer after deletion
        if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
                if err := removeFinalizer(reqLogger, instance); err != nil {
@@ -252,3 +264,85 @@ func formPatch(method string, index string, adapterURL string, instance *onapv1a
        finalMergePatch := append(prependMergePatch, 93)
        return finalMergePatch, nil
 }
+
+func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error {
+       // Fetch the CollectdGlobal instance
+       instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
+       err := r.client.Get(context.TODO(), key, instance)
+       if err != nil {
+               return err
+       }
+       instance.Status.Status = status
+       instance.Status.PrometheusInstance = prom
+       instance.Status.KafkaWriterID = kwid
+       instance.Status.RemoteURL = remoteURL
+       err = r.client.Status().Update(context.TODO(), instance)
+       return err
+}
+
+func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) {
+       if instance.Spec.Type == "kafka" {
+               deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID)
+       }
+}
+
+func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) {
+       switch strings.ToLower(instance.Spec.Type) {
+       case "m3db":
+               return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil
+       case "kafka":
+               kwid, err := getKafkaWriter(instance)
+               return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err
+       default:
+               return instance.Spec.AdapterURL, "", nil
+       }
+}
+
+func deleteKafkaWriter(kwURL string) error {
+       client := &http.Client{}
+       req, err := http.NewRequest(http.MethodDelete, kwURL, nil)
+       if err != nil {
+               log.Error(err, "Failed to form delete Kafka Writer request")
+               return err
+       }
+       _, err = client.Do(req)
+       if err != nil {
+               log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL)
+               return err
+       }
+       return nil
+}
+
+func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+       // TODO - check update events
+       if instance.Status.KafkaWriterID != "" {
+               return instance.Status.KafkaWriterID, nil
+       }
+       return createKafkaWriter(instance)
+}
+
+func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+
+       log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec)
+       baseURL := instance.Spec.AdapterURL
+       kwc := instance.Spec.KafkaConfig
+       kwURL := baseURL + "/pkw"
+
+       postBody, err := json.Marshal(kwc)
+       if err != nil {
+               log.Error(err, "JSON Marshalling error")
+               return "", err
+       }
+
+       resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody))
+       if err != nil {
+               log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc)
+               return "", err
+       }
+       defer resp.Body.Close()
+       var kwid string
+       json.NewDecoder(resp.Body).Decode(&kwid)
+       log.Info("Kafka Writer created", "Kafka Writer Id", kwid)
+
+       return kwid, err
+}