X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=vnfs%2FDAaaS%2Fmicroservices%2Fremote-config-operator%2Fpkg%2Fcontroller%2Fprometheusremoteendpoint%2Fprometheusremoteendpoint_controller.go;h=914cb375d478c530df8284bd2bb34d06f9883229;hb=fb9b7baa506e5c92bc243a30364e9f72ecd9c3f1;hp=2d78a43fab64c8b4252c19199d954db9f7c30ea3;hpb=2026cb5283fbc44a4f68641f6e85628381ebda04;p=demo.git diff --git a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go index 2d78a43f..914cb375 100644 --- a/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go +++ b/vnfs/DAaaS/microservices/remote-config-operator/pkg/controller/prometheusremoteendpoint/prometheusremoteendpoint_controller.go @@ -1,9 +1,12 @@ package prometheusremoteendpoint import ( + "bytes" "context" "encoding/json" + "net/http" "strconv" + "strings" onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1" @@ -79,9 +82,8 @@ type ReconcilePrometheusRemoteEndpoint struct { scheme *runtime.Scheme } -// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read -// and what is in the PrometheusRemoteEndpoint.Spec - +// Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object +// and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec // Note: // The Controller will requeue the Request to be processed again if the returned error is non-nil or // Result.Requeue is true, otherwise upon completion it will remove the work from the queue. @@ -105,8 +107,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) return reconcile.Result{}, err } - isBeingDeleted := checkDeletionTimestamp(reqLogger, instance) - if isBeingDeleted { + // Check if CR is being Deleted + if instance.GetDeletionTimestamp() != nil { //Delete Remote write if err := r.processDeletionRequest(reqLogger, instance); err != nil { reqLogger.Error(err, "Error processing deletion request") @@ -139,7 +141,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error { prom := &monitoringv1.Prometheus{} - if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil { + pName := instance.ObjectMeta.Labels["app"] + if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil { reqLogger.Error(err1, "Error getting prometheus") return err1 } @@ -148,13 +151,20 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L var patch []byte rws := prom.Spec.RemoteWrite - adapterURL := instance.Spec.AdapterUrl + remoteURL, id, err := getAdapterInfo(instance) + instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name} + if err != nil { + reqLogger.Error(err, "Unable to get adapter url") + return err + } isUpdate := false for i, spec := range rws { - if spec.URL == instance.Spec.AdapterUrl { + // Update event - check the prometheus remote write Url against remoteURL in the Status + // to consider the case when URL itself is updated. + if spec.URL == instance.Status.RemoteURL { reqLogger.Info("Remote write already exists, updating it") - patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger) + patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger) isUpdate = true break } @@ -163,23 +173,21 @@ func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.L if !isUpdate { reqLogger.Info("Remote write does not exist, creating one...") // rwsLength := len(rws) - patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger) + patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger) } patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch)) if patchErr != nil { reqLogger.Error(patchErr, "Unable to process patch to prometheus") + cleanUpExternalResources(instance) + r.updateStatus("Error", instanceKey, "", "", "") return patchErr } - reqLogger.Info("Patch merged") + r.updateStatus("Enabled", instanceKey, pName, remoteURL, id) + reqLogger.V(1).Info("Patch merged") return nil } -func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool { - isMarkedForDeletion := instance.GetDeletionTimestamp() != nil - return isMarkedForDeletion -} - func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error { prom := &monitoringv1.Prometheus{} if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil { @@ -189,13 +197,17 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log reqLogger.Info("Found prometheus to update") var patch []byte - adapterURL := instance.Spec.AdapterUrl + remoteURL, _, err := getAdapterInfo(instance) + if err != nil { + reqLogger.Error(err, "Unable to get adapter info") + return err + } rws := prom.Spec.RemoteWrite for i, spec := range rws { - if spec.URL == instance.Spec.AdapterUrl { + if spec.URL == remoteURL { reqLogger.Info("Found remote write to be removed, removing it") - patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger) + patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger) break } } @@ -204,8 +216,8 @@ func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger log reqLogger.Error(patchErr, "Unable to process patch to prometheus") return patchErr } - reqLogger.Info("Patch merged, remote write removed") - + reqLogger.V(1).Info("Patch merged, remote write removed") + cleanUpExternalResources(instance) //remove Finalizer after deletion if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) { if err := removeFinalizer(reqLogger, instance); err != nil { @@ -252,3 +264,85 @@ func formPatch(method string, index string, adapterURL string, instance *onapv1a finalMergePatch := append(prependMergePatch, 93) return finalMergePatch, nil } + +func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error { + // Fetch the CollectdGlobal instance + instance := &onapv1alpha1.PrometheusRemoteEndpoint{} + err := r.client.Get(context.TODO(), key, instance) + if err != nil { + return err + } + instance.Status.Status = status + instance.Status.PrometheusInstance = prom + instance.Status.KafkaWriterID = kwid + instance.Status.RemoteURL = remoteURL + err = r.client.Status().Update(context.TODO(), instance) + return err +} + +func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) { + if instance.Spec.Type == "kafka" { + deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID) + } +} + +func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) { + switch strings.ToLower(instance.Spec.Type) { + case "m3db": + return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil + case "kafka": + kwid, err := getKafkaWriter(instance) + return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err + default: + return instance.Spec.AdapterURL, "", nil + } +} + +func deleteKafkaWriter(kwURL string) error { + client := &http.Client{} + req, err := http.NewRequest(http.MethodDelete, kwURL, nil) + if err != nil { + log.Error(err, "Failed to form delete Kafka Writer request") + return err + } + _, err = client.Do(req) + if err != nil { + log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL) + return err + } + return nil +} + +func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) { + // TODO - check update events + if instance.Status.KafkaWriterID != "" { + return instance.Status.KafkaWriterID, nil + } + return createKafkaWriter(instance) +} + +func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) { + + log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec) + baseURL := instance.Spec.AdapterURL + kwc := instance.Spec.KafkaConfig + kwURL := baseURL + "/pkw" + + postBody, err := json.Marshal(kwc) + if err != nil { + log.Error(err, "JSON Marshalling error") + return "", err + } + + resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody)) + if err != nil { + log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc) + return "", err + } + defer resp.Body.Close() + var kwid string + json.NewDecoder(resp.Body).Decode(&kwid) + log.Info("Kafka Writer created", "Kafka Writer Id", kwid) + + return kwid, err +}