Prometheus Kafka Writer Microservice
[demo.git] / vnfs / DAaaS / microservices / prom-kafka-writer / pkg / kafkawriter / kafkawriter.go
1 /*
2  *
3  * Copyright 2019 Intel Corporation.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  *
14  */
15
16 package kafkawriter
17
18 import (
19         "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
20         "strconv"
21         "sync"
22 )
23
24 //KWConfig - serialized type for config related to Kafka
25 type KWConfig struct {
26         //Broker - Kafka Bootstrap servers (comma separated)
27         Broker string `json:"bootstrap.servers"`
28         //Topic - kafka topic name
29         Topic string `json:"topic"`
30         //UsePartition - Enforce use of partitions
31         UsePartition bool   `json:"usePartition"`
32         BatchMsgNum  int    `json:"batch.num.messages,omitempty"`
33         Compression  string `json:"compression.codec,omitempty"`
34 }
35
36 //KWProducer - holds the Kafka Config and associated Kafka Producer
37 type KWProducer struct {
38         Config   KWConfig
39         Producer *kafka.Producer
40 }
41
42 //KWRespMap packs the KWConfig and kwid for List Api
43 type KWRespMap map[string]KWConfig
44
45 //KWMap - Stores the Kafka Writer to Kafka Producer Mapping
46 //                This is used to uniquely identify a Kafka Writer - Producer mapping.
47 var (
48         KWMap   = make(map[string]KWProducer)
49         kwMutex sync.Mutex
50         id      int
51 )
52
53 // NewKafkaWriter - creates a new producer using kafka config.
54 // Handles the remote write from prometheus and send to kafka topic
55 func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) {
56         producer, err := kafka.NewProducer(&kafka.ConfigMap{
57                 "bootstrap.servers":   kwc.Broker,
58                 "compression.codec":   kwc.Compression,
59                 "batch.num.messages":  kwc.BatchMsgNum,
60                 "go.batch.producer":   true,
61                 "go.delivery.reports": false,
62         })
63         if err != nil {
64                 return nil, err
65         }
66         return producer, nil
67 }
68
69 //NewKWConfig - creates a KWConfig object with default values
70 func NewKWConfig() *KWConfig {
71         return &KWConfig{
72                 UsePartition: false,
73                 BatchMsgNum:  10000,
74                 Compression:  "none",
75         }
76 }
77
78 //NewKWRespMap - packs the KWConfig and kwid for List Api
79 func newKWRespMap() KWRespMap {
80         kwr := make(KWRespMap)
81         return kwr
82 }
83
84 //AddKWC - Method to add KafkaWriterConfig request to KWMap
85 func AddKWC(kwc *KWConfig) (string, error) {
86         kwMutex.Lock()
87         defer kwMutex.Unlock()
88         //TODO: Generate kwid
89         kwid := "pkw" + strconv.Itoa(id)
90         id++
91         producer, err := NewKafkaWriter(kwc)
92         if err != nil {
93                 log.Error("Error", err)
94                 id--
95                 return "", err
96         }
97
98         KWMap[kwid] = KWProducer{
99                 Config:   *kwc,
100                 Producer: producer,
101         }
102         return kwid, nil
103 }
104
105 //DeleteKWC - Method to add KafkaWriter request to KWMap
106 func DeleteKWC(kwid string) {
107         kwMutex.Lock()
108         defer kwMutex.Unlock()
109         if _, ok := KWMap[kwid]; ok {
110                 KWMap[kwid].Producer.Close()
111         }
112         delete(KWMap, kwid)
113 }
114
115 //ListKWC - Method to add KafkaWriter request to KWMap
116 func ListKWC() KWRespMap {
117         kwr := newKWRespMap()
118         for k, v := range KWMap {
119                 kwr[k] = v.Config
120         }
121         return kwr
122 }
123
124 //Cleanup - Method to cleanup resources
125 func Cleanup() {
126         for k := range KWMap {
127                 DeleteKWC(k)
128         }
129 }