0afea74838c3806928f502744924f3747133ff8b
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
24
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import io.vavr.collection.List;
28 import java.time.Duration;
29 import org.junit.jupiter.api.BeforeAll;
30 import org.junit.jupiter.api.Test;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
38 import org.testcontainers.containers.DockerComposeContainer;
39 import org.testcontainers.junit.jupiter.Container;
40 import org.testcontainers.junit.jupiter.Testcontainers;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43 import reactor.test.StepVerifier;
44
45 @Testcontainers
46 class MessageRouterPublisherIT {
47     @Container
48     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
49     private static final Duration TIMEOUT = Duration.ofSeconds(10);
50     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
51             + "{"
52             + "\"mrstatus\":5007,"
53             + "\"helpURL\":\"http://onap.readthedocs.io\","
54             + "\"message\":\"Error while publishing data to topic.:%s."
55             + "Successfully published number of messages :0."
56             + "Expected { to start an object.\",\"status\":400"
57             + "}";
58     private static String EVENTS_PATH;
59     private final MessageRouterPublisher publisher = DmaapClientFactory
60             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
61     private MessageRouterSubscriber subscriber = DmaapClientFactory
62             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
63
64     @BeforeAll
65     static void setUp() {
66         EVENTS_PATH = String.format("http://%s:%d/events",
67                 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
68                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
69                 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
70                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
71     }
72
73     @Test
74     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
75         //given
76         final String topic = "TOPIC";
77         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
78                 "{\"differentMessage\":\"message2\"}");
79         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
80         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
81         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
82
83         //when
84         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
85
86         //then
87         StepVerifier.create(result)
88                 .expectNext(expectedResponse)
89                 .expectComplete()
90                 .verify(TIMEOUT);
91     }
92
93     @Test
94     void publisher_shouldHandleBadRequestError(){
95         //given
96         final String topic = "TOPIC2";
97         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
98         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
99         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
100         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
101                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
102
103         //when
104         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
105
106         //then
107         StepVerifier.create(result)
108                 .expectNext(expectedResponse)
109                 .expectComplete()
110                 .verify(TIMEOUT);
111     }
112
113     @Test
114     void publisher_shouldSuccessfullyPublishSingleMessage(){
115         //given
116         final String topic = "TOPIC3";
117         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
118         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
119         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
120         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
121         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
122         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
123         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
124
125         //when
126         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
127         Mono<MessageRouterSubscribeResponse> response = publisher
128                 .put(publishRequest, jsonMessageBatch)
129                 .then(subscriber.get(subscribeRequest));
130
131         //then
132         StepVerifier.create(response)
133                 .expectNext(expectedResponse)
134                 .expectComplete()
135                 .verify();
136     }
137
138     @Test
139     void publisher_shouldSuccessfullyPublishMultipleMessages(){
140         final String topic = "TOPIC5";
141         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
142         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
143                 "{\"differentMessage\":\"message2\"}");
144         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
145         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
146         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
147         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
148         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
149
150         //when
151         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
152         Mono<MessageRouterSubscribeResponse> response = publisher
153                 .put(publishRequest, jsonMessageBatch)
154                 .then(subscriber.get(subscribeRequest));
155
156         //then
157         StepVerifier.create(response)
158                 .expectNext(expectedResponse)
159                 .expectComplete()
160                 .verify();
161     }
162
163     @Test
164     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
165         //given
166         final String topic = "TOPIC6";
167         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
168
169         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
170         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
171         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
172
173         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
174         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
175         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
176
177         //when
178         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
179         Mono<MessageRouterSubscribeResponse> response = publisher
180                 .put(publishRequest, plainBatch)
181                 .then(subscriber.get(subscribeRequest));
182
183         //then
184         StepVerifier.create(response)
185                 .expectNext(expectedResponse)
186                 .expectComplete()
187                 .verify();
188     }
189
190     @Test
191     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
192         //given
193         final String topic = "TOPIC7";
194         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
195
196         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
197         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
198         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
199
200         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
201         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
202         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
203
204         //when
205         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
206         Mono<MessageRouterSubscribeResponse> response = publisher
207                 .put(publishRequest, plainBatch)
208                 .then(subscriber.get(subscribeRequest));
209
210         //then
211         StepVerifier.create(response)
212                 .expectNext(expectedResponse)
213                 .expectComplete()
214                 .verify();
215     }
216
217     @Test
218     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
219         //given
220         final String topic = "TOPIC8";
221         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
222
223         final List<String> singlePlainMessage = List.of("kebab");
224         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
225         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
226
227         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
228         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
229         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
230
231         //when
232         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
233         Mono<MessageRouterSubscribeResponse> response = publisher
234                 .put(publishRequest, plainBatch)
235                 .then(subscriber.get(subscribeRequest));
236
237         //then
238         StepVerifier.create(response)
239                 .expectNext(expectedResponse)
240                 .expectComplete()
241                 .verify();
242     }
243
244     @Test
245     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
246         //given
247         final String topic = "TOPIC9";
248         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
249
250         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
251         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
252         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
253
254         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
255         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
256         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
257
258         //when
259         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
260         Mono<MessageRouterSubscribeResponse> response = publisher
261                 .put(publishRequest, plainBatch)
262                 .then(subscriber.get(subscribeRequest));
263
264         //then
265         StepVerifier.create(response)
266                 .expectNext(expectedResponse)
267                 .expectComplete()
268                 .verify();
269     }
270 }