3 * Copyright 2019 Intel Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
19 "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
24 //KWConfig - serialized type for config related to Kafka
25 type KWConfig struct {
26 //Broker - Kafka Bootstrap servers (comma separated)
27 Broker string `json:"bootstrap.servers"`
28 //Topic - kafka topic name
29 Topic string `json:"topic"`
30 //UsePartition - Enforce use of partitions
31 UsePartition bool `json:"usePartition"`
32 BatchMsgNum int `json:"batch.num.messages,omitempty"`
33 Compression string `json:"compression.codec,omitempty"`
36 //KWProducer - holds the Kafka Config and associated Kafka Producer
37 type KWProducer struct {
39 Producer *kafka.Producer
42 //KWRespMap packs the KWConfig and kwid for List Api
43 type KWRespMap map[string]KWConfig
45 //KWMap - Stores the Kafka Writer to Kafka Producer Mapping
46 // This is used to uniquely identify a Kafka Writer - Producer mapping.
48 KWMap = make(map[string]KWProducer)
53 // NewKafkaWriter - creates a new producer using kafka config.
54 // Handles the remote write from prometheus and send to kafka topic
55 func NewKafkaWriter(kwc *KWConfig) (*kafka.Producer, error) {
56 producer, err := kafka.NewProducer(&kafka.ConfigMap{
57 "bootstrap.servers": kwc.Broker,
58 "compression.codec": kwc.Compression,
59 "batch.num.messages": kwc.BatchMsgNum,
60 "go.batch.producer": true,
61 "go.delivery.reports": false,
69 //NewKWConfig - creates a KWConfig object with default values
70 func NewKWConfig() *KWConfig {
78 //NewKWRespMap - packs the KWConfig and kwid for List Api
79 func newKWRespMap() KWRespMap {
80 kwr := make(KWRespMap)
84 //AddKWC - Method to add KafkaWriterConfig request to KWMap
85 func AddKWC(kwc *KWConfig) (string, error) {
87 defer kwMutex.Unlock()
89 kwid := "pkw" + strconv.Itoa(id)
91 producer, err := NewKafkaWriter(kwc)
93 log.Error("Error", err)
98 KWMap[kwid] = KWProducer{
105 //DeleteKWC - Method to add KafkaWriter request to KWMap
106 func DeleteKWC(kwid string) {
108 defer kwMutex.Unlock()
109 if _, ok := KWMap[kwid]; ok {
110 KWMap[kwid].Producer.Close()
115 //ListKWC - Method to add KafkaWriter request to KWMap
116 func ListKWC() KWRespMap {
117 kwr := newKWRespMap()
118 for k, v := range KWMap {
124 //Cleanup - Method to cleanup resources
126 for k := range KWMap {