bd161aabb73dd5e4baa33b2ba96faaa56cc5251d
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / api / MessageRouterSubscriberIT.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import eu.rekawek.toxiproxy.Proxy;
26 import eu.rekawek.toxiproxy.ToxiproxyClient;
27 import io.vavr.collection.List;
28 import org.junit.jupiter.api.BeforeAll;
29 import org.junit.jupiter.api.Test;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
35 import org.testcontainers.containers.DockerComposeContainer;
36 import org.testcontainers.junit.jupiter.Container;
37 import org.testcontainers.junit.jupiter.Testcontainers;
38 import reactor.core.publisher.Flux;
39 import reactor.core.publisher.Mono;
40 import reactor.test.StepVerifier;
41
42 import java.io.IOException;
43 import java.time.Duration;
44 import java.util.concurrent.TimeUnit;
45
46 import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
47 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
48 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
50 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
55
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
60
61 @Testcontainers
62 class MessageRouterSubscriberIT {
63     private static final Duration TIMEOUT = Duration.ofSeconds(10);
64     private static final String CONSUMER_GROUP = "group1";
65     private static final String CONSUMER_ID = "consumer200";
66     private static String PROXY_EVENTS_PATH;
67     private static Proxy DMAAP_PROXY;
68     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
69             "{" +
70             "\"mrstatus\":3001," +
71             "\"helpURL\":\"http://onap.readthedocs.io\"," +
72             "\"message\":\"No such topic exists.-[%s]\"," +
73             "\"status\":404" +
74             "}";
75     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
76             + "{"
77             + "\"requestError\":"
78             + "{"
79             + "\"serviceException\":"
80             + "{"
81             + "\"messageId\":\"SVC0001\","
82             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
83             + "\"variables\":[\"408\"]"
84             + "}"
85             + "}"
86             + "}";
87
88     @Container
89     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
90
91     private static String EVENTS_PATH;
92
93     private MessageRouterPublisher publisher = DmaapClientFactory
94             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
95     private MessageRouterSubscriber subscriber = DmaapClientFactory
96             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
97
98
99     @BeforeAll
100     static void setUp() throws IOException {
101         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
102         PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
103
104         DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
105                 String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
106                 String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
107     }
108
109
110     @Test
111     void subscriber_shouldHandleNoSuchTopicException() {
112         //given
113         final String topic = "newTopic";
114         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
115                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
116         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
117                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
118
119         //when
120         Mono<MessageRouterSubscribeResponse> response = subscriber
121                 .get(mrSubscribeRequest);
122
123         //then
124         StepVerifier.create(response)
125                 .expectNext(expectedResponse)
126                 .expectComplete()
127                 .verify(TIMEOUT);
128     }
129
130     @Test
131     void subscriberShouldHandleSingleItemResponse(){
132         //given
133         final String topic = "TOPIC";
134         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
135         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
136         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
137
138         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
139         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
140         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
141         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
142
143         //when
144         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
145         Mono<MessageRouterSubscribeResponse> response = publisher
146                 .put(publishRequest, jsonMessageBatch)
147                 .then(subscriber.get(subscribeRequest));
148
149         //then
150         StepVerifier.create(response)
151                 .expectNext(expectedResponse)
152                 .expectComplete()
153                 .verify();
154     }
155
156     @Test
157     void subscriber_shouldHandleMultipleItemsResponse() {
158         //given
159         final String topic = "TOPIC2";
160         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
161         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
162         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
163
164         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
165                 "{\"differentMessage\":\"message2\"}");
166         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
167         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
168         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
169
170         //when
171         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
172         Mono<MessageRouterSubscribeResponse> response = publisher
173                 .put(publishRequest, jsonMessageBatch)
174                 .then(subscriber.get(subscribeRequest));
175
176         //then
177         StepVerifier.create(response)
178                 .expectNext(expectedResponse)
179                 .expectComplete()
180                 .verify();
181     }
182
183     @Test
184     void subscriber_shouldExtractItemsFromResponse() {
185         //given
186         final String topic = "TOPIC3";
187         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
188         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
190                 CONSUMER_GROUP, CONSUMER_ID);
191
192         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
193                 "{\"differentMessage\":\"message2\"}");
194         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
195
196         //when
197         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
198         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
199                 .thenMany(subscriber.getElements(subscribeRequest));
200
201         //then
202         StepVerifier.create(result)
203                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
204                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
205                 .expectComplete()
206                 .verify(TIMEOUT);
207     }
208
209     @Test
210     void subscriber_shouldSubscribeToTopic(){
211         //given
212         final String topic = "TOPIC4";
213         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
214         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
215         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
216                 CONSUMER_GROUP, CONSUMER_ID);
217
218         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
219                 "{\"differentMessage\":\"message2\"}");
220         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
221         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
222
223         //when
224         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
225         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
226                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
227
228         //then
229         StepVerifier.create(result.take(2))
230                 .expectNext(messages.get(0))
231                 .expectNext(messages.get(1))
232                 .expectComplete()
233                 .verify(TIMEOUT);
234     }
235
236     @Test
237     void subscriber_shouldHandleTimeoutException() throws IOException {
238         //given
239         final String topic = "newTopic";
240         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
241                 String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
242         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
243                 TIMEOUT_ERROR_MESSAGE);
244
245         final String toxic = "latency-toxic";
246         DMAAP_PROXY.toxics()
247                 .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
248
249         //when
250         Mono<MessageRouterSubscribeResponse> response = subscriber
251                 .get(mrSubscribeRequest);
252
253         //then
254         StepVerifier.create(response)
255                 .expectNext(expectedResponse)
256                 .expectComplete()
257                 .verify(TIMEOUT);
258
259         //cleanup
260         DMAAP_PROXY.toxics().get(toxic).remove();
261     }
262 }