Prometheus Kafka Writer Microservice
[demo.git] / vnfs / DAaaS / microservices / prom-kafka-writer / pkg / api / handler_test.go
1 /*
2  *
3  * Copyright 2019 Intel Corporation.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  *
14  */
15
16 package api
17
18 import (
19         "bytes"
20         "encoding/json"
21         "errors"
22         "fmt"
23         "github.com/golang/protobuf/proto"
24         "github.com/golang/snappy"
25         "github.com/prometheus/prometheus/prompb"
26         "github.com/stretchr/testify/assert"
27         "io"
28         "net/http"
29         "net/http/httptest"
30         "prom-kafka-writer/pkg/kafkawriter"
31         "testing"
32 )
33
// errReader is an io.Reader stub whose Read always fails. The tests in this
// file pass it as a request body to drive body-read error handling.
type errReader int

// Read ignores p and always reports zero bytes read together with a
// "test error" error.
func (errReader) Read(p []byte) (n int, err error) {
	err = errors.New("test error")
	return 0, err
}
39
40 func TestCreateKWHandler(t *testing.T) {
41         tests := []struct {
42                 name         string
43                 body         io.Reader
44                 expectStatus int
45                 expectResp   *kwResponse
46         }{
47                 {
48                         name: "Test Create Kafka Writer",
49                         body: bytes.NewBuffer([]byte(`{
50                                   "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
51                                   "topic": "adatopic1",
52                                   "usePartition": false,
53                                   "compression.codec": "snappy"
54                                   }`)),
55                         expectStatus: http.StatusCreated,
56                         expectResp: &kwResponse{
57                                 KWid: "pkw0",
58                         },
59                 },
60                 {
61                         name: "Test Create Kafka Writer Wrong parameters",
62                         body: bytes.NewBuffer([]byte(`{
63                                   "servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
64                                   "kafkatopic": "adatopic1",
65                                   "usePartition": false,
66                                   "compression.codec": "snappy"
67                                   }`)),
68                         expectStatus: http.StatusUnprocessableEntity,
69                         expectResp:   &kwResponse{},
70                 },
71                 {
72                         name:         "Test Create Kafka Writer Empty Body",
73                         body:         bytes.NewBuffer([]byte(nil)),
74                         expectStatus: http.StatusBadRequest,
75                         expectResp:   &kwResponse{},
76                 },
77         }
78
79         for _, tt := range tests {
80                 t.Run(tt.name, func(t *testing.T) {
81                         req := httptest.NewRequest("POST", "/pkw", tt.body)
82                         rec := httptest.NewRecorder()
83                         r := NewRouter()
84                         r.ServeHTTP(rec, req)
85                         resp := rec.Result()
86                         assert.Equal(t, tt.expectStatus, resp.StatusCode)
87                         kwResp := &kwResponse{}
88                         json.NewDecoder(resp.Body).Decode(&kwResp)
89                         assert.Equal(t, tt.expectResp, kwResp)
90                 })
91         }
92 }
93
94 func TestListKWHandler(t *testing.T) {
95
96         tests := []struct {
97                 name         string
98                 body         string
99                 expectStatus int
100                 expectResp   *kwResponse
101         }{
102                 {
103                         name: "Test List Kafka Writers",
104                         body: `{
105                                 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
106                                 "topic": "adatopic1",
107                                 "usePartition": false,
108                                 "batch.num.messages": 10000,
109                                 "compression.codec": "snappy"
110                         }`,
111                         expectStatus: http.StatusOK,
112                         expectResp: &kwResponse{
113                                 KWCRespMap: map[string]kafkawriter.KWConfig{
114                                         "pkw0": {
115                                                 Broker:       "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
116                                                 Topic:        "adatopic1",
117                                                 UsePartition: false,
118                                                 BatchMsgNum:  10000,
119                                                 Compression:  "snappy",
120                                         },
121                                 },
122                         },
123                 },
124         }
125         for _, tt := range tests {
126                 t.Run(tt.name, func(t *testing.T) {
127                         preCreateKW("pkw0", tt.body)
128                         req := httptest.NewRequest("GET", "/pkw", nil)
129                         rec := httptest.NewRecorder()
130                         r := NewRouter()
131                         r.ServeHTTP(rec, req)
132                         resp := rec.Result()
133                         assert.Equal(t, tt.expectStatus, resp.StatusCode)
134                         kwResp := &kwResponse{}
135                         json.NewDecoder(resp.Body).Decode(&kwResp)
136                         assert.Equal(t, tt.expectResp, kwResp)
137                 })
138         }
139 }
140
141 func TestDeleteKWHandler(t *testing.T) {
142         tests := []struct {
143                 name         string
144                 kwid         string
145                 expectStatus int
146         }{
147                 {
148                         name:         "Test Delete Kafka Writer",
149                         kwid:         "pkw777",
150                         expectStatus: http.StatusOK,
151                 },
152         }
153         body := `{
154                                 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
155                                 "topic": "adatopic1",
156                                 "usePartition": false,
157                                 "batch.num.messages": 10000,
158                                 "compression.codec": "snappy"
159                         }`
160         for _, tt := range tests {
161                 t.Run(tt.name, func(t *testing.T) {
162                         preCreateKW(tt.kwid, body)
163                         target := fmt.Sprintf("/pkw/%s", tt.kwid)
164                         req := httptest.NewRequest("DELETE", target, nil)
165                         r := NewRouter()
166                         rec := httptest.NewRecorder()
167                         r.ServeHTTP(rec, req)
168                         resp := rec.Result()
169                         assert.Equal(t, tt.expectStatus, resp.StatusCode)
170                 })
171         }
172 }
173
174 func preCreateKW(kwid string, body string) {
175         kafkawriter.Cleanup()
176         k := []byte(body)
177         kwc := &kafkawriter.KWConfig{}
178         _ = json.Unmarshal(k, kwc)
179         producer, _ := kafkawriter.NewKafkaWriter(kwc)
180         kafkawriter.KWMap[kwid] = kafkawriter.KWProducer{Config: *kwc, Producer: producer}
181 }
182
183 func TestReceiveKWHandler(t *testing.T) {
184         f, err := buildRemoteWriteRequest()
185         if err != nil {
186                 t.Fatal("Could not build prompb.WriteRequest")
187         }
188         tests := []struct {
189                 name         string
190                 kwid         string
191                 body         io.Reader
192                 preCreate    bool
193                 expectStatus int
194         }{
195                 {
196                         name:         "Test Receive Messages Empty Message",
197                         kwid:         "pkw111",
198                         preCreate:    true,
199                         expectStatus: http.StatusBadRequest,
200                 },
201                 {
202                         name:         "Test Receive Messages",
203                         kwid:         "pkw111",
204                         preCreate:    true,
205                         body:         bytes.NewReader(f),
206                         expectStatus: http.StatusOK,
207                 },
208                 {
209                         name:         "Test Receive Messages Kafka Writer Not registed",
210                         kwid:         "pkw222",
211                         preCreate:    false,
212                         expectStatus: http.StatusNotFound,
213                 },
214                 {
215                         name:         "Test Receive Messages Kafka Writer Not registed",
216                         kwid:         "pkw111",
217                         preCreate:    true,
218                         body:         errReader(0),
219                         expectStatus: http.StatusInternalServerError,
220                 },
221         }
222         body := `{
223                                 "bootstrap.servers": "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
224                                 "topic": "adatopic1",
225                                 "usePartition": false,
226                                 "batch.num.messages": 10000,
227                                 "compression.codec": "snappy"
228                         }`
229         for _, tt := range tests {
230                 t.Run(tt.name, func(t *testing.T) {
231                         if tt.preCreate {
232                                 preCreateKW(tt.kwid, body)
233                         }
234                         target := fmt.Sprintf("/pkw/%s/receive", tt.kwid)
235                         req := httptest.NewRequest("POST", target, tt.body)
236                         r := NewRouter()
237                         rec := httptest.NewRecorder()
238                         r.ServeHTTP(rec, req)
239                         resp := rec.Result()
240                         assert.Equal(t, tt.expectStatus, resp.StatusCode)
241                 })
242         }
243 }
244
245 func buildRemoteWriteRequest() ([]byte, error) {
246         var buf []byte
247         samples := []*prompb.TimeSeries{
248                 &prompb.TimeSeries{
249                         Labels: []*prompb.Label{
250                                 &prompb.Label{Name: "__name__", Value: "go_gc_duration_seconds_count"},
251                                 &prompb.Label{Name: "endpoint", Value: "http"},
252                                 &prompb.Label{Name: "instance", Value: "10.42.1.101:8686"},
253                                 &prompb.Label{Name: "job", Value: "prom-kafka-writer"},
254                                 &prompb.Label{Name: "metrics_storage", Value: "kafka_remote"},
255                                 &prompb.Label{Name: "namespace", Value: "edge1"},
256                                 &prompb.Label{Name: "pod", Value: "prom-kafka-writer-696898f47f-bc5fs"},
257                                 &prompb.Label{Name: "prometheus", Value: "edge1/cp-prometheus-prometheus"},
258                                 &prompb.Label{Name: "prometheus_replica", Value: "prometheus-cp-prometheus-prometheus-0"},
259                                 &prompb.Label{Name: "service", Value: "prom-kafka-writer"},
260                         },
261                         Samples: []prompb.Sample{
262                                 prompb.Sample{
263                                         Value:     17,
264                                         Timestamp: 1572479934007,
265                                 },
266                                 prompb.Sample{
267                                         Value:     19,
268                                         Timestamp: 1572480144007,
269                                 },
270                         },
271                 },
272         }
273         req := &prompb.WriteRequest{
274                 Timeseries: samples,
275         }
276
277         data, err := proto.Marshal(req)
278         if err != nil {
279                 return nil, err
280         }
281
282         // snappy uses len() to see if it needs to allocate a new slice. Make the
283         // buffer as long as possible.
284         if buf != nil {
285                 buf = buf[0:cap(buf)]
286         }
287         compressed := snappy.Encode(buf, data)
288         return compressed, nil
289 }