Prometheus Kafka Writer Microservice
[demo.git] / vnfs / DAaaS / microservices / prom-kafka-writer / pkg / kafkawriter / producer.go
1 /*
2  *
3  * Copyright 2019 Intel Corporation.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  *
14  */
15
16 package kafkawriter
17
import (
	"encoding/json"
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/prompb"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	logger "prom-kafka-writer/pkg/config"
)
25
// log is the package-level logger shared by the functions in this file. It is
// obtained once, at package initialisation, from GetLoggerInstance in the
// config package (imported here as logger).
var log = logger.GetLoggerInstance()
27
28 func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error {
29         log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries)
30         for _, ts := range metrics.Timeseries {
31                 m := make(model.Metric, len(ts.Labels))
32                 for _, l := range ts.Labels {
33                         m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
34                 }
35                 log.Debugw("Labels", "Labelset", m)
36
37                 for _, s := range ts.Samples {
38                         log.Debugf("  %f %d\n", s.Value, s.Timestamp)
39                         metric := map[string]interface{}{
40                                 "name":      m["__name__"],
41                                 "labels":    m,
42                                 "timestamp": s.Timestamp,
43                                 "value":     s.Value,
44                         }
45                         key := string(m["__name__"])
46                         jsonMetric, err := json.Marshal(metric)
47                         if err != nil {
48                                 log.Errorw("Marshal error", "error", err.Error())
49                                 continue
50                         }
51                         err = publish(kwid, key, jsonMetric)
52                         if err != nil {
53                                 log.Error("Failed to produce message")
54                                 return err
55                         }
56                 }
57         }
58         return nil
59 }
60
61 func publish(kwid string, key string, jsonMetric []byte) error {
62         var (
63                 kwp = KWMap[kwid].Producer
64                 kwc = KWMap[kwid].Config
65         )
66
67         tp := getTopicPartition(kwc)
68         kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric}
69         err := kwp.Produce(&kwMsg, nil)
70         if err != nil {
71                 log.Errorw("Kafka Producer Error", "error", err.Error())
72         }
73         return err
74 }
75
76 func getTopicPartition(kwc KWConfig) kafka.TopicPartition {
77         p := kafka.PartitionAny
78         if kwc.UsePartition {
79                 // TODO: Implement partition strategy
80                 p = kafka.PartitionAny
81         }
82         return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p}
83 }