Prometheus Kafka Writer Microservice
[demo.git] / vnfs / DAaaS / microservices / prom-kafka-writer / pkg / api / handler.go
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/api/handler.go
new file mode 100644 (file)
index 0000000..d7a2b89
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package api
+
+import (
+       "encoding/json"
+       "errors"
+       "io"
+       "io/ioutil"
+       "net/http"
+
+       logger "prom-kafka-writer/pkg/config"
+       kw "prom-kafka-writer/pkg/kafkawriter"
+
+       "github.com/golang/protobuf/proto"
+       "github.com/golang/snappy"
+       "github.com/gorilla/mux"
+       "github.com/prometheus/prometheus/prompb"
+)
+
+type kwResponse struct {
+       KWid       string       `json:"kwid,omitempty"`
+       KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"`
+}
+
+var log = logger.GetLoggerInstance()
+
+// CreateKWHandler - Creates and starts a Prometheus to Kafka writer
+func CreateKWHandler(w http.ResponseWriter, r *http.Request) {
+       log.Infow("Received request for Creating Kafka Writer")
+       kwConfig := kw.NewKWConfig()
+       dec := json.NewDecoder(r.Body)
+       dec.DisallowUnknownFields()
+       err := dec.Decode(kwConfig)
+       switch {
+       case err == io.EOF:
+               http.Error(w, "Body empty", http.StatusBadRequest)
+               return
+       case err != nil:
+               http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+               return
+       }
+       kwid, err := kw.AddKWC(kwConfig)
+       if err != nil {
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       //Send response back
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusCreated)
+       kwResp := kwResponse{
+               KWid: kwid,
+       }
+       err = json.NewEncoder(w).Encode(kwResp)
+       if err != nil {
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}
+
+// ListKWHandler - Lists the KafkaWriters and its config
+func ListKWHandler(w http.ResponseWriter, r *http.Request) {
+       log.Infow("Received request for List Kafka Writers", "url", r.URL)
+       res := kw.ListKWC()
+       //Send response back
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       kwResp := kwResponse{
+               KWCRespMap: res,
+       }
+       err := json.NewEncoder(w).Encode(kwResp)
+       if err != nil {
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}
+
+// DeleteKWHandler - Deletes a given Prometheus to Kafka writer
+func DeleteKWHandler(w http.ResponseWriter, r *http.Request) {
+       params := mux.Vars(r)
+       log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"])
+       kw.DeleteKWC(params["kwid"])
+
+       //Send response back
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+}
+
+// ReceiveKWHandler - Publish metrics from Prometheus to Kafka
+func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) {
+       params := mux.Vars(r)
+       kwid := params["kwid"]
+       if _, ok := kw.KWMap[kwid]; !ok {
+               notRegisteredErr := errors.New("kafka writer not registered").Error()
+               log.Error(notRegisteredErr)
+               http.Error(w, notRegisteredErr, http.StatusNotFound)
+               return
+       }
+       log.Infow("Produce message on Kafka Writer", "kwid", kwid)
+
+       compressed, err := ioutil.ReadAll(r.Body)
+       defer r.Body.Close()
+       if err != nil {
+               log.Error("error", err.Error())
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       metricBuf, err := snappy.Decode(nil, compressed)
+       if err != nil {
+               log.Error("error", err.Error())
+               http.Error(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+
+       var metrics prompb.WriteRequest
+       if err := proto.Unmarshal(metricBuf, &metrics); err != nil {
+               log.Error("error", err.Error())
+               http.Error(w, err.Error(), http.StatusBadRequest)
+               return
+       }
+
+       err = kw.PublishTimeSeries(kwid, &metrics)
+       if err != nil {
+               log.Error("error", err.Error())
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}