225d35391b5b1918fddb370627792de9451a8a2b
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
24 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
25
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonPrimitive;
28 import io.vavr.collection.List;
29 import java.time.Duration;
30 import org.junit.jupiter.api.BeforeAll;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
33 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42 import reactor.netty.http.server.HttpServerRoutes;
43 import reactor.test.StepVerifier;
44
45 /**
46  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
47  * @since May 2019
48  */
49 class MessageRouterSubscriberIT {
50     private static final Duration TIMEOUT = Duration.ofSeconds(10);
51     private static final String ERROR_MESSAGE = "Something went wrong";
52     private static final String CONSUMER_GROUP = "group1";
53     private static final String SUCCESS_CONSUMER_ID = "consumer200";
54     private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
55     private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
56     private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
57     private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
58     private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
59
60     private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
61
62     private static final String SUCCESS_RESP_PATH = String
63             .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
64     private static final String FAILING_WITH_401_RESP_PATH = String
65             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
66     private static final String FAILING_WITH_403_RESP_PATH = String
67             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
68     private static final String FAILING_WITH_409_RESP_PATH = String
69             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
70     private static final String FAILING_WITH_429_RESP_PATH = String
71             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
72     private static final String FAILING_WITH_500_RESP_PATH = String
73             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
74
75     private static MessageRouterSubscribeRequest mrSuccessRequest;
76     private static MessageRouterSubscribeRequest mrFailingRequest;
77     private MessageRouterSubscriber sut = DmaapClientFactory
78             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
79     private static MessageRouterSource sourceDefinition;
80
81
82     @BeforeAll
83     static void setUp() {
84         DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
85
86         sourceDefinition = createMessageRouterSource(server);
87
88         mrSuccessRequest = createSuccessRequest();
89
90         mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
91     }
92
93     @Test
94     void subscriber_shouldGetCorrectResponse(){
95         Mono<MessageRouterSubscribeResponse> response = sut
96                 .get(mrSuccessRequest);
97
98         List<String> expectedItems = List.of("I", "like", "pizza");
99
100         MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
101                 .builder()
102                 .items(expectedItems.map(JsonPrimitive::new))
103                 .build();
104
105         StepVerifier.create(response)
106                 .expectNext(expectedResponse)
107                 .expectComplete()
108                 .verify(TIMEOUT);
109     }
110
111     @Test
112     void subscriber_shouldGetUnauthorizedErrorResponse(){
113         MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
114         Mono<MessageRouterSubscribeResponse> response = sut.get(request);
115
116         MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
117                 .format("401 Unauthorized\n%s", ERROR_MESSAGE));
118
119         StepVerifier.create(response)
120                 .expectNext(expectedResponse)
121                 .expectComplete()
122                 .verify(TIMEOUT);
123     }
124
125     @Test
126     void subscriber_shouldGetForbiddenErrorResponse(){
127         MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
128         Mono<MessageRouterSubscribeResponse> response = sut.get(request);
129
130         MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
131                 .format("403 Forbidden\n%s", ERROR_MESSAGE));
132
133         StepVerifier.create(response)
134                 .expectNext(expectedResponse)
135                 .expectComplete()
136                 .verify(TIMEOUT);
137     }
138
139     @Test
140     void subscriber_shouldGetConflictErrorResponse(){
141         MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
142         Mono<MessageRouterSubscribeResponse> response = sut.get(request);
143
144         MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
145                 .format("409 Conflict\n%s", ERROR_MESSAGE));
146
147         StepVerifier.create(response)
148                 .expectNext(expectedResponse)
149                 .expectComplete()
150                 .verify(TIMEOUT);
151     }
152
153     @Test
154     void subscriber_shouldGetTooManyRequestsErrorResponse(){
155         MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
156         Mono<MessageRouterSubscribeResponse> response = sut.get(request);
157
158         MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
159                 .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
160
161         StepVerifier.create(response)
162                 .expectNext(expectedResponse)
163                 .expectComplete()
164                 .verify(TIMEOUT);
165     }
166
167     @Test
168     void subscriber_shouldGetInternalServerErrorResponse(){
169         Mono<MessageRouterSubscribeResponse> response = sut
170                 .get(mrFailingRequest);
171
172         MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
173                 .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
174
175         StepVerifier.create(response)
176                 .expectNext(expectedResponse)
177                 .expectComplete()
178                 .verify(TIMEOUT);
179     }
180
181     @Test
182     void subscriber_shouldParseCorrectResponse() {
183         final Flux<String> result = sut
184                 .getElements(mrSuccessRequest)
185                 .map(JsonElement::getAsString);
186
187         StepVerifier.create(result)
188                 .expectNext("I", "like", "pizza")
189                 .expectComplete()
190                 .verify(TIMEOUT);
191     }
192
193     @Test
194     void subscriber_shouldParseErrorResponse(){
195         Flux<String> result = sut
196                 .getElements(mrFailingRequest)
197                 .map(JsonElement::getAsString);
198
199         StepVerifier.create(result)
200                 .expectError(IllegalStateException.class)
201                 .verify(TIMEOUT);
202     }
203
204     @Test
205     void subscriber_shouldSubscribeCorrectly(){
206         Flux<String> subscriptionForElements = sut
207                 .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
208                 .map(JsonElement:: getAsString);
209
210         StepVerifier.create(subscriptionForElements.take(2))
211                 .expectNext("I", "like")
212                 .expectComplete()
213                 .verify(TIMEOUT);
214     }
215
216     @Test
217     void subscriber_shouldParseErrorWhenSubscribed(){
218         Flux<String> subscriptionForElements = sut
219                 .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
220                 .map(JsonElement:: getAsString);
221
222         StepVerifier.create(subscriptionForElements.take(2))
223                 .expectError(IllegalStateException.class)
224                 .verify(TIMEOUT);
225     }
226
227     private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
228         return routes
229                 .get(SUCCESS_RESP_PATH, (req, resp) ->
230                         sendResource(resp, "/sample-mr-subscribe-response.json"))
231                 .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
232                         sendError(resp, 401, ERROR_MESSAGE))
233                 .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
234                         sendError(resp, 403, ERROR_MESSAGE))
235                 .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
236                         sendError(resp, 409, ERROR_MESSAGE))
237                 .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
238                         sendError(resp, 429, ERROR_MESSAGE))
239                 .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
240                         sendError(resp, 500, ERROR_MESSAGE));
241     }
242
243     private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
244        return ImmutableMessageRouterSource.builder()
245                 .name("the topic")
246                 .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
247                 .build();
248     }
249
250     private static MessageRouterSubscribeRequest createSuccessRequest(){
251         return ImmutableMessageRouterSubscribeRequest.builder()
252                 .sourceDefinition(sourceDefinition)
253                 .consumerGroup(CONSUMER_GROUP)
254                 .consumerId(SUCCESS_CONSUMER_ID)
255                 .build();
256     }
257
258     private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
259         return ImmutableMessageRouterSubscribeRequest
260                 .builder()
261                 .sourceDefinition(sourceDefinition)
262                 .consumerGroup(CONSUMER_GROUP)
263                 .consumerId(consumerId)
264                 .build();
265     }
266
267     private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
268         return ImmutableMessageRouterSubscribeResponse
269                 .builder()
270                 .failReason(failReason)
271                 .build();
272     }
273 }