1 package prometheusremoteendpoint
11 onapv1alpha1 "remote-config-operator/pkg/apis/onap/v1alpha1"
13 monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1"
14 logr "github.com/go-logr/logr"
15 corev1 "k8s.io/api/core/v1"
16 "k8s.io/apimachinery/pkg/api/errors"
17 "k8s.io/apimachinery/pkg/runtime"
18 "k8s.io/apimachinery/pkg/types"
19 remoteconfigutils "remote-config-operator/pkg/controller/utils"
20 "sigs.k8s.io/controller-runtime/pkg/client"
21 "sigs.k8s.io/controller-runtime/pkg/controller"
22 "sigs.k8s.io/controller-runtime/pkg/handler"
23 logf "sigs.k8s.io/controller-runtime/pkg/log"
24 "sigs.k8s.io/controller-runtime/pkg/manager"
25 "sigs.k8s.io/controller-runtime/pkg/reconcile"
26 "sigs.k8s.io/controller-runtime/pkg/source"
// log is the package-level logger; it is derived from the controller-runtime
// root logger and named "controller_prometheusremoteendpoint".
var log = logf.Log.WithName("controller_prometheusremoteendpoint")
31 // Add creates a new PrometheusRemoteEndpoint Controller and adds it to the Manager. The Manager will set fields on the Controller
32 // and Start it when the Manager is Started.
33 func Add(mgr manager.Manager) error {
34 return add(mgr, newReconciler(mgr))
37 // newReconciler returns a new reconcile.Reconciler
38 func newReconciler(mgr manager.Manager) reconcile.Reconciler {
39 return &ReconcilePrometheusRemoteEndpoint{client: mgr.GetClient(), scheme: mgr.GetScheme()}
42 // add adds a new Controller to mgr with r as the reconcile.Reconciler
43 func add(mgr manager.Manager, r reconcile.Reconciler) error {
44 // Create a new controller
45 c, err := controller.New("prometheusremoteendpoint-controller", mgr, controller.Options{Reconciler: r})
50 // Watch for changes to primary resource PrometheusRemoteEndpoint
52 log.V(1).Info("Add watcher for primary resource PrometheusRemoteEndpoint")
53 err = c.Watch(&source.Kind{Type: &onapv1alpha1.PrometheusRemoteEndpoint{}}, &handler.EnqueueRequestForObject{})
58 log.V(1).Info("Add watcher for secondary resource RemoteFilterAction")
59 // TODO(user): Modify this to be the types you create that are owned by the primary resource
60 // Watch for changes to secondary resource Pods and requeue the owner PrometheusRemoteEndpoint
61 err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForOwner{
63 OwnerType: &onapv1alpha1.PrometheusRemoteEndpoint{},
66 log.Error(err, "Error enqueuing requests due to remoteFilterAction changes")
70 log.Info("Enqueued reconcile requests due to remoteFilterAction changes")
74 // blank assignment to verify that ReconcilePrometheusRemoteEndpoint implements reconcile.Reconciler
75 var _ reconcile.Reconciler = &ReconcilePrometheusRemoteEndpoint{}
77 // ReconcilePrometheusRemoteEndpoint reconciles a PrometheusRemoteEndpoint object
78 type ReconcilePrometheusRemoteEndpoint struct {
79 // This client, initialized using mgr.Client() above, is a split client
80 // that reads objects from the cache and writes to the apiserver
82 scheme *runtime.Scheme
85 // Reconcile reads that state of the cluster for a PrometheusRemoteEndpoint object
86 // and makes changes based on the state read and what is in the PrometheusRemoteEndpoint.Spec
88 // The Controller will requeue the Request to be processed again if the returned error is non-nil or
89 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
90 func (r *ReconcilePrometheusRemoteEndpoint) Reconcile(request reconcile.Request) (reconcile.Result, error) {
91 reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
92 reqLogger.Info("Reconciling PrometheusRemoteEndpoint")
94 // Fetch the PrometheusRemoteEndpoint instance
95 instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
96 err := r.client.Get(context.TODO(), request.NamespacedName, instance)
98 if errors.IsNotFound(err) {
99 // Request object not found, could have been deleted after reconcile request.
100 // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
101 // Return and don't requeue
102 reqLogger.Error(err, "PrometheusRemoteEndpoint object not found")
103 return reconcile.Result{}, nil
105 // Error reading the object - requeue the request.
106 reqLogger.Error(err, "Error reading PrometheusRemoteEndpoint object, Requeing ")
107 return reconcile.Result{}, err
110 // Check if CR is being Deleted
111 if instance.GetDeletionTimestamp() != nil {
112 //Delete Remote write
113 if err := r.processDeletionRequest(reqLogger, instance); err != nil {
114 reqLogger.Error(err, "Error processing deletion request")
115 return reconcile.Result{}, err
117 return reconcile.Result{}, nil
120 //Add finalizer for the CR object
121 if !remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
122 reqLogger.Info("Adding finalizer for PrometheusRemoteEndpoint")
123 if err := addFinalizer(reqLogger, instance); err != nil {
124 return reconcile.Result{}, err
126 err := r.client.Update(context.TODO(), instance)
128 reqLogger.Error(err, "Unable to update instance")
129 return reconcile.Result{}, err
131 return reconcile.Result{}, nil
134 if err := r.processPatchRequest(reqLogger, instance); err != nil {
135 reqLogger.Error(err, "Error processing request")
136 return reconcile.Result{}, err
138 return reconcile.Result{}, nil
141 func (r *ReconcilePrometheusRemoteEndpoint) processPatchRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
143 prom := &monitoringv1.Prometheus{}
144 pName := instance.ObjectMeta.Labels["app"]
145 if err1 := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: pName}, prom); err1 != nil {
146 reqLogger.Error(err1, "Error getting prometheus")
149 reqLogger.Info("Found prometheus to update")
153 rws := prom.Spec.RemoteWrite
154 remoteURL, id, err := getAdapterInfo(instance)
155 instanceKey := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}
157 reqLogger.Error(err, "Unable to get adapter url")
162 for i, spec := range rws {
163 // Update event - check the prometheus remote write Url against remoteURL in the Status
164 // to consider the case when URL itself is updated.
165 if spec.URL == instance.Status.RemoteURL {
166 reqLogger.Info("Remote write already exists, updating it")
167 patch, _ = formPatch("replace", strconv.Itoa(i), remoteURL, instance, reqLogger)
174 reqLogger.Info("Remote write does not exist, creating one...")
175 // rwsLength := len(rws)
176 patch, _ = formPatch("add", "-", remoteURL, instance, reqLogger)
178 patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
180 reqLogger.Error(patchErr, "Unable to process patch to prometheus")
181 cleanUpExternalResources(instance)
182 r.updateStatus("Error", instanceKey, "", "", "")
185 r.updateStatus("Enabled", instanceKey, pName, remoteURL, id)
186 reqLogger.V(1).Info("Patch merged")
191 func (r *ReconcilePrometheusRemoteEndpoint) processDeletionRequest(reqLogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
192 prom := &monitoringv1.Prometheus{}
193 if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.ObjectMeta.Labels["app"]}, prom); err != nil {
194 reqLogger.Error(err, "Error getting prometheus")
197 reqLogger.Info("Found prometheus to update")
200 remoteURL, _, err := getAdapterInfo(instance)
202 reqLogger.Error(err, "Unable to get adapter info")
206 rws := prom.Spec.RemoteWrite
207 for i, spec := range rws {
208 if spec.URL == remoteURL {
209 reqLogger.Info("Found remote write to be removed, removing it")
210 patch, _ = formPatch("remove", strconv.Itoa(i), remoteURL, instance, reqLogger)
214 patchErr := r.client.Patch(context.TODO(), prom, client.ConstantPatch(types.JSONPatchType, patch))
216 reqLogger.Error(patchErr, "Unable to process patch to prometheus")
219 reqLogger.V(1).Info("Patch merged, remote write removed")
220 cleanUpExternalResources(instance)
221 //remove Finalizer after deletion
222 if remoteconfigutils.Contains(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer) {
223 if err := removeFinalizer(reqLogger, instance); err != nil {
226 err := r.client.Update(context.TODO(), instance)
228 reqLogger.Error(err, "Unable to update instance")
235 func addFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
236 reqlogger.Info("Adding finalizer for the PrometheusRemoteEndpoint")
237 instance.SetFinalizers(append(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
241 func removeFinalizer(reqlogger logr.Logger, instance *onapv1alpha1.PrometheusRemoteEndpoint) error {
242 reqlogger.Info("Removing finalizer for the PrometheusRemoteEndpoint")
243 instance.SetFinalizers(remoteconfigutils.Remove(instance.GetFinalizers(), remoteconfigutils.RemoteConfigFinalizer))
247 func formPatch(method string, index string, adapterURL string, instance *onapv1alpha1.PrometheusRemoteEndpoint, reqLogger logr.Logger) ([]byte, error) {
249 var mergePatch []byte
250 path := "/spec/remoteWrite/" + index
251 mergePatch, err = json.Marshal(map[string]interface{}{
254 "value": map[string]interface{}{
256 "remoteTimeout": instance.Spec.RemoteTimeout,
260 reqLogger.Error(err, "Unable to form patch")
263 prependMergePatch := append([]byte{91}, mergePatch...)
264 finalMergePatch := append(prependMergePatch, 93)
265 return finalMergePatch, nil
268 func (r *ReconcilePrometheusRemoteEndpoint) updateStatus(status string, key types.NamespacedName, prom string, remoteURL string, kwid string) error {
269 // Fetch the CollectdGlobal instance
270 instance := &onapv1alpha1.PrometheusRemoteEndpoint{}
271 err := r.client.Get(context.TODO(), key, instance)
275 instance.Status.Status = status
276 instance.Status.PrometheusInstance = prom
277 instance.Status.KafkaWriterID = kwid
278 instance.Status.RemoteURL = remoteURL
279 err = r.client.Status().Update(context.TODO(), instance)
283 func cleanUpExternalResources(instance *onapv1alpha1.PrometheusRemoteEndpoint) {
284 if instance.Spec.Type == "kafka" {
285 deleteKafkaWriter(instance.Spec.AdapterURL + "/pkw/" + instance.Status.KafkaWriterID)
289 func getAdapterInfo(instance *onapv1alpha1.PrometheusRemoteEndpoint) (remoteURL string, id string, err error) {
290 switch strings.ToLower(instance.Spec.Type) {
292 return instance.Spec.AdapterURL + "/api/v1/prom/remote/write", "", nil
294 kwid, err := getKafkaWriter(instance)
295 return instance.Spec.AdapterURL + "/pkw/" + kwid + "/receive", kwid, err
297 return instance.Spec.AdapterURL, "", nil
301 func deleteKafkaWriter(kwURL string) error {
302 client := &http.Client{}
303 req, err := http.NewRequest(http.MethodDelete, kwURL, nil)
305 log.Error(err, "Failed to form delete Kafka Writer request")
308 _, err = client.Do(req)
310 log.Error(err, "Failed to delete Kafka Writer", "Kafka Writer", kwURL)
316 func getKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
317 // TODO - check update events
318 if instance.Status.KafkaWriterID != "" {
319 return instance.Status.KafkaWriterID, nil
321 return createKafkaWriter(instance)
324 func createKafkaWriter(instance *onapv1alpha1.PrometheusRemoteEndpoint) (string, error) {
326 log.V(1).Info("Processing Kafka Remote Endpoint", "Kafka Writer Config", instance.Spec)
327 baseURL := instance.Spec.AdapterURL
328 kwc := instance.Spec.KafkaConfig
329 kwURL := baseURL + "/pkw"
331 postBody, err := json.Marshal(kwc)
333 log.Error(err, "JSON Marshalling error")
337 resp, err := http.Post(kwURL, "application/json", bytes.NewBuffer(postBody))
339 log.Error(err, "Failed to create Kafka Writer", "Kafka Writer", kwURL, "Kafka Writer Config", kwc)
342 defer resp.Body.Close()
344 json.NewDecoder(resp.Body).Decode(&kwid)
345 log.Info("Kafka Writer created", "Kafka Writer Id", kwid)