2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2020 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import eu.rekawek.toxiproxy.Proxy;
26 import eu.rekawek.toxiproxy.ToxiproxyClient;
27 import io.vavr.collection.List;
28 import org.junit.jupiter.api.BeforeAll;
29 import org.junit.jupiter.api.Test;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
37 import org.testcontainers.containers.DockerComposeContainer;
38 import org.testcontainers.junit.jupiter.Container;
39 import org.testcontainers.junit.jupiter.Testcontainers;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42 import reactor.test.StepVerifier;
44 import java.io.IOException;
45 import java.time.Duration;
46 import java.util.concurrent.TimeUnit;
48 import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
50 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
64 class MessageRouterPublisherIT {
66 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
67 private static final Duration TIMEOUT = Duration.ofSeconds(10);
68 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
70 + "\"mrstatus\":5007,"
71 + "\"helpURL\":\"http://onap.readthedocs.io\","
72 + "\"message\":\"Error while publishing data to topic.:%s."
73 + "Successfully published number of messages :0."
74 + "Expected { to start an object.\",\"status\":400"
76 private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
80 + "\"serviceException\":"
82 + "\"messageId\":\"SVC0001\","
83 + "\"text\":\"Client timeout exception occurred, Error code is %1\","
84 + "\"variables\":[\"408\"]"
88 private static Proxy DMAAP_PROXY;
89 private static String EVENTS_PATH;
90 private static String PROXY_EVENTS_PATH;
91 private final MessageRouterPublisher publisher = DmaapClientFactory
92 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
93 private final MessageRouterSubscriber subscriber = DmaapClientFactory
94 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
97 static void setUp() throws IOException {
98 EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
99 PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
101 DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
102 String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
103 String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
107 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
109 final String topic = "TOPIC";
110 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
111 "{\"differentMessage\":\"message2\"}");
112 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
113 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
114 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
117 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
120 StepVerifier.create(result)
121 .expectNext(expectedResponse)
127 void publisher_shouldHandleBadRequestError() {
129 final String topic = "TOPIC2";
130 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
131 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
132 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
133 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
134 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
137 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
140 StepVerifier.create(result)
141 .expectNext(expectedResponse)
147 void publisher_shouldSuccessfullyPublishSingleMessage() {
149 final String topic = "TOPIC3";
150 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
151 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
152 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
153 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
154 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
155 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
156 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
159 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
160 Mono<MessageRouterSubscribeResponse> response = publisher
161 .put(publishRequest, jsonMessageBatch)
162 .then(subscriber.get(subscribeRequest));
165 StepVerifier.create(response)
166 .expectNext(expectedResponse)
172 void publisher_shouldSuccessfullyPublishMultipleMessages() {
173 final String topic = "TOPIC5";
174 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
175 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
176 "{\"differentMessage\":\"message2\"}");
177 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
178 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
179 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
180 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
181 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
184 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
185 Mono<MessageRouterSubscribeResponse> response = publisher
186 .put(publishRequest, jsonMessageBatch)
187 .then(subscriber.get(subscribeRequest));
190 StepVerifier.create(response)
191 .expectNext(expectedResponse)
197 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
199 final String topic = "TOPIC6";
200 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
202 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
203 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
204 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
206 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
207 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
208 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
211 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
212 Mono<MessageRouterSubscribeResponse> response = publisher
213 .put(publishRequest, plainBatch)
214 .then(subscriber.get(subscribeRequest));
217 StepVerifier.create(response)
218 .expectNext(expectedResponse)
224 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
226 final String topic = "TOPIC7";
227 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
229 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
230 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
231 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
233 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
234 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
235 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
238 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
239 Mono<MessageRouterSubscribeResponse> response = publisher
240 .put(publishRequest, plainBatch)
241 .then(subscriber.get(subscribeRequest));
244 StepVerifier.create(response)
245 .expectNext(expectedResponse)
251 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
253 final String topic = "TOPIC8";
254 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
256 final List<String> singlePlainMessage = List.of("kebab");
257 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
258 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
260 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
261 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
262 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
265 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
266 Mono<MessageRouterSubscribeResponse> response = publisher
267 .put(publishRequest, plainBatch)
268 .then(subscriber.get(subscribeRequest));
271 StepVerifier.create(response)
272 .expectNext(expectedResponse)
278 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
280 final String topic = "TOPIC9";
281 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
283 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
284 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
285 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
287 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
288 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
289 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
292 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
293 Mono<MessageRouterSubscribeResponse> response = publisher
294 .put(publishRequest, plainBatch)
295 .then(subscriber.get(subscribeRequest));
298 StepVerifier.create(response)
299 .expectNext(expectedResponse)
305 void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException {
307 final String toxic = "latency-toxic";
309 .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
310 final String topic = "TOPIC10";
311 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
312 final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
313 final MessageRouterPublishRequest mrRequest = createPublishRequest(
314 String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1));
315 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
318 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
321 StepVerifier.create(result)
322 .expectNext(expectedResponse)
327 DMAAP_PROXY.toxics().get(toxic).remove();