3 * Copyright 2019 Intel Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
25 logger "prom-kafka-writer/pkg/config"
26 kw "prom-kafka-writer/pkg/kafkawriter"
28 "github.com/golang/protobuf/proto"
29 "github.com/golang/snappy"
30 "github.com/gorilla/mux"
31 "github.com/prometheus/prometheus/prompb"
// kwResponse is the JSON payload shape declared for this package's handlers.
// Both fields are tagged omitempty, so a zero-valued field is left out of the
// encoded output.
// NOTE(review): the closing brace of this struct is not present in this
// listing; confirm no further fields exist in the full file.
34 type kwResponse struct {
// KWid is serialized under the JSON key "kwid".
35 KWid string `json:"kwid,omitempty"`
// KWCRespMap is serialized under the JSON key "kafkaWriterConfigs"; its type
// comes from the kafkawriter package (imported as kw).
36 KWCRespMap kw.KWRespMap `json:"kafkaWriterConfigs,omitempty"`
// log is the package-level logger obtained once from
// logger.GetLoggerInstance(); the handlers in this file call its Infow and
// Error methods.
39 var log = logger.GetLoggerInstance()
41 // CreateKWHandler - Creates and starts a Prometheus to Kafka writer
//
// The request body is JSON-decoded into a value returned by kw.NewKWConfig(),
// with unknown fields rejected. The decoded config is handed to kw.AddKWC,
// which returns a kwid and an error. On the success path the handler sets
// Content-Type to application/json, writes status 201 Created and JSON-encodes
// kwResp to the response.
//
// Error responses visible in this listing:
//   - 400 Bad Request with the text "Body empty"
//   - 422 Unprocessable Entity with an error's text (presumably the decode error)
//   - 500 Internal Server Error with the error text from kw.AddKWC
//   - 500 Internal Server Error with the error text from encoding the response
//
// NOTE(review): the conditions guarding each error branch, any early returns,
// and the construction of kwResp are on lines missing from this listing —
// confirm against the full file.
42 func CreateKWHandler(w http.ResponseWriter, r *http.Request) {
43 log.Infow("Received request for Creating Kafka Writer")
44 kwConfig := kw.NewKWConfig()
45 dec := json.NewDecoder(r.Body)
// Make Decode fail when the body carries keys the destination does not declare.
46 dec.DisallowUnknownFields()
47 err := dec.Decode(kwConfig)
50 http.Error(w, "Body empty", http.StatusBadRequest)
53 http.Error(w, err.Error(), http.StatusUnprocessableEntity)
56 kwid, err := kw.AddKWC(kwConfig)
58 http.Error(w, err.Error(), http.StatusInternalServerError)
63 w.Header().Set("Content-Type", "application/json")
64 w.WriteHeader(http.StatusCreated)
68 err = json.NewEncoder(w).Encode(kwResp)
// NOTE(review): the 201 status has already been written above, so this
// http.Error call cannot change the status code to 500; it only appends the
// error text to the body.
70 http.Error(w, err.Error(), http.StatusInternalServerError)
75 // ListKWHandler - Lists the KafkaWriters and its config
//
// Logs the request URL, sets Content-Type to application/json, writes status
// 200 OK and JSON-encodes kwResp to the response. The visible error branch
// reports an encoding failure through http.Error with status 500.
//
// NOTE(review): the statements that build kwResp, and the condition guarding
// the error branch, are on lines missing from this listing — confirm against
// the full file.
76 func ListKWHandler(w http.ResponseWriter, r *http.Request) {
77 log.Infow("Received request for List Kafka Writers", "url", r.URL)
80 w.Header().Set("Content-Type", "application/json")
81 w.WriteHeader(http.StatusOK)
85 err := json.NewEncoder(w).Encode(kwResp)
// NOTE(review): the 200 status has already been written above, so this
// http.Error call cannot change the status code to 500; it only appends the
// error text to the body.
87 http.Error(w, err.Error(), http.StatusInternalServerError)
92 // DeleteKWHandler - Deletes a given Prometheus to Kafka writer
//
// Logs the "kwid" entry of params, passes it to kw.DeleteKWC, then sets
// Content-Type to application/json and writes status 200 OK. In the visible
// lines no value returned by kw.DeleteKWC is inspected.
//
// NOTE(review): the definition of params (presumably mux.Vars(r), as in
// ReceiveKWHandler) and the lines following the DeleteKWC call are missing
// from this listing — confirm against the full file.
93 func DeleteKWHandler(w http.ResponseWriter, r *http.Request) {
95 log.Infow("Received request for Deleting Kafka Writer", "KWID", params["kwid"])
96 kw.DeleteKWC(params["kwid"])
99 w.Header().Set("Content-Type", "application/json")
100 w.WriteHeader(http.StatusOK)
103 // ReceiveKWHandler - Publish metrics from Prometheus to Kafka
//
// Processing steps visible in this listing:
//  1. Read the "kwid" route variable via mux.Vars and look it up in kw.KWMap;
//     when absent, log and answer 404 with "kafka writer not registered".
//  2. Read the whole request body.
//  3. Snappy-decode the body.
//  4. Unmarshal the decoded bytes into a prompb.WriteRequest.
//  5. Pass the kwid and the request to kw.PublishTimeSeries.
//
// Failures in steps 2 and 5 are answered with 500 Internal Server Error, and
// failures in steps 3 and 4 with 400 Bad Request, each carrying the error text.
//
// NOTE(review): the `err != nil` guards, the returns after each error branch
// and any cleanup of r.Body are on lines missing from this listing — confirm
// against the full file.
104 func ReceiveKWHandler(w http.ResponseWriter, r *http.Request) {
105 params := mux.Vars(r)
106 kwid := params["kwid"]
// NOTE(review): kw.KWMap is read directly here; whether access to it is
// synchronized is not visible from this file.
107 if _, ok := kw.KWMap[kwid]; !ok {
// NOTE(review): errors.New(...).Error() just yields the literal message string.
108 notRegisteredErr := errors.New("kafka writer not registered").Error()
109 log.Error(notRegisteredErr)
110 http.Error(w, notRegisteredErr, http.StatusNotFound)
113 log.Infow("Produce message on Kafka Writer", "kwid", kwid)
// NOTE(review): the entire body is buffered in memory; no size limit is
// visible in this listing.
115 compressed, err := ioutil.ReadAll(r.Body)
// NOTE(review): here and below, "error" is passed to log.Error as a plain
// positional argument, unlike the key/value pairs given to Infow — confirm the
// logger renders this as intended.
118 log.Error("error", err.Error())
119 http.Error(w, err.Error(), http.StatusInternalServerError)
// Passing nil as the destination lets snappy allocate the output buffer.
123 metricBuf, err := snappy.Decode(nil, compressed)
125 log.Error("error", err.Error())
126 http.Error(w, err.Error(), http.StatusBadRequest)
130 var metrics prompb.WriteRequest
131 if err := proto.Unmarshal(metricBuf, &metrics); err != nil {
132 log.Error("error", err.Error())
133 http.Error(w, err.Error(), http.StatusBadRequest)
137 err = kw.PublishTimeSeries(kwid, &metrics)
139 log.Error("error", err.Error())
140 http.Error(w, err.Error(), http.StatusInternalServerError)