1f0fdafd0893684f9dd018b35dfe854e6463ec82
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonParser;
28 import java.io.File;
29 import java.net.URL;
30 import java.time.Duration;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.List;
34 import org.junit.jupiter.api.BeforeAll;
35 import org.junit.jupiter.api.Test;
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
37 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
38 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
47 import org.testcontainers.containers.DockerComposeContainer;
48 import org.testcontainers.junit.jupiter.Container;
49 import org.testcontainers.junit.jupiter.Testcontainers;
50 import reactor.core.publisher.Flux;
51 import reactor.core.publisher.Mono;
52 import reactor.test.StepVerifier;
53
54 @Testcontainers
55 class MessageRouterSubscriberCIT {
56     private static final Gson gson = new Gson();
57     private static final JsonParser parser = new JsonParser();
58     private static final Duration TIMEOUT = Duration.ofSeconds(10);
59     private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
60     private static final String CONSUMER_GROUP = "group1";
61     private static final String CONSUMER_ID = "consumer200";
62     private static final String DMAAP_SERVICE_NAME = "dmaap";
63     private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
64     private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
65             MR_COMPOSE_RESOURCE_NAME);
66     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
67             "{" +
68             "\"mrstatus\":3001," +
69             "\"helpURL\":\"http://onap.readthedocs.io\"," +
70             "\"message\":\"No such topic exists.-[%s]\"," +
71             "\"status\":404" +
72             "}";
73
74     @Container
75     private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
76             new File(DOCKER_COMPOSE_FILE_PATH))
77             .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
78
79     private static String EVENTS_PATH;
80
81     private MessageRouterPublisher publisher = DmaapClientFactory
82             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
83     private MessageRouterSubscriber subscriber = DmaapClientFactory
84             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
85
86
87     @BeforeAll
88     static void setUp() {
89         EVENTS_PATH = String.format("http://%s:%d/events",
90                 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
91                 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
92     }
93
94     @Test
95     void subscriber_shouldHandleNoSuchTopicException() {
96         //given
97         final String topic = "newTopic";
98         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
99         final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
100         final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
101                 .builder()
102                 .failReason(expectedFailReason)
103                 .build();
104
105         //when
106         Mono<MessageRouterSubscribeResponse> response = subscriber
107                 .get(mrSubscribeRequest);
108
109         //then
110         StepVerifier.create(response)
111                 .expectNext(expectedResponse)
112                 .expectComplete()
113                 .verify(TIMEOUT);
114     }
115
116     @Test
117     void subscriberShouldHandleSingleItemResponse(){
118         //given
119         final String topic = "TOPIC";
120         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
121         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
122
123         final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
124         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
125         final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
126         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
127                 .builder()
128                 .items(expectedItems)
129                 .build();
130
131         //when
132         registerTopic(publishRequest, subscribeRequest);
133         Mono<MessageRouterSubscribeResponse> response = publisher
134                 .put(publishRequest, jsonMessageBatch)
135                 .then(subscriber.get(subscribeRequest));
136
137         //then
138         StepVerifier.create(response)
139                 .expectNext(expectedResponse)
140                 .expectComplete()
141                 .verify();
142     }
143
144     @Test
145     void subscriber_shouldHandleMultipleItemsResponse() {
146         //given
147         final String topic = "TOPIC2";
148         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
149         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
150
151         final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
152                 "{\"differentMessage\":\"message2\"}");
153         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
154         final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
155         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
156                 .builder()
157                 .items(expectedItems)
158                 .build();
159
160         //when
161         registerTopic(publishRequest, subscribeRequest);
162         Mono<MessageRouterSubscribeResponse> response = publisher
163                 .put(publishRequest, jsonMessageBatch)
164                 .then(subscriber.get(subscribeRequest));
165
166         //then
167         StepVerifier.create(response)
168                 .expectNext(expectedResponse)
169                 .expectComplete()
170                 .verify();
171     }
172
173     @Test
174     void subscriber_shouldExtractItemsFromResponse() {
175         //given
176         final String topic = "TOPIC3";
177         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
178         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
179
180         final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
181                 "{\"differentMessage\":\"message2\"}");
182         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
183
184         //when
185         registerTopic(publishRequest, subscribeRequest);
186         final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
187                 .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
188
189         //then
190         StepVerifier.create(result)
191                 .expectNext(twoJsonMessages.get(0))
192                 .expectNext(twoJsonMessages.get(1))
193                 .expectComplete()
194                 .verify(TIMEOUT);
195     }
196
197     @Test
198     void subscriber_shouldSubscribeToTopic(){
199         //given
200         final String topic = "TOPIC4";
201         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
202         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
203
204         final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
205                 "{\"differentMessage\":\"message2\"}");
206         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
207
208         //when
209         registerTopic(publishRequest, subscribeRequest);
210         final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
211                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
212                         .map(JsonElement::getAsString));
213
214         //then
215         StepVerifier.create(result.take(2))
216                 .expectNext(twoJsonMessages.get(0))
217                 .expectNext(twoJsonMessages.get(1))
218                 .expectComplete()
219                 .verify(TIMEOUT);
220     }
221
222     private static String getDockerComposeFilePath(String resourceName){
223         URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
224                 .getResource(resourceName);
225
226         if(resource != null) return resource.getFile();
227         else throw new DockerComposeNotFoundException(String
228                 .format("File %s does not exist", resourceName));
229     }
230
231     private static MessageRouterPublishRequest createMRPublishRequest(String topic){
232         MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
233                 .name("the topic")
234                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
235                 .build();
236
237         return ImmutableMessageRouterPublishRequest.builder()
238                 .sinkDefinition(sinkDefinition)
239                 .build();
240     }
241
242     private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
243         ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
244                 .name("the topic")
245                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
246                 .build();
247
248         return ImmutableMessageRouterSubscribeRequest
249                 .builder()
250                 .sourceDefinition(sourceDefinition)
251                 .consumerGroup(CONSUMER_GROUP)
252                 .consumerId(CONSUMER_ID)
253                 .build();
254     }
255
256     private void registerTopic(MessageRouterPublishRequest publishRequest,
257             MessageRouterSubscribeRequest subscribeRequest) {
258         final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
259                 "{\"differentMessage\":\"message2\"}");
260         final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
261                 .map(parser::parse).map(JsonElement::getAsJsonObject);
262
263         publisher.put(publishRequest, jsonMessageBatch).blockLast();
264         subscriber.get(subscribeRequest).block();
265     }
266
267     private JsonArray getAsJsonArray(List<String> list) {
268         String listsJsonString = gson.toJson(list);
269         return new JsonParser().parse(listsJsonString).getAsJsonArray();
270     }
271
272     private static Flux<JsonObject> jsonBatch(List<String> messages){
273         return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
274     }
275 }