Remote Write Config and Filter operator
[demo.git] / vnfs / DAaaS / microservices / remote-config-operator / pkg / controller / prometheusremoteendpoint / prometheusremoteendpoint_controller.go
1 package prometheusremoteendpoint
2
3 import (
4         "bytes"
5         "context"
6         "encoding/json"
7         "net/http"
8         "strconv"
9         "strings"
10
11         onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
12
13         monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
14         logr "github.com/go-logr/logr"
15         corev1 "k8s.io/api/core/v1"
16         "k8s.io/apimachinery/pkg/api/errors"
17         "k8s.io/apimachinery/pkg/runtime"
18         "k8s.io/apimachinery/pkg/types"
19         remoteconfigutils "remote-config-operator/pkg/controller/utils"
20         "sigs.k8s.io/controller-runtime/pkg/client"
21         "sigs.k8s.io/controller-runtime/pkg/controller"
22         "sigs.k8s.io/controller-runtime/pkg/handler"
23         logf "sigs.k8s.io/controller-runtime/pkg/log"
24         "sigs.k8s.io/controller-runtime/pkg/manager"
25         "sigs.k8s.io/controller-runtime/pkg/reconcile"
26         "sigs.k8s.io/controller-runtime/pkg/source"
27 )
28
29 var log = logf.Log.WithName("controller_prometheusremoteendpoint")
30
31 // Add creates a new PrometheusRemoteEndpoint Controller and adds it to the Manager. The Manager will set fields on the Controller
32 // and Start it when the Manager is Started.
33 func Add(mgr manager.Manager) error {
34         return add(mgr, newReconciler(mgr))
35 }
36
37 // newReconciler returns a new reconcile.Reconciler
38 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
39         return &ReconcilePrometheusRemoteEndpoint{client: mgr.GetClient(), scheme: mgr.GetScheme()}
40 }
41
42 // add adds a new Controller to mgr with r as the reconcile.Reconciler
43 func add(mgr manager.Manager, r reconcile.Reconciler) error {
44         // Create a new controller
45         c, err := controller.New("prometheusremoteendpoint-controller", mgr, controller.Options{Reconciler: r})
46         if err != nil {
47                 return err
48         }
49
50         // Watch for changes to primary resource PrometheusRemoteEndpoint
51
52         log.V(1).Info("Add watcher for primary resource PrometheusRemoteEndpoint")
53         err = c.Watch(&source.Kind{Type: &onapv1alpha1.PrometheusRemoteEndpoint{}}, &handler.EnqueueRequestForObject{})
54         if err != nil {
55                 return err
56         }
57
58         log.V(1).Info("Add watcher for secondary resource RemoteFilterAction")
59         // TODO(user): Modify this to be the types you create that are owned by the primary resource
60         // Watch for changes to secondary resource Pods and requeue the owner PrometheusRemoteEndpoint
61         err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
62                 IsController: true,
63                 OwnerType:    &onapv1alpha1.PrometheusRemoteEndpoint{},
64         })
65         if err != nil {
66                 log.Error(err, "Error enqueuing requests due to remoteFilterAction changes")
67                 return err
68         }
69
70         log.Info("Enqueued reconcile requests due to remoteFilterAction changes")
71         return nil
72 }
73
74 // blank assignment to verify that ReconcilePrometheusRemoteEndpoint implements reconcile.Reconciler
75 var _ reconcile.Reconciler = &ReconcilePrometheusRemoteEndpoint{}
76
77 // ReconcilePrometheusRemoteEndpoint reconciles a PrometheusRemoteEndpoint object
78 type ReconcilePrometheusRemoteEndpoint struct {
79         // This client, initialized using mgr.Client() above, is a split client
80         // that reads objects from the cache and writes to the apiserver
81         client client.Client
82         scheme *runtime.Scheme
83 }
84
85 // Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object
86 // and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec
87 // Note:
88 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
89 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
90 func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) (reconcile.Result, error) {
91         reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
92         reqLogger.Info("Reconciling PrometheusRemoteEndpoint")
93
94         // Fetch the PrometheusRemoteEndpoint instance
95         instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
96         err := r.client.Get(context.TODO(), request.NamespacedName, instance)
97         if err != nil {
98                 if errors.IsNotFound(err) {
99                         // Request object not found, could have been deleted after reconcile request.
100                         // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
101                         // Return and don't requeue
102                         reqLogger.Error(err, "PrometheusRemoteEndpoint object not found")
103                         return reconcile.Result{}, nil
104                 }
105                 // Error reading the object - requeue the request.
106                 reqLogger.Error(err, "Error reading PrometheusRemoteEndpoint object, Requeing ")
107                 return reconcile.Result{}, err
108         }
109
110         // Check if CR is being Deleted
111         if instance.GetDeletionTimestamp() != nil {
112                 //Delete Remote write
113                 if err := r.processDeletionRequest(reqLogger, instance); err != nil {
114                         reqLogger.Error(err, "Error processing deletion request")
115                         return reconcile.Result{}, err
116                 }
117                 return reconcile.Result{}, nil
118         }
119
120         //Add finalizer for the CR object
121         if !remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
122                 reqLogger.Info("Adding finalizer for PrometheusRemoteEndpoint")
123                 if err := addFinalizer(reqLogger, instance); err != nil {
124                         return reconcile.Result{}, err
125                 }
126                 err := r.client.Update(context.TODO(), instance)
127                 if err != nil {
128                         reqLogger.Error(err, "Unable to update instance")
129                         return reconcile.Result{}, err
130                 }
131                 return reconcile.Result{}, nil
132         }
133
134         if err := r.processPatchRequest(reqLogger, instance); err != nil {
135                 reqLogger.Error(err, "Error processing request")
136                 return reconcile.Result{}, err
137         }
138         return reconcile.Result{}, nil
139 }
140
141 func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
142
143         prom := &monitoringv1.Prometheus{}
144         pName := instance.ObjectMeta.Labels["app"]
145         if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil {
146                 reqLogger.Error(err1, "Error getting prometheus")
147                 return err1
148         }
149         reqLogger.Info("Found prometheus to update")
150
151         var patch []byte
152
153         rws := prom.Spec.RemoteWrite
154         remoteURL, id, err := getAdapterInfo(instance)
155         instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}
156         if err != nil {
157                 reqLogger.Error(err, "Unable to get adapter url")
158                 return err
159         }
160
161         isUpdate := false
162         for i, spec := range rws {
163                 // Update event - check the prometheus remote write Url against remoteURL in the Status
164                 // to consider the case when URL itself is updated.
165                 if spec.URL == instance.Status.RemoteURL {
166                         reqLogger.Info("Remote write already exists, updating it")
167                         patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger)
168                         isUpdate = true
169                         break
170                 }
171         }
172
173         if !isUpdate {
174                 reqLogger.Info("Remote write does not exist, creating one...")
175                 // rwsLength := len(rws)
176                 patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger)
177         }
178         patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
179         if patchErr != nil {
180                 reqLogger.Error(patchErr, "Unable to process patch to prometheus")
181                 cleanUpExternalResources(instance)
182                 r.updateStatus("Error", instanceKey, "", "", "")
183                 return patchErr
184         }
185         r.updateStatus("Enabled", instanceKey, pName, remoteURL, id)
186         reqLogger.V(1).Info("Patch merged")
187
188         return nil
189 }
190
191 func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
192         prom := &monitoringv1.Prometheus{}
193         if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
194                 reqLogger.Error(err, "Error getting prometheus")
195                 return err
196         }
197         reqLogger.Info("Found prometheus to update")
198
199         var patch []byte
200         remoteURL, _, err := getAdapterInfo(instance)
201         if err != nil {
202                 reqLogger.Error(err, "Unable to get adapter info")
203                 return err
204         }
205
206         rws := prom.Spec.RemoteWrite
207         for i, spec := range rws {
208                 if spec.URL == remoteURL {
209                         reqLogger.Info("Found remote write to be removed, removing it")
210                         patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger)
211                         break
212                 }
213         }
214         patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
215         if patchErr != nil {
216                 reqLogger.Error(patchErr, "Unable to process patch to prometheus")
217                 return patchErr
218         }
219         reqLogger.V(1).Info("Patch merged, remote write removed")
220         cleanUpExternalResources(instance)
221         //remove Finalizer after deletion
222         if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
223                 if err := removeFinalizer(reqLogger, instance); err != nil {
224                         return err
225                 }
226                 err := r.client.Update(context.TODO(), instance)
227                 if err != nil {
228                         reqLogger.Error(err, "Unable to update instance")
229                         return err
230                 }
231         }
232         return nil
233 }
234
235 func addFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
236         reqlogger.Info("Adding finalizer for the PrometheusRemoteEndpoint")
237         instance.SetFinalizers(append(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
238         return nil
239 }
240
241 func removeFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
242         reqlogger.Info("Removing finalizer for the PrometheusRemoteEndpoint")
243         instance.SetFinalizers(remoteconfigutils.Remove(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
244         return nil
245 }
246
247 func formPatch(method string, index string, adapterURL string, instance *onapv1alpha1.PrometheusRemoteEndpoint, reqLogger logr.Logger) ([]byte, error) {
248         var err error
249         var mergePatch []byte
250         path := "/spec/remoteWrite/" + index
251         mergePatch, err = json.Marshal(map[string]interface{}{
252                 "op":   method,
253                 "path": path,
254                 "value": map[string]interface{}{
255                         "url":           adapterURL,
256                         "remoteTimeout": instance.Spec.RemoteTimeout,
257                 },
258         })
259         if err != nil {
260                 reqLogger.Error(err, "Unable to form patch")
261                 return nil, err
262         }
263         prependMergePatch := append([]byte{91}, mergePatch...)
264         finalMergePatch := append(prependMergePatch, 93)
265         return finalMergePatch, nil
266 }
267
268 func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error {
269         // Fetch the CollectdGlobal instance
270         instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
271         err := r.client.Get(context.TODO(), key, instance)
272         if err != nil {
273                 return err
274         }
275         instance.Status.Status = status
276         instance.Status.PrometheusInstance = prom
277         instance.Status.KafkaWriterID = kwid
278         instance.Status.RemoteURL = remoteURL
279         err = r.client.Status().Update(context.TODO(), instance)
280         return err
281 }
282
283 func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) {
284         if instance.Spec.Type == "kafka" {
285                 deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID)
286         }
287 }
288
289 func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) {
290         switch strings.ToLower(instance.Spec.Type) {
291         case "m3db":
292                 return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil
293         case "kafka":
294                 kwid, err := getKafkaWriter(instance)
295                 return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err
296         default:
297                 return instance.Spec.AdapterURL, "", nil
298         }
299 }
300
301 func deleteKafkaWriter(kwURL string) error {
302         client := &http.Client{}
303         req, err := http.NewRequest(http.MethodDelete, kwURL, nil)
304         if err != nil {
305                 log.Error(err, "Failed to form delete Kafka Writer request")
306                 return err
307         }
308         _, err = client.Do(req)
309         if err != nil {
310                 log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL)
311                 return err
312         }
313         return nil
314 }
315
316 func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
317         // TODO - check update events
318         if instance.Status.KafkaWriterID != "" {
319                 return instance.Status.KafkaWriterID, nil
320         }
321         return createKafkaWriter(instance)
322 }
323
324 func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
325
326         log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec)
327         baseURL := instance.Spec.AdapterURL
328         kwc := instance.Spec.KafkaConfig
329         kwURL := baseURL + "/pkw"
330
331         postBody, err := json.Marshal(kwc)
332         if err != nil {
333                 log.Error(err, "JSON Marshalling error")
334                 return "", err
335         }
336
337         resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody))
338         if err != nil {
339                 log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc)
340                 return "", err
341         }
342         defer resp.Body.Close()
343         var kwid string
344         json.NewDecoder(resp.Body).Decode(&kwid)
345         log.Info("Kafka Writer created", "Kafka Writer Id", kwid)
346
347         return kwid, err
348 }