+
+func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error {
+ // Fetch the CollectdGlobal instance
+ instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
+ err := r.client.Get(context.TODO(), key, instance)
+ if err != nil {
+ return err
+ }
+ instance.Status.Status = status
+ instance.Status.PrometheusInstance = prom
+ instance.Status.KafkaWriterID = kwid
+ instance.Status.RemoteURL = remoteURL
+ err = r.client.Status().Update(context.TODO(), instance)
+ return err
+}
+
+func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) {
+ if instance.Spec.Type == "kafka" {
+ deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID)
+ }
+}
+
+func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) {
+ switch strings.ToLower(instance.Spec.Type) {
+ case "m3db":
+ return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil
+ case "kafka":
+ kwid, err := getKafkaWriter(instance)
+ return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err
+ default:
+ return instance.Spec.AdapterURL, "", nil
+ }
+}
+
+func deleteKafkaWriter(kwURL string) error {
+ client := &http.Client{}
+ req, err := http.NewRequest(http.MethodDelete, kwURL, nil)
+ if err != nil {
+ log.Error(err, "Failed to form delete Kafka Writer request")
+ return err
+ }
+ _, err = client.Do(req)
+ if err != nil {
+ log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL)
+ return err
+ }
+ return nil
+}
+
+func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+ // TODO - check update events
+ if instance.Status.KafkaWriterID != "" {
+ return instance.Status.KafkaWriterID, nil
+ }
+ return createKafkaWriter(instance)
+}
+
+func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
+
+ log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec)
+ baseURL := instance.Spec.AdapterURL
+ kwc := instance.Spec.KafkaConfig
+ kwURL := baseURL + "/pkw"
+
+ postBody, err := json.Marshal(kwc)
+ if err != nil {
+ log.Error(err, "JSON Marshalling error")
+ return "", err
+ }
+
+ resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody))
+ if err != nil {
+ log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc)
+ return "", err
+ }
+ defer resp.Body.Close()
+ var kwid string
+ json.NewDecoder(resp.Body).Decode(&kwid)
+ log.Info("Kafka Writer created", "Kafka Writer Id", kwid)
+
+ return kwid, err
+}