3 * Copyright 2019 Intel Corporation.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
23 "github.com/golang/protobuf/proto"
24 "github.com/golang/snappy"
25 "github.com/prometheus/prometheus/prompb"
26 "github.com/stretchr/testify/assert"
30 "prom-kafka-writer/pkg/kafkawriter"
36 func (errReader) Read(p []byte) (n int, err error) {
37 return 0, errors.New("test error")
40 func TestCreateKWHandler(t *testing.T) {
45 expectResp *kwResponse
48 name: "Test Create Kafka Writer",
49 body: bytes.NewBuffer([]byte(`{
50 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
52 "usePartition": false,
53 "compression.codec": "snappy"
55 expectStatus: http.StatusCreated,
56 expectResp: &kwResponse{
61 name: "Test Create Kafka Writer Wrong parameters",
62 body: bytes.NewBuffer([]byte(`{
63 "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
64 "kafkatopic": "adatopic1",
65 "usePartition": false,
66 "compression.codec": "snappy"
68 expectStatus: http.StatusUnprocessableEntity,
69 expectResp: &kwResponse{},
72 name: "Test Create Kafka Writer Empty Body",
73 body: bytes.NewBuffer([]byte(nil)),
74 expectStatus: http.StatusBadRequest,
75 expectResp: &kwResponse{},
79 for _, tt := range tests {
80 t.Run(tt.name, func(t *testing.T) {
81 req := httptest.NewRequest("POST", "/pkw", tt.body)
82 rec := httptest.NewRecorder()
86 assert.Equal(t, tt.expectStatus, resp.StatusCode)
87 kwResp := &kwResponse{}
88 json.NewDecoder(resp.Body).Decode(&kwResp)
89 assert.Equal(t, tt.expectResp, kwResp)
94 func TestListKWHandler(t *testing.T) {
100 expectResp *kwResponse
103 name: "Test List Kafka Writers",
105 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
106 "topic": "adatopic1",
107 "usePartition": false,
108 "batch.num.messages": 10000,
109 "compression.codec": "snappy"
111 expectStatus: http.StatusOK,
112 expectResp: &kwResponse{
113 KWCRespMap: map[string]kafkawriter.KWConfig{
115 Broker: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
119 Compression: "snappy",
125 for _, tt := range tests {
126 t.Run(tt.name, func(t *testing.T) {
127 preCreateKW("pkw0", tt.body)
128 req := httptest.NewRequest("GET", "/pkw", nil)
129 rec := httptest.NewRecorder()
131 r.ServeHTTP(rec, req)
133 assert.Equal(t, tt.expectStatus, resp.StatusCode)
134 kwResp := &kwResponse{}
135 json.NewDecoder(resp.Body).Decode(&kwResp)
136 assert.Equal(t, tt.expectResp, kwResp)
141 func TestDeleteKWHandler(t *testing.T) {
148 name: "Test Delete Kafka Writer",
150 expectStatus: http.StatusOK,
154 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
155 "topic": "adatopic1",
156 "usePartition": false,
157 "batch.num.messages": 10000,
158 "compression.codec": "snappy"
160 for _, tt := range tests {
161 t.Run(tt.name, func(t *testing.T) {
162 preCreateKW(tt.kwid, body)
163 target := fmt.Sprintf("/pkw/%s", tt.kwid)
164 req := httptest.NewRequest("DELETE", target, nil)
166 rec := httptest.NewRecorder()
167 r.ServeHTTP(rec, req)
169 assert.Equal(t, tt.expectStatus, resp.StatusCode)
174 func preCreateKW(kwid string, body string) {
175 kafkawriter.Cleanup()
177 kwc := &kafkawriter.KWConfig{}
178 _ = json.Unmarshal(k, kwc)
179 producer, _ := kafkawriter.NewKafkaWriter(kwc)
180 kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer}
183 func TestReceiveKWHandler(t *testing.T) {
184 f, err := buildRemoteWriteRequest()
186 t.Fatal("Could not build prompb.WriteRequest")
196 name: "Test Receive Messages Empty Message",
199 expectStatus: http.StatusBadRequest,
202 name: "Test Receive Messages",
205 body: bytes.NewReader(f),
206 expectStatus: http.StatusOK,
209 name: "Test Receive Messages Kafka Writer Not registed",
212 expectStatus: http.StatusNotFound,
215 name: "Test Receive Messages Kafka Writer Not registed",
219 expectStatus: http.StatusInternalServerError,
223 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
224 "topic": "adatopic1",
225 "usePartition": false,
226 "batch.num.messages": 10000,
227 "compression.codec": "snappy"
229 for _, tt := range tests {
230 t.Run(tt.name, func(t *testing.T) {
232 preCreateKW(tt.kwid, body)
234 target := fmt.Sprintf("/pkw/%s/receive", tt.kwid)
235 req := httptest.NewRequest("POST", target, tt.body)
237 rec := httptest.NewRecorder()
238 r.ServeHTTP(rec, req)
240 assert.Equal(t, tt.expectStatus, resp.StatusCode)
245 func buildRemoteWriteRequest() ([]byte, error) {
247 samples := []*prompb.TimeSeries{
249 Labels: []*prompb.Label{
250 &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"},
251 &prompb.Label{Name: "endpoint", Value: "http"},
252 &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"},
253 &prompb.Label{Name: "job", Value: "prom-kafka-writer"},
254 &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"},
255 &prompb.Label{Name: "namespace", Value: "edge1"},
256 &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"},
257 &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"},
258 &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"},
259 &prompb.Label{Name: "service", Value: "prom-kafka-writer"},
261 Samples: []prompb.Sample{
264 Timestamp: 1572479934007,
268 Timestamp: 1572480144007,
273 req := &prompb.WriteRequest{
277 data, err := proto.Marshal(req)
282 // snappy uses len() to see if it needs to allocate a new slice. Make the
283 // buffer as long as possible.
285 buf = buf[0:cap(buf)]
287 compressed := snappy.Encode(buf, data)
288 return compressed, nil