2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import eu.rekawek.toxiproxy.Proxy;
26 import eu.rekawek.toxiproxy.ToxiproxyClient;
27 import io.vavr.collection.List;
28 import org.junit.jupiter.api.BeforeAll;
29 import org.junit.jupiter.api.Test;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
35 import org.testcontainers.containers.DockerComposeContainer;
36 import org.testcontainers.junit.jupiter.Container;
37 import org.testcontainers.junit.jupiter.Testcontainers;
38 import reactor.core.publisher.Flux;
39 import reactor.core.publisher.Mono;
40 import reactor.test.StepVerifier;
42 import java.io.IOException;
43 import java.time.Duration;
44 import java.util.concurrent.TimeUnit;
46 import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
47 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
48 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
50 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
/**
 * Tests of {@link MessageRouterSubscriber} that talk to the services started through
 * {@code DMaapContainer.createContainerInstance()}. Most tests use the direct events URL
 * ({@code EVENTS_PATH}); the timeout test goes through a Toxiproxy proxy ({@code DMAAP_PROXY}).
 */
62 class MessageRouterSubscriberIT {
// Ten-second limit. NOTE(review): no use of this constant is visible in this excerpt;
// presumably it bounds StepVerifier verification - confirm.
63 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Consumer group and consumer id passed to createMRSubscribeRequest by every test in this class.
64 private static final String CONSUMER_GROUP = "group1";
65 private static final String CONSUMER_ID = "consumer200";
// Events URL that uses the proxy port; assigned in setUp().
66 private static String PROXY_EVENTS_PATH;
// Toxiproxy proxy handle; created in setUp() and used by the timeout test.
67 private static Proxy DMAAP_PROXY;
// Format string for the expected "no such topic" failure; its single %s placeholder receives
// the topic name when passed to errorSubscribeResponse.
68 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
70 "\"mrstatus\":3001," +
71 "\"helpURL\":\"http://onap.readthedocs.io\"," +
72 "\"message\":\"No such topic exists.-[%s]\"," +
// Expected failure text for the timeout test.
// NOTE(review): the text contains the literal sequence "%1" and is passed to
// errorSubscribeResponse without format arguments - confirm that helper does not run it
// through String.format in that case.
75 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
79 + "\"serviceException\":"
81 + "\"messageId\":\"SVC0001\","
82 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
83 + "\"variables\":[\"408\"]"
// Docker Compose environment obtained from DMaapContainer.
// NOTE(review): declared without type arguments; DockerComposeContainer is a generic type in
// Testcontainers, so this looks like a raw-type usage - confirm and parameterize if so.
89 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
// Events URL that uses the DMaaP service port directly; assigned in setUp().
91 private static String EVENTS_PATH;
// Publisher and subscriber under test, both created from DmaapClientFactory with default configs.
93 private MessageRouterPublisher publisher = DmaapClientFactory
94 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
95 private MessageRouterSubscriber subscriber = DmaapClientFactory
96 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
/**
 * Initializes the shared static state used by the tests: the direct events URL, the proxied
 * events URL, and the Toxiproxy proxy named "dmaapProxy".
 *
 * @throws IOException declared by this method; presumably raised by the Toxiproxy client call
 */
100 static void setUp() throws IOException {
// Both URLs share the host and the "/events" path and differ only in the port.
101 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
102 PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
// The proxy listens on "[::]:<proxy port>" and targets "<DMaaP service name>:<DMaaP port>".
104 DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
105 String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
106 String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
/**
 * Requests messages from topic "newTopic" without publishing to it or registering it first, and
 * expects the response built from DMAAP_404_ERROR_RESPONSE_FORMAT with that topic name.
 */
111 void subscriber_shouldHandleNoSuchTopicException() {
// given: a subscribe request for the topic and the expected error response
113 final String topic = "newTopic";
114 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
115 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
116 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
117 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
// when: a single get is issued through the direct events URL
120 Mono<MessageRouterSubscribeResponse> response = subscriber
121 .get(mrSubscribeRequest);
// then: the emitted response must equal the expected error response
124 StepVerifier.create(response)
125 .expectNext(expectedResponse)
/**
 * Publishes one JSON message to topic "TOPIC", then issues a get and expects a success
 * response whose items are that single message.
 */
131 void subscriberShouldHandleSingleItemResponse(){
// given: publish/subscribe requests for the same topic URL and one message to send
133 final String topic = "TOPIC";
134 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
135 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
136 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
138 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
139 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
140 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
141 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// when: registerTopic is a MessageRouterTestsUtils helper (its body is not visible here;
// presumably it makes the topic exist before the publish under test - confirm).
// The get is chained with then(), so it is subscribed only after the put completes.
144 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
145 Mono<MessageRouterSubscribeResponse> response = publisher
146 .put(publishRequest, jsonMessageBatch)
147 .then(subscriber.get(subscribeRequest));
// then: the emitted response must equal the expected success response
150 StepVerifier.create(response)
151 .expectNext(expectedResponse)
/**
 * Publishes two JSON messages to topic "TOPIC2", then issues a get and expects a success
 * response whose items are those two messages.
 */
157 void subscriber_shouldHandleMultipleItemsResponse() {
// given: publish/subscribe requests for the same topic URL and two messages to send
159 final String topic = "TOPIC2";
160 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
161 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
162 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
164 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
165 "{\"differentMessage\":\"message2\"}");
166 final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
167 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
168 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
// when: registerTopic is a MessageRouterTestsUtils helper (body not visible here);
// the get is chained with then(), so it is subscribed only after the put completes.
171 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
172 Mono<MessageRouterSubscribeResponse> response = publisher
173 .put(publishRequest, jsonMessageBatch)
174 .then(subscriber.get(subscribeRequest));
// then: the emitted response must equal the expected success response
177 StepVerifier.create(response)
178 .expectNext(expectedResponse)
/**
 * Publishes two JSON messages to topic "TOPIC3" and expects getElements to emit them as
 * individual JSON elements, in the order they were listed.
 */
184 void subscriber_shouldExtractItemsFromResponse() {
// given: publish/subscribe requests for the same topic URL and two messages to send
186 final String topic = "TOPIC3";
187 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
188 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
190 CONSUMER_GROUP, CONSUMER_ID);
192 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
193 "{\"differentMessage\":\"message2\"}");
194 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
// when: registerTopic is a MessageRouterTestsUtils helper (body not visible here);
// thenMany() subscribes to getElements only after the put completes.
197 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
198 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
199 .thenMany(subscriber.getElements(subscribeRequest));
// then: the first and second emitted elements must match the first and second message
202 StepVerifier.create(result)
203 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
204 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
/**
 * Publishes two JSON messages to topic "TOPIC4" and expects subscribeForElements to emit them,
 * in the order they were listed, as the first two elements of its stream.
 */
210 void subscriber_shouldSubscribeToTopic(){
// given: publish/subscribe requests for the same topic URL and two messages to send
212 final String topic = "TOPIC4";
213 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
214 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
215 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
216 CONSUMER_GROUP, CONSUMER_ID);
218 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
219 "{\"differentMessage\":\"message2\"}");
220 final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
221 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
// when: registerTopic is a MessageRouterTestsUtils helper (body not visible here).
// The one-second Duration is presumably the polling period of subscribeForElements - confirm.
224 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
225 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
226 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
// then: take(2) restricts verification to the first two elements
// (presumably because the subscription stream does not complete on its own - confirm).
229 StepVerifier.create(result.take(2))
230 .expectNext(messages.get(0))
231 .expectNext(messages.get(1))
/**
 * Issues a get through the proxied events URL while a five-second downstream latency toxic is
 * active, and expects the response built from TIMEOUT_ERROR_MESSAGE.
 *
 * @throws IOException declared by this method; presumably raised by the Toxiproxy calls
 */
237 void subscriber_shouldHandleTimeoutException() throws IOException {
// given: a subscribe request against the proxy URL. The extra Duration.ofSeconds(1) argument
// is presumably the request timeout (shorter than the injected latency) - confirm.
239 final String topic = "newTopic";
240 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
241 String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
242 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
243 TIMEOUT_ERROR_MESSAGE);
// A toxic named "latency-toxic" adds 5000 ms of latency in the DOWNSTREAM direction.
// The receiver of the .latency(...) call is not visible in this excerpt.
245 final String toxic = "latency-toxic";
247 .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
// when: a single get is issued through the proxy
250 Mono<MessageRouterSubscribeResponse> response = subscriber
251 .get(mrSubscribeRequest);
// then: the emitted response must equal the expected timeout response
254 StepVerifier.create(response)
255 .expectNext(expectedResponse)
// cleanup: remove the toxic from the shared static proxy.
// NOTE(review): no try/finally is visible around the verification; if it throws, this removal
// is skipped and the toxic stays on DMAAP_PROXY - consider moving it into a finally block.
260 DMAAP_PROXY.toxics().get(toxic).remove();