Prometheus Kafka Writer Microservice
[demo.git] / vnfs / DAaaS / microservices / prom-kafka-writer / pkg / api / handler.go
1 /*
2  *
3  * Copyright 2019 Intel Corporation.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  *
14  */
15
16 package api
17
18 import (
19         "encoding/json"
20         "errors"
21         "io"
22         "io/ioutil"
23         "net/http"
24
25         logger "prom-kafka-writer/pkg/config"
26         kw "prom-kafka-writer/pkg/kafkawriter"
27
28         "github.com/golang/protobuf/proto"
29         "github.com/golang/snappy"
30         "github.com/gorilla/mux"
31         "github.com/prometheus/prometheus/prompb"
32 )
33
34 type kwResponse struct {
35         KWid       string       `json:"kwid,omitempty"`
36         KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"`
37 }
38
39 var log = logger.GetLoggerInstance()
40
41 // CreateKWHandler - Creates and starts a Prometheus to Kafka writer
42 func CreateKWHandler(w http.ResponseWriter, r *http.Request) {
43         log.Infow("Received request for Creating Kafka Writer")
44         kwConfig := kw.NewKWConfig()
45         dec := json.NewDecoder(r.Body)
46         dec.DisallowUnknownFields()
47         err := dec.Decode(kwConfig)
48         switch {
49         case err == io.EOF:
50                 http.Error(w, "Body empty", http.StatusBadRequest)
51                 return
52         case err != nil:
53                 http.Error(w, err.Error(), http.StatusUnprocessableEntity)
54                 return
55         }
56         kwid, err := kw.AddKWC(kwConfig)
57         if err != nil {
58                 http.Error(w, err.Error(), http.StatusInternalServerError)
59                 return
60         }
61
62         //Send response back
63         w.Header().Set("Content-Type", "application/json")
64         w.WriteHeader(http.StatusCreated)
65         kwResp := kwResponse{
66                 KWid: kwid,
67         }
68         err = json.NewEncoder(w).Encode(kwResp)
69         if err != nil {
70                 http.Error(w, err.Error(), http.StatusInternalServerError)
71                 return
72         }
73 }
74
75 // ListKWHandler - Lists the KafkaWriters and its config
76 func ListKWHandler(w http.ResponseWriter, r *http.Request) {
77         log.Infow("Received request for List Kafka Writers", "url", r.URL)
78         res := kw.ListKWC()
79         //Send response back
80         w.Header().Set("Content-Type", "application/json")
81         w.WriteHeader(http.StatusOK)
82         kwResp := kwResponse{
83                 KWCRespMap: res,
84         }
85         err := json.NewEncoder(w).Encode(kwResp)
86         if err != nil {
87                 http.Error(w, err.Error(), http.StatusInternalServerError)
88                 return
89         }
90 }
91
92 // DeleteKWHandler - Deletes a given Prometheus to Kafka writer
93 func DeleteKWHandler(w http.ResponseWriter, r *http.Request) {
94         params := mux.Vars(r)
95         log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"])
96         kw.DeleteKWC(params["kwid"])
97
98         //Send response back
99         w.Header().Set("Content-Type", "application/json")
100         w.WriteHeader(http.StatusOK)
101 }
102
103 // ReceiveKWHandler - Publish metrics from Prometheus to Kafka
104 func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) {
105         params := mux.Vars(r)
106         kwid := params["kwid"]
107         if _, ok := kw.KWMap[kwid]; !ok {
108                 notRegisteredErr := errors.New("kafka writer not registered").Error()
109                 log.Error(notRegisteredErr)
110                 http.Error(w, notRegisteredErr, http.StatusNotFound)
111                 return
112         }
113         log.Infow("Produce message on Kafka Writer", "kwid", kwid)
114
115         compressed, err := ioutil.ReadAll(r.Body)
116         defer r.Body.Close()
117         if err != nil {
118                 log.Error("error", err.Error())
119                 http.Error(w, err.Error(), http.StatusInternalServerError)
120                 return
121         }
122
123         metricBuf, err := snappy.Decode(nil, compressed)
124         if err != nil {
125                 log.Error("error", err.Error())
126                 http.Error(w, err.Error(), http.StatusBadRequest)
127                 return
128         }
129
130         var metrics prompb.WriteRequest
131         if err := proto.Unmarshal(metricBuf, &metrics); err != nil {
132                 log.Error("error", err.Error())
133                 http.Error(w, err.Error(), http.StatusBadRequest)
134                 return
135         }
136
137         err = kw.PublishTimeSeries(kwid, &metrics)
138         if err != nil {
139                 log.Error("error", err.Error())
140                 http.Error(w, err.Error(), http.StatusInternalServerError)
141                 return
142         }
143 }