f62359dd10e78e9bde76a854b078307ba427b29d
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / api / MessageRouterPublisherIT.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.Test;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
35 import org.testcontainers.containers.DockerComposeContainer;
36 import org.testcontainers.junit.jupiter.Container;
37 import org.testcontainers.junit.jupiter.Testcontainers;
38 import reactor.core.publisher.Flux;
39 import reactor.core.publisher.Mono;
40 import reactor.test.StepVerifier;
41
42 import java.time.Duration;
43
44 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
45 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
46 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
47 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
48 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
50 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
53
54 @Testcontainers
55 class MessageRouterPublisherIT {
56     @Container
57     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
58     private static final Duration TIMEOUT = Duration.ofSeconds(10);
59     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
60             + "{"
61             + "\"mrstatus\":5007,"
62             + "\"helpURL\":\"http://onap.readthedocs.io\","
63             + "\"message\":\"Error while publishing data to topic.:%s."
64             + "Successfully published number of messages :0."
65             + "Expected { to start an object.\",\"status\":400"
66             + "}";
67     private static String EVENTS_PATH;
68     private final MessageRouterPublisher publisher = DmaapClientFactory
69             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
70     private MessageRouterSubscriber subscriber = DmaapClientFactory
71             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
72
73     @BeforeAll
74     static void setUp() {
75         EVENTS_PATH = String.format("http://%s:%d/events",
76                 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
77                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
78                 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
79                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
80     }
81
82     @Test
83     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
84         //given
85         final String topic = "TOPIC";
86         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
87                 "{\"differentMessage\":\"message2\"}");
88         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
89         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
90         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
91
92         //when
93         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
94
95         //then
96         StepVerifier.create(result)
97                 .expectNext(expectedResponse)
98                 .expectComplete()
99                 .verify(TIMEOUT);
100     }
101
102     @Test
103     void publisher_shouldHandleBadRequestError(){
104         //given
105         final String topic = "TOPIC2";
106         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
107         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
108         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
109         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
110                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
111
112         //when
113         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
114
115         //then
116         StepVerifier.create(result)
117                 .expectNext(expectedResponse)
118                 .expectComplete()
119                 .verify(TIMEOUT);
120     }
121
122     @Test
123     void publisher_shouldSuccessfullyPublishSingleMessage(){
124         //given
125         final String topic = "TOPIC3";
126         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
127         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
128         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
129         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
130         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
131         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
132         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
133
134         //when
135         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
136         Mono<MessageRouterSubscribeResponse> response = publisher
137                 .put(publishRequest, jsonMessageBatch)
138                 .then(subscriber.get(subscribeRequest));
139
140         //then
141         StepVerifier.create(response)
142                 .expectNext(expectedResponse)
143                 .expectComplete()
144                 .verify();
145     }
146
147     @Test
148     void publisher_shouldSuccessfullyPublishMultipleMessages(){
149         final String topic = "TOPIC5";
150         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
151         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
152                 "{\"differentMessage\":\"message2\"}");
153         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
154         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
155         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
156         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
157         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
158
159         //when
160         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
161         Mono<MessageRouterSubscribeResponse> response = publisher
162                 .put(publishRequest, jsonMessageBatch)
163                 .then(subscriber.get(subscribeRequest));
164
165         //then
166         StepVerifier.create(response)
167                 .expectNext(expectedResponse)
168                 .expectComplete()
169                 .verify();
170     }
171
172     @Test
173     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
174         //given
175         final String topic = "TOPIC6";
176         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
177
178         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
179         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
180         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
181
182         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
183         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
184         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
185
186         //when
187         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
188         Mono<MessageRouterSubscribeResponse> response = publisher
189                 .put(publishRequest, plainBatch)
190                 .then(subscriber.get(subscribeRequest));
191
192         //then
193         StepVerifier.create(response)
194                 .expectNext(expectedResponse)
195                 .expectComplete()
196                 .verify();
197     }
198
199     @Test
200     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
201         //given
202         final String topic = "TOPIC7";
203         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
204
205         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
206         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
207         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
208
209         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
210         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
211         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
212
213         //when
214         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
215         Mono<MessageRouterSubscribeResponse> response = publisher
216                 .put(publishRequest, plainBatch)
217                 .then(subscriber.get(subscribeRequest));
218
219         //then
220         StepVerifier.create(response)
221                 .expectNext(expectedResponse)
222                 .expectComplete()
223                 .verify();
224     }
225
226     @Test
227     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
228         //given
229         final String topic = "TOPIC8";
230         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
231
232         final List<String> singlePlainMessage = List.of("kebab");
233         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
234         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
235
236         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
237         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
238         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
239
240         //when
241         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
242         Mono<MessageRouterSubscribeResponse> response = publisher
243                 .put(publishRequest, plainBatch)
244                 .then(subscriber.get(subscribeRequest));
245
246         //then
247         StepVerifier.create(response)
248                 .expectNext(expectedResponse)
249                 .expectComplete()
250                 .verify();
251     }
252
253     @Test
254     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
255         //given
256         final String topic = "TOPIC9";
257         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
258
259         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
260         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
261         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
262
263         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
264         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
265         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
266
267         //when
268         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
269         Mono<MessageRouterSubscribeResponse> response = publisher
270                 .put(publishRequest, plainBatch)
271                 .then(subscriber.get(subscribeRequest));
272
273         //then
274         StepVerifier.create(response)
275                 .expectNext(expectedResponse)
276                 .expectComplete()
277                 .verify();
278     }
279 }