1268a16a15e88b8420992676b325b99ed07bb688
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / api / MessageRouterPublisherTest.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
24 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
25
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonPrimitive;
28 import io.vavr.collection.List;
29
30 import java.time.Duration;
31
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.Test;
34 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
43 import reactor.core.publisher.Flux;
44 import reactor.core.publisher.Mono;
45 import reactor.test.StepVerifier;
46
47 /**
48  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
49  * @since May 2019
50  */
51 class MessageRouterPublisherTest {
52
53     private static final String ERROR_MESSAGE = "Something went wrong";
54     private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
55     private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
56     private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
57     private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
58     private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
59     private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
60     private static final Duration TIMEOUT = Duration.ofSeconds(10);
61     private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
62             .map(JsonPrimitive::new);
63     private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
64
65     private static DummyHttpServer server;
66     private MessageRouterPublisher sut = DmaapClientFactory
67             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
68
69
70     @BeforeAll
71     static void setUp() {
72         server = DummyHttpServer.start(routes ->
73                 routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
74                         .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
75                                 sendError(resp, 400, ERROR_MESSAGE))
76                         .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
77                                 sendError(resp, 401, ERROR_MESSAGE))
78                         .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
79                                 sendError(resp, 403, ERROR_MESSAGE))
80                         .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
81                                 sendError(resp, 404, ERROR_MESSAGE))
82                         .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
83                                 sendError(resp, 500, ERROR_MESSAGE))
84         );
85     }
86
87     @Test
88     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
89         //given
90         final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
91                 ContentType.TEXT_PLAIN);
92         final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
93
94
95         //when
96         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
97
98         //then
99         StepVerifier.create(result)
100                 .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
101                 .expectComplete()
102                 .verify(TIMEOUT);
103     }
104
105     @Test
106     void publisher_shouldHandleBadRequestError() {
107         //given
108         final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
109                 ContentType.TEXT_PLAIN);
110         final MessageRouterPublishResponse expectedResponse = createErrorResponse(
111                 "400 Bad Request\n%s", ERROR_MESSAGE);
112
113         //when
114         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
115
116         //then
117         StepVerifier.create(result)
118                 .expectNext(expectedResponse)
119                 .expectComplete()
120                 .verify(TIMEOUT);
121     }
122
123     @Test
124     void publisher_shouldHandleUnauthorizedError() {
125         //given
126         final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
127                 ContentType.TEXT_PLAIN);
128         final MessageRouterPublishResponse expectedResponse = createErrorResponse(
129                 "401 Unauthorized\n%s", ERROR_MESSAGE);
130
131         //when
132         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
133
134         //then
135         StepVerifier.create(result)
136                 .expectNext(expectedResponse)
137                 .expectComplete()
138                 .verify(TIMEOUT);
139     }
140
141     @Test
142     void publisher_shouldHandleForbiddenError() {
143         //given
144         final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
145                 ContentType.TEXT_PLAIN);
146         final MessageRouterPublishResponse expectedResponse = createErrorResponse(
147                 "403 Forbidden\n%s", ERROR_MESSAGE);
148
149         //when
150         final Flux<MessageRouterPublishResponse> result = sut
151                 .put(mrRequest, messageBatch);
152
153         //then
154         StepVerifier.create(result)
155                 .expectNext(expectedResponse)
156                 .expectComplete()
157                 .verify(TIMEOUT);
158     }
159
160     @Test
161     void publisher_shouldHandleNotFoundError() {
162         //given
163         final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
164                 ContentType.TEXT_PLAIN);
165         final MessageRouterPublishResponse expectedResponse = createErrorResponse(
166                 "404 Not Found\n%s", ERROR_MESSAGE);
167
168         //when
169         final Flux<MessageRouterPublishResponse> result = sut
170                 .put(mrRequest, messageBatch);
171
172         //then
173         StepVerifier.create(result)
174                 .expectNext(expectedResponse)
175                 .expectComplete()
176                 .verify(TIMEOUT);
177     }
178
179     @Test
180     void publisher_shouldHandleInternalServerError() {
181         //given
182         final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
183                 ContentType.TEXT_PLAIN);
184         final MessageRouterPublishResponse expectedResponse = createErrorResponse(
185                 "500 Internal Server Error\n%s", ERROR_MESSAGE);
186
187         //when
188         final Flux<MessageRouterPublishResponse> result = sut
189                 .put(mrRequest, messageBatch);
190
191         //then
192         StepVerifier.create(result)
193                 .expectNext(expectedResponse)
194                 .expectComplete()
195                 .verify(TIMEOUT);
196     }
197
198
199     private MessageRouterPublishRequest createMRRequest(String topicPath, ContentType contentType) {
200         final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
201                 .name("the topic")
202                 .topicUrl(String.format("http://%s:%d%s",
203                         server.host(),
204                         server.port(),
205                         topicPath)
206                 )
207                 .build();
208
209         return ImmutableMessageRouterPublishRequest.builder()
210                 .sinkDefinition(sinkDefinition)
211                 .contentType(contentType)
212                 .build();
213     }
214
215     private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs) {
216         return ImmutableMessageRouterPublishResponse
217                 .builder()
218                 .failReason(String.format(failReasonFormat, formatArgs))
219                 .build();
220     }
221 }
222