2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import io.vavr.collection.List;
28 import java.time.Duration;
29 import org.junit.jupiter.api.BeforeAll;
30 import org.junit.jupiter.api.Test;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
38 import org.testcontainers.containers.DockerComposeContainer;
39 import org.testcontainers.junit.jupiter.Container;
40 import org.testcontainers.junit.jupiter.Testcontainers;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43 import reactor.test.StepVerifier;
/**
 * Tests of {@link MessageRouterPublisher} that talk to the DMaaP service exposed by a
 * {@link DockerComposeContainer} obtained from {@code DMaapContainer}.
 */
46 class MessageRouterPublisherIT {
// Docker Compose environment supplied by DMaapContainer; it is queried for the DMaaP host and port
// when EVENTS_PATH is built.
// NOTE(review): declared with what looks like a raw DockerComposeContainer type - confirm against
// the Testcontainers version in use and parameterize if the class is generic there.
48 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
// Ten-second duration. Its use is not visible in this view - presumably the StepVerifier
// verification timeout (TODO confirm).
49 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Format string for the expected DMaaP "400 Bad Request" payload. It contains a single %s
// placeholder; publisher_shouldHandleBadRequestError passes it to errorPublishResponse together
// with the topic name.
50 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
52 + "\"mrstatus\":5007,"
53 + "\"helpURL\":\"http://onap.readthedocs.io\","
54 + "\"message\":\"Error while publishing data to topic.:%s."
55 + "Successfully published number of messages :0."
56 + "Expected { to start an object.\",\"status\":400"
// Base URL of the DMaaP events endpoint ("http://<host>:<port>/events"); every test appends
// "/<topic>" to it. Assigned from CONTAINER's reported host and port.
58 private static String EVENTS_PATH;
// Publisher under test, created with the default publisher configuration.
59 private final MessageRouterPublisher publisher = DmaapClientFactory
60 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
// Subscriber used by the tests to read back what was published, created with the default
// subscriber configuration.
// NOTE(review): not final, unlike publisher; no reassignment is visible in this view.
61 private MessageRouterSubscriber subscriber = DmaapClientFactory
62 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Builds the events URL as "http://<host>:<port>/events" from the host and port that CONTAINER
// reports for the DMaaP service name and its exposed port.
66 EVENTS_PATH = String.format("http://%s:%d/events",
67 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
68 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
69 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
70 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
/**
 * Publishes a batch of two JSON messages to topic "TOPIC" and expects the publisher to emit the
 * success response built from those same two messages.
 */
74 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
76 final String topic = "TOPIC";
77 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
78 "{\"differentMessage\":\"message2\"}");
// The same message list feeds both the published batch and the expected response.
79 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
80 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
81 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
84 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// The first element emitted by the publisher must equal the expected success response.
87 StepVerifier.create(result)
88 .expectNext(expectedResponse)
/**
 * Publishes three plain-text words to topic "TOPIC2" with a request built by the single-argument
 * createPublishRequest and expects the error response built from DMAAP_400_ERROR_RESPONSE_FORMAT
 * and the topic name.
 */
94 void publisher_shouldHandleBadRequestError(){
96 final String topic = "TOPIC2";
97 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
98 final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
// NOTE(review): no content type is passed here; presumably the default is JSON, which would
// explain why plain words are rejected - confirm in MessageRouterTestsUtils.
99 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
100 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
101 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
104 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// The first element emitted by the publisher must equal the expected error response.
107 StepVerifier.create(result)
108 .expectNext(expectedResponse)
/**
 * Publishes one JSON message to topic "TOPIC3", then reads from the same topic URL and expects
 * the subscriber response built from that message.
 */
114 void publisher_shouldSuccessfullyPublishSingleMessage(){
116 final String topic = "TOPIC3";
117 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
118 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
119 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
120 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
121 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
122 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
123 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Called with both clients and both requests before publishing; presumably makes the topic
// exist on the broker - confirm in MessageRouterTestsUtils.
126 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, relays the result of
// subscriber.get.
127 Mono<MessageRouterSubscribeResponse> response = publisher
128 .put(publishRequest, jsonMessageBatch)
129 .then(subscriber.get(subscribeRequest));
132 StepVerifier.create(response)
133 .expectNext(expectedResponse)
/**
 * Publishes two JSON messages to topic "TOPIC5", then reads from the same topic URL and expects
 * the subscriber response built from both messages.
 */
139 void publisher_shouldSuccessfullyPublishMultipleMessages(){
140 final String topic = "TOPIC5";
141 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singleJsonMessage holds two messages; consider renaming.
142 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
143 "{\"differentMessage\":\"message2\"}");
144 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
145 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
146 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
147 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
148 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Called with both clients and both requests before publishing; presumably makes the topic
// exist on the broker - confirm in MessageRouterTestsUtils.
151 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, relays the result of
// subscriber.get.
152 Mono<MessageRouterSubscribeResponse> response = publisher
153 .put(publishRequest, jsonMessageBatch)
154 .then(subscriber.get(subscribeRequest));
157 StepVerifier.create(response)
158 .expectNext(expectedResponse)
/**
 * Publishes one JSON-formatted message to topic "TOPIC6" through plainBatch with
 * ContentType.TEXT_PLAIN, then reads from the same topic URL and expects the subscriber response
 * built from that message.
 */
164 void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
166 final String topic = "TOPIC6";
167 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
169 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
170 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
// The payload is JSON text, but it is wrapped by plainBatch rather than jsonBatch.
171 final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
173 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
174 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
175 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Called with both clients and both requests before publishing; presumably makes the topic
// exist on the broker - confirm in MessageRouterTestsUtils.
178 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, relays the result of
// subscriber.get.
179 Mono<MessageRouterSubscribeResponse> response = publisher
180 .put(publishRequest, plainBatch)
181 .then(subscriber.get(subscribeRequest));
184 StepVerifier.create(response)
185 .expectNext(expectedResponse)
/**
 * Publishes two JSON-formatted messages to topic "TOPIC7" through plainBatch with
 * ContentType.TEXT_PLAIN, then reads from the same topic URL and expects the subscriber response
 * built from both messages.
 */
191 void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
193 final String topic = "TOPIC7";
194 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
196 final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
197 final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
// The payload is JSON text, but it is wrapped by plainBatch rather than jsonBatch.
198 final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
200 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
201 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
202 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Called with both clients and both requests before publishing; presumably makes the topic
// exist on the broker - confirm in MessageRouterTestsUtils.
205 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, relays the result of
// subscriber.get.
206 Mono<MessageRouterSubscribeResponse> response = publisher
207 .put(publishRequest, plainBatch)
208 .then(subscriber.get(subscribeRequest));
211 StepVerifier.create(response)
212 .expectNext(expectedResponse)
/**
 * Publishes the single plain-text message "kebab" to topic "TOPIC8" with ContentType.TEXT_PLAIN,
 * then reads from the same topic URL and expects the subscriber response built from that message.
 */
218 void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
220 final String topic = "TOPIC8";
221 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
223 final List<String> singlePlainMessage = List.of("kebab");
// The expected items come from passing the plain (non-JSON-object) text through
// getAsJsonElements.
224 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
225 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
227 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
228 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
229 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Called with both clients and both requests before publishing; presumably makes the topic
// exist on the broker - confirm in MessageRouterTestsUtils.
232 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, relays the result of
// subscriber.get.
233 Mono<MessageRouterSubscribeResponse> response = publisher
234 .put(publishRequest, plainBatch)
235 .then(subscriber.get(subscribeRequest));
238 StepVerifier.create(response)
239 .expectNext(expectedResponse)
/**
 * Publishes three plain-text messages to topic "TOPIC9" with ContentType.TEXT_PLAIN, then reads
 * from the same topic URL and expects the subscriber response built from all three messages.
 */
245 void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
247 final String topic = "TOPIC9";
248 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// NOTE(review): despite its name, singlePlainMessage holds three messages; consider renaming.
250 final List<String> singlePlainMessage = List.of("I", "like", "pizza");
251 final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
252 final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
254 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
255 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
256 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Called with both clients and both requests before publishing; presumably makes the topic
// exist on the broker - confirm in MessageRouterTestsUtils.
259 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) ignores the publish responses and, once publishing completes, relays the result of
// subscriber.get.
260 Mono<MessageRouterSubscribeResponse> response = publisher
261 .put(publishRequest, plainBatch)
262 .then(subscriber.get(subscribeRequest));
265 StepVerifier.create(response)
266 .expectNext(expectedResponse)