3 * Copyright 2019 Intel Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
20 "github.com/prometheus/common/model"
21 "github.com/prometheus/prometheus/prompb"
22 "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
23 logger "prom-kafka-writer/pkg/config"
// log is the package-level logger, obtained once from the config package
// (imported under the alias "logger") via GetLoggerInstance.
26 var log = logger.GetLoggerInstance()
// PublishTimeSeries walks every time series in a Prometheus remote-write
// request and, for each sample, marshals a JSON record and hands it to publish
// for the writer identified by kwid. The value of the "__name__" label is used
// as the message key.
//
// NOTE(review): this excerpt has gaps (the embedded line numbers skip), so the
// error guards, the return statements and possibly some payload fields are not
// visible here. Confirm against the full file before relying on this summary.
28 func PublishTimeSeries(kwid string, metrics *prompb.WriteRequest) error {
29 log.Debugw("Remote write Time Series", "length", len(metrics.Timeseries), "TimeSeries", metrics.Timeseries)
30 for _, ts := range metrics.Timeseries {
// Collect this series' labels into a model.Metric keyed by label name.
31 m := make(model.Metric, len(ts.Labels))
32 for _, l := range ts.Labels {
33 m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
35 log.Debugw("Labels", "Labelset", m)
// One message is built per sample of the series.
37 for _, s := range ts.Samples {
38 log.Debugf(" %f %d\n", s.Value, s.Timestamp)
// JSON payload for this sample. Only the "name" and "timestamp" entries are
// visible in this excerpt; other entries may exist on the elided lines.
39 metric := map[string]interface{}{
40 "name": m["__name__"],
42 "timestamp": s.Timestamp,
// The metric name doubles as the key passed to publish.
45 key := string(m["__name__"])
46 jsonMetric, err := json.Marshal(metric)
// NOTE(review): the condition guarding this log call, and whether the loop
// returns or continues afterwards, are not visible in this excerpt.
48 log.Errorw("Marshal error", "error", err.Error())
51 err = publish(kwid, key, jsonMetric)
// NOTE(review): presumably logged only when publish returned an error; the
// guard and the follow-up action are on lines not shown here.
53 log.Error("Failed to produce message")
// publish sends one already-serialized metric through the producer stored in
// KWMap under kwid. The target topic/partition comes from getTopicPartition
// applied to that entry's Config; key and jsonMetric become the message key
// and value.
//
// NOTE(review): no check that kwid is present in KWMap is visible here; verify
// how a missing entry behaves (KWMap's type is declared elsewhere).
// NOTE(review): the error guard and return statements are on lines not shown
// in this excerpt.
61 func publish(kwid string, key string, jsonMetric []byte) error {
// Producer and configuration registered for this writer id.
63 kwp = KWMap[kwid].Producer
64 kwc = KWMap[kwid].Config
67 tp := getTopicPartition(kwc)
68 kwMsg := kafka.Message{TopicPartition: tp, Key: []byte(key), Value: jsonMetric}
// Produce is called with a nil second argument (no per-call delivery channel).
// NOTE(review): per confluent-kafka-go, delivery reports then go to the
// producer's Events() channel — confirm something consumes it.
69 err := kwp.Produce(&kwMsg, nil)
71 log.Errorw("Kafka Producer Error", "error", err.Error())
// getTopicPartition builds the kafka.TopicPartition for a writer config: the
// topic is kwc.Topic and the partition is p. Both visible assignments set p to
// kafka.PartitionAny, so the visible code never selects a specific partition.
//
// kwc is received by value, so the Topic pointer in the result refers to the
// field of this function's own copy of the config, not the caller's.
//
// NOTE(review): a line between the two assignments is missing from this
// excerpt (likely a condition around the TODO); confirm in the full file.
76 func getTopicPartition(kwc KWConfig) kafka.TopicPartition {
77 p := kafka.PartitionAny
79 // TODO: Implement partition strategy
80 p = kafka.PartitionAny
82 return kafka.TopicPartition{Topic: &kwc.Topic, Partition: p}