2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.Test;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
35 import org.testcontainers.containers.DockerComposeContainer;
36 import org.testcontainers.junit.jupiter.Container;
37 import org.testcontainers.junit.jupiter.Testcontainers;
38 import reactor.core.publisher.Flux;
39 import reactor.core.publisher.Mono;
40 import reactor.test.StepVerifier;
42 import java.time.Duration;
44 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
45 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
46 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
47 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
48 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
50 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
55 class MessageRouterPublisherIT {
// Docker Compose environment obtained from DMaapContainer.createContainerInstance(); the DMaaP
// service host and port used to build EVENTS_PATH are read from this instance.
// NOTE(review): DockerComposeContainer is declared generic in Testcontainers, so this is a raw
// type — consider DockerComposeContainer<?> if the factory's return type allows it.
57 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
// Ten-second duration constant.
// NOTE(review): presumably the upper bound for StepVerifier verification — confirm at its use site.
58 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Format template for the "400 Bad Request" response text used by the bad-request test.
// It contains a single %s placeholder, placed right after "Error while publishing data to topic.:".
// The bad-request test passes this template together with the topic name to errorPublishResponse,
// which presumably substitutes the topic for %s — confirm in MessageRouterTestsUtils.
59 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
61 + "\"mrstatus\":5007,"
62 + "\"helpURL\":\"http://onap.readthedocs.io\","
63 + "\"message\":\"Error while publishing data to topic.:%s."
64 + "Successfully published number of messages :0."
65 + "Expected { to start an object.\",\"status\":400"
// Base URL of the DMaaP events endpoint; assigned from the container's service host and port.
67 private static String EVENTS_PATH;
// Publisher under test, created by DmaapClientFactory from the default publisher configuration.
68 private final MessageRouterPublisher publisher = DmaapClientFactory
69 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
// Subscriber created from the default subscriber configuration; the round-trip tests use it to
// read back what the publisher sent.
// NOTE(review): unlike publisher this field is not final, although no reassignment is visible —
// consider making it final for consistency.
70 private MessageRouterSubscriber subscriber = DmaapClientFactory
71 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Build the events endpoint URL in the form "http://<host>:<port>/events". Host and port are the
// values the compose container reports for the DMaaP service and its exposed port.
75 EVENTS_PATH = String.format("http://%s:%d/events",
76 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
77 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
78 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
79 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
// Publishes a batch of two JSON messages to "TOPIC" and checks that the first response emitted by
// publisher.put(...) equals the success response built from the same two messages.
83 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
85 final String topic = "TOPIC";
86 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
87 "{\"differentMessage\":\"message2\"}");
88 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
// The publish target is the events endpoint followed by the topic name.
89 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
// The expectation is derived from the same input strings, converted via getAsJsonElements.
90 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
93 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
96 StepVerifier.create(result)
97 .expectNext(expectedResponse)
// Publishes three plain-text words to "TOPIC2" through a request created without an explicit
// content type, and checks that the first emitted response equals the error response built from
// the 400 template and the topic name.
// NOTE(review): presumably the default content type is JSON, which is why plain text is rejected —
// confirm in createPublishRequest.
103 void publisher_shouldHandleBadRequestError(){
105 final String topic = "TOPIC2";
106 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
107 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
108 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
// The expected error is built from the shared 400 template and this test's topic name.
109 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
110 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
113 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
116 StepVerifier.create(result)
117 .expectNext(expectedResponse)
// Round-trip check on "TOPIC3": publishes one JSON message, then reads the topic back through the
// subscriber and expects a success subscribe response containing that same message.
123 void publisher_shouldSuccessfullyPublishSingleMessage(){
125 final String topic = "TOPIC3";
126 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
127 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
128 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
129 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
130 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
// The subscribe request targets the same topic URL, with consumer group "sampleGroup" and id "sampleId".
131 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
132 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably makes sure the topic and consumer exist before the
// publish that is asserted on — confirm in MessageRouterTestsUtils.
135 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once the publish Flux completes, emits the
// result of the subscriber's get call.
136 Mono<MessageRouterSubscribeResponse> response = publisher
137 .put(publishRequest, jsonMessageBatch)
138 .then(subscriber.get(subscribeRequest));
141 StepVerifier.create(response)
142 .expectNext(expectedResponse)
// Round-trip check on "TOPIC5": publishes two JSON messages, then reads the topic back through the
// subscriber and expects a success subscribe response containing both messages.
148 void publisher_shouldSuccessfullyPublishMultipleMessages(){
149 final String topic = "TOPIC5";
150 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singleJsonMessage holds two messages here — consider renaming.
151 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
152 "{\"differentMessage\":\"message2\"}");
153 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
154 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
155 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
156 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
157 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably prepares the topic/consumer before the asserted publish — confirm.
160 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// The publish responses are discarded; the subscriber's response is emitted after the publish completes.
161 Mono<MessageRouterSubscribeResponse> response = publisher
162 .put(publishRequest, jsonMessageBatch)
163 .then(subscriber.get(subscribeRequest));
166 StepVerifier.create(response)
167 .expectNext(expectedResponse)
// Round-trip check on "TOPIC6": a single JSON-formatted string is sent as a plain batch with
// ContentType.TEXT_PLAIN, and the subscriber is expected to return it as the only item.
173 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
175 final String topic = "TOPIC6";
176 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
178 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
179 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
// The payload is built with plainBatch rather than jsonBatch, matching the TEXT_PLAIN request below.
180 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
182 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
183 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
184 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably prepares the topic/consumer before the asserted publish — confirm.
187 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// The publish responses are discarded; the subscriber's response is emitted after the publish completes.
188 Mono<MessageRouterSubscribeResponse> response = publisher
189 .put(publishRequest, plainBatch)
190 .then(subscriber.get(subscribeRequest));
193 StepVerifier.create(response)
194 .expectNext(expectedResponse)
// Round-trip check on "TOPIC7": two JSON-formatted strings are sent as a plain batch with
// ContentType.TEXT_PLAIN, and the subscriber is expected to return both items.
200 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
202 final String topic = "TOPIC7";
203 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
205 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
206 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
// The payload is built with plainBatch rather than jsonBatch, matching the TEXT_PLAIN request below.
207 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
209 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
210 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
211 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably prepares the topic/consumer before the asserted publish — confirm.
214 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// The publish responses are discarded; the subscriber's response is emitted after the publish completes.
215 Mono<MessageRouterSubscribeResponse> response = publisher
216 .put(publishRequest, plainBatch)
217 .then(subscriber.get(subscribeRequest));
220 StepVerifier.create(response)
221 .expectNext(expectedResponse)
// Round-trip check on "TOPIC8": the single non-JSON word "kebab" is sent as a plain batch with
// ContentType.TEXT_PLAIN, and the subscriber is expected to return it as the only item.
227 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
229 final String topic = "TOPIC8";
230 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
232 final List<String> singlePlainMessage = List.of("kebab");
// The expected items come from running the same plain string through getAsJsonElements.
233 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
234 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
236 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
237 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
238 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// NOTE(review): registerTopic presumably prepares the topic/consumer before the asserted publish — confirm.
241 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// The publish responses are discarded; the subscriber's response is emitted after the publish completes.
242 Mono<MessageRouterSubscribeResponse> response = publisher
243 .put(publishRequest, plainBatch)
244 .then(subscriber.get(subscribeRequest));
247 StepVerifier.create(response)
248 .expectNext(expectedResponse)
254 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
256 final String topic = "TOPIC9";
257 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
259 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
260 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
261 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
263 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
264 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
265 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
268 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
269 Mono<MessageRouterSubscribeResponse> response = publisher
270 .put(publishRequest, plainBatch)
271 .then(subscriber.get(subscribeRequest));
274 StepVerifier.create(response)
275 .expectNext(expectedResponse)