2d78a43fab64c8b4252c19199d954db9f7c30ea3
[demo.git] / vnfs / DAaaS / microservices / remote-config-operator / pkg / controller / prometheusremoteendpoint / prometheusremoteendpoint_controller.go
1 package prometheusremoteendpoint
2
3 import (
4         "context"
5         "encoding/json"
6         "strconv"
7
8         onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
9
10         monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
11         logr "github.com/go-logr/logr"
12         corev1 "k8s.io/api/core/v1"
13         "k8s.io/apimachinery/pkg/api/errors"
14         "k8s.io/apimachinery/pkg/runtime"
15         "k8s.io/apimachinery/pkg/types"
16         remoteconfigutils "remote-config-operator/pkg/controller/utils"
17         "sigs.k8s.io/controller-runtime/pkg/client"
18         "sigs.k8s.io/controller-runtime/pkg/controller"
19         "sigs.k8s.io/controller-runtime/pkg/handler"
20         logf "sigs.k8s.io/controller-runtime/pkg/log"
21         "sigs.k8s.io/controller-runtime/pkg/manager"
22         "sigs.k8s.io/controller-runtime/pkg/reconcile"
23         "sigs.k8s.io/controller-runtime/pkg/source"
24 )
25
26 var log = logf.Log.WithName("controller_prometheusremoteendpoint")
27
28 // Add creates a new PrometheusRemoteEndpoint Controller and adds it to the Manager. The Manager will set fields on the Controller
29 // and Start it when the Manager is Started.
30 func Add(mgr manager.Manager) error {
31         return add(mgr, newReconciler(mgr))
32 }
33
34 // newReconciler returns a new reconcile.Reconciler
35 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
36         return &ReconcilePrometheusRemoteEndpoint{client: mgr.GetClient(), scheme: mgr.GetScheme()}
37 }
38
39 // add adds a new Controller to mgr with r as the reconcile.Reconciler
40 func add(mgr manager.Manager, r reconcile.Reconciler) error {
41         // Create a new controller
42         c, err := controller.New("prometheusremoteendpoint-controller", mgr, controller.Options{Reconciler: r})
43         if err != nil {
44                 return err
45         }
46
47         // Watch for changes to primary resource PrometheusRemoteEndpoint
48
49         log.V(1).Info("Add watcher for primary resource PrometheusRemoteEndpoint")
50         err = c.Watch(&source.Kind{Type: &onapv1alpha1.PrometheusRemoteEndpoint{}}, &handler.EnqueueRequestForObject{})
51         if err != nil {
52                 return err
53         }
54
55         log.V(1).Info("Add watcher for secondary resource RemoteFilterAction")
56         // TODO(user): Modify this to be the types you create that are owned by the primary resource
57         // Watch for changes to secondary resource Pods and requeue the owner PrometheusRemoteEndpoint
58         err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
59                 IsController: true,
60                 OwnerType:    &onapv1alpha1.PrometheusRemoteEndpoint{},
61         })
62         if err != nil {
63                 log.Error(err, "Error enqueuing requests due to remoteFilterAction changes")
64                 return err
65         }
66
67         log.Info("Enqueued reconcile requests due to remoteFilterAction changes")
68         return nil
69 }
70
71 // blank assignment to verify that ReconcilePrometheusRemoteEndpoint implements reconcile.Reconciler
72 var _ reconcile.Reconciler = &ReconcilePrometheusRemoteEndpoint{}
73
74 // ReconcilePrometheusRemoteEndpoint reconciles a PrometheusRemoteEndpoint object
75 type ReconcilePrometheusRemoteEndpoint struct {
76         // This client, initialized using mgr.Client() above, is a split client
77         // that reads objects from the cache and writes to the apiserver
78         client client.Client
79         scheme *runtime.Scheme
80 }
81
82 // Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object and makes changes based on the state read
83 // and what is in the PrometheusRemoteEndpoint.Spec
84
85 // Note:
86 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
87 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
88 func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) (reconcile.Result, error) {
89         reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
90         reqLogger.Info("Reconciling PrometheusRemoteEndpoint")
91
92         // Fetch the PrometheusRemoteEndpoint instance
93         instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
94         err := r.client.Get(context.TODO(), request.NamespacedName, instance)
95         if err != nil {
96                 if errors.IsNotFound(err) {
97                         // Request object not found, could have been deleted after reconcile request.
98                         // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
99                         // Return and don't requeue
100                         reqLogger.Error(err, "PrometheusRemoteEndpoint object not found")
101                         return reconcile.Result{}, nil
102                 }
103                 // Error reading the object - requeue the request.
104                 reqLogger.Error(err, "Error reading PrometheusRemoteEndpoint object, Requeing ")
105                 return reconcile.Result{}, err
106         }
107
108         isBeingDeleted := checkDeletionTimestamp(reqLogger, instance)
109         if isBeingDeleted {
110                 //Delete Remote write
111                 if err := r.processDeletionRequest(reqLogger, instance); err != nil {
112                         reqLogger.Error(err, "Error processing deletion request")
113                         return reconcile.Result{}, err
114                 }
115                 return reconcile.Result{}, nil
116         }
117
118         //Add finalizer for the CR object
119         if !remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
120                 reqLogger.Info("Adding finalizer for PrometheusRemoteEndpoint")
121                 if err := addFinalizer(reqLogger, instance); err != nil {
122                         return reconcile.Result{}, err
123                 }
124                 err := r.client.Update(context.TODO(), instance)
125                 if err != nil {
126                         reqLogger.Error(err, "Unable to update instance")
127                         return reconcile.Result{}, err
128                 }
129                 return reconcile.Result{}, nil
130         }
131
132         if err := r.processPatchRequest(reqLogger, instance); err != nil {
133                 reqLogger.Error(err, "Error processing request")
134                 return reconcile.Result{}, err
135         }
136         return reconcile.Result{}, nil
137 }
138
139 func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
140
141         prom := &monitoringv1.Prometheus{}
142         if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err1 != nil {
143                 reqLogger.Error(err1, "Error getting prometheus")
144                 return err1
145         }
146         reqLogger.Info("Found prometheus to update")
147
148         var patch []byte
149
150         rws := prom.Spec.RemoteWrite
151         adapterURL := instance.Spec.AdapterUrl
152
153         isUpdate := false
154         for i, spec := range rws {
155                 if spec.URL == instance.Spec.AdapterUrl {
156                         reqLogger.Info("Remote write already exists, updating it")
157                         patch, _ = formPatch("replace", strconv.Itoa(i), adapterURL, instance, reqLogger)
158                         isUpdate = true
159                         break
160                 }
161         }
162
163         if !isUpdate {
164                 reqLogger.Info("Remote write does not exist, creating one...")
165                 // rwsLength := len(rws)
166                 patch, _ = formPatch("add", "-", adapterURL, instance, reqLogger)
167         }
168         patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
169         if patchErr != nil {
170                 reqLogger.Error(patchErr, "Unable to process patch to prometheus")
171                 return patchErr
172         }
173         reqLogger.Info("Patch merged")
174
175         return nil
176 }
177
178 func checkDeletionTimestamp(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) bool {
179         isMarkedForDeletion := instance.GetDeletionTimestamp() != nil
180         return isMarkedForDeletion
181 }
182
183 func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
184         prom := &monitoringv1.Prometheus{}
185         if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
186                 reqLogger.Error(err, "Error getting prometheus")
187                 return err
188         }
189         reqLogger.Info("Found prometheus to update")
190
191         var patch []byte
192         adapterURL := instance.Spec.AdapterUrl
193
194         rws := prom.Spec.RemoteWrite
195         for i, spec := range rws {
196                 if spec.URL == instance.Spec.AdapterUrl {
197                         reqLogger.Info("Found remote write to be removed, removing it")
198                         patch, _ = formPatch("remove", strconv.Itoa(i), adapterURL, instance, reqLogger)
199                         break
200                 }
201         }
202         patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
203         if patchErr != nil {
204                 reqLogger.Error(patchErr, "Unable to process patch to prometheus")
205                 return patchErr
206         }
207         reqLogger.Info("Patch merged, remote write removed")
208
209         //remove Finalizer after deletion
210         if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
211                 if err := removeFinalizer(reqLogger, instance); err != nil {
212                         return err
213                 }
214                 err := r.client.Update(context.TODO(), instance)
215                 if err != nil {
216                         reqLogger.Error(err, "Unable to update instance")
217                         return err
218                 }
219         }
220         return nil
221 }
222
223 func addFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
224         reqlogger.Info("Adding finalizer for the PrometheusRemoteEndpoint")
225         instance.SetFinalizers(append(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
226         return nil
227 }
228
229 func removeFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
230         reqlogger.Info("Removing finalizer for the PrometheusRemoteEndpoint")
231         instance.SetFinalizers(remoteconfigutils.Remove(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
232         return nil
233 }
234
235 func formPatch(method string, index string, adapterURL string, instance *onapv1alpha1.PrometheusRemoteEndpoint, reqLogger logr.Logger) ([]byte, error) {
236         var err error
237         var mergePatch []byte
238         path := "/spec/remoteWrite/" + index
239         mergePatch, err = json.Marshal(map[string]interface{}{
240                 "op":   method,
241                 "path": path,
242                 "value": map[string]interface{}{
243                         "url":           adapterURL,
244                         "remoteTimeout": instance.Spec.RemoteTimeout,
245                 },
246         })
247         if err != nil {
248                 reqLogger.Error(err, "Unable to form patch")
249                 return nil, err
250         }
251         prependMergePatch := append([]byte{91}, mergePatch...)
252         finalMergePatch := append(prependMergePatch, 93)
253         return finalMergePatch, nil
254 }