Prometheus Kafka Writer Microservice
[demo.git] / vnfs / DAaaS / microservices / prom-kafka-writer / pkg / kafkawriter / kafkawriter.go
diff --git a/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go b/vnfs/DAaaS/microservices/prom-kafka-writer/pkg/kafkawriter/kafkawriter.go
new file mode 100644 (file)
index 0000000..f56f66a
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright 2019 Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package kafkawriter
+
+import (
+       "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
+       "strconv"
+       "sync"
+)
+
+//KWConfig - serialized type for config related to Kafka
+type KWConfig struct {
+       //Broker - Kafka Bootstrap servers (comma separated)
+       Broker string `json:"bootstrap.servers"`
+       //Topic - kafka topic name
+       Topic string `json:"topic"`
+       //UsePartition - Enforce use of partitions
+       UsePartition bool   `json:"usePartition"`
+       BatchMsgNum  int    `json:"batch.num.messages,omitempty"`
+       Compression  string `json:"compression.codec,omitempty"`
+}
+
+//KWProducer - holds the Kafka Config and associated Kafka Producer
+type KWProducer struct {
+       Config   KWConfig
+       Producer *kafka.Producer
+}
+
+//KWRespMap packs the KWConfig and kwid for List Api
+type KWRespMap map[string]KWConfig
+
+//KWMap - Stores the Kafka Writer to Kafka Producer Mapping
+//               This is used to uniquely identify a Kafka Writer - Producer mapping.
+var (
+       KWMap   = make(map[string]KWProducer)
+       kwMutex sync.Mutex
+       id      int
+)
+
+// NewKafkaWriter - creates a new producer using kafka config.
+// Handles the remote write from prometheus and send to kafka topic
+func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) {
+       producer, err := kafka.NewProducer(&kafka.ConfigMap{
+               "bootstrap.servers":   kwc.Broker,
+               "compression.codec":   kwc.Compression,
+               "batch.num.messages":  kwc.BatchMsgNum,
+               "go.batch.producer":   true,
+               "go.delivery.reports": false,
+       })
+       if err != nil {
+               return nil, err
+       }
+       return producer, nil
+}
+
+//NewKWConfig - creates a KWConfig object with default values
+func NewKWConfig() *KWConfig {
+       return &KWConfig{
+               UsePartition: false,
+               BatchMsgNum:  10000,
+               Compression:  "none",
+       }
+}
+
+//NewKWRespMap - packs the KWConfig and kwid for List Api
+func newKWRespMap() KWRespMap {
+       kwr := make(KWRespMap)
+       return kwr
+}
+
+//AddKWC - Method to add KafkaWriterConfig request to KWMap
+func AddKWC(kwc *KWConfig) (string, error) {
+       kwMutex.Lock()
+       defer kwMutex.Unlock()
+       //TODO: Generate kwid
+       kwid := "pkw" + strconv.Itoa(id)
+       id++
+       producer, err := NewKafkaWriter(kwc)
+       if err != nil {
+               log.Error("Error", err)
+               id--
+               return "", err
+       }
+
+       KWMap[kwid] = KWProducer{
+               Config:   *kwc,
+               Producer: producer,
+       }
+       return kwid, nil
+}
+
+//DeleteKWC - Method to add KafkaWriter request to KWMap
+func DeleteKWC(kwid string) {
+       kwMutex.Lock()
+       defer kwMutex.Unlock()
+       if _, ok := KWMap[kwid]; ok {
+               KWMap[kwid].Producer.Close()
+       }
+       delete(KWMap, kwid)
+}
+
+//ListKWC - Method to add KafkaWriter request to KWMap
+func ListKWC() KWRespMap {
+       kwr := newKWRespMap()
+       for k, v := range KWMap {
+               kwr[k] = v.Config
+       }
+       return kwr
+}
+
+//Cleanup - Method to cleanup resources
+func Cleanup() {
+       for k := range KWMap {
+               DeleteKWC(k)
+       }
+}