32b77a1d22da53036e678b399497c4c6865f58d3
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import io.vavr.collection.List;
28 import java.io.File;
29 import java.net.URL;
30 import java.time.Duration;
31 import org.junit.jupiter.api.BeforeAll;
32 import org.junit.jupiter.api.Test;
33 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
34 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
44 import org.testcontainers.containers.DockerComposeContainer;
45 import org.testcontainers.junit.jupiter.Container;
46 import org.testcontainers.junit.jupiter.Testcontainers;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49 import reactor.test.StepVerifier;
50
51 @Testcontainers
52 class MessageRouterSubscriberCIT {
53     private static final JsonParser parser = new JsonParser();
54     private static final Duration TIMEOUT = Duration.ofSeconds(10);
55     private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
56     private static final String CONSUMER_GROUP = "group1";
57     private static final String CONSUMER_ID = "consumer200";
58     private static final String DMAAP_SERVICE_NAME = "dmaap";
59     private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
60     private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
61             MR_COMPOSE_RESOURCE_NAME);
62     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
63             "{" +
64             "\"mrstatus\":3001," +
65             "\"helpURL\":\"http://onap.readthedocs.io\"," +
66             "\"message\":\"No such topic exists.-[%s]\"," +
67             "\"status\":404" +
68             "}";
69
70     @Container
71     private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
72             new File(DOCKER_COMPOSE_FILE_PATH))
73             .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
74
75     private static String EVENTS_PATH;
76
77     private MessageRouterPublisher publisher = DmaapClientFactory
78             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
79     private MessageRouterSubscriber subscriber = DmaapClientFactory
80             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
81
82
83     @BeforeAll
84     static void setUp() {
85         EVENTS_PATH = String.format("http://%s:%d/events",
86                 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
87                 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
88     }
89
90     @Test
91     void subscriber_shouldHandleNoSuchTopicException() {
92         //given
93         final String topic = "newTopic";
94         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
95         final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
96         final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
97                 .builder()
98                 .failReason(expectedFailReason)
99                 .build();
100
101         //when
102         Mono<MessageRouterSubscribeResponse> response = subscriber
103                 .get(mrSubscribeRequest);
104
105         //then
106         StepVerifier.create(response)
107                 .expectNext(expectedResponse)
108                 .expectComplete()
109                 .verify(TIMEOUT);
110     }
111
112     @Test
113     void subscriberShouldHandleSingleItemResponse(){
114         //given
115         final String topic = "TOPIC";
116         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
117         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
118
119         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
120         final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
121         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
122         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
123                 .builder()
124                 .items(expectedItems)
125                 .build();
126
127         //when
128         registerTopic(publishRequest, subscribeRequest);
129         Mono<MessageRouterSubscribeResponse> response = publisher
130                 .put(publishRequest, jsonMessageBatch)
131                 .then(subscriber.get(subscribeRequest));
132
133         //then
134         StepVerifier.create(response)
135                 .expectNext(expectedResponse)
136                 .expectComplete()
137                 .verify();
138     }
139
140     @Test
141     void subscriber_shouldHandleMultipleItemsResponse() {
142         //given
143         final String topic = "TOPIC2";
144         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
145         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
146
147         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
148                 "{\"differentMessage\":\"message2\"}");
149         final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
150         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
151         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
152                 .builder()
153                 .items(expectedElements)
154                 .build();
155
156         //when
157         registerTopic(publishRequest, subscribeRequest);
158         Mono<MessageRouterSubscribeResponse> response = publisher
159                 .put(publishRequest, jsonMessageBatch)
160                 .then(subscriber.get(subscribeRequest));
161
162         //then
163         StepVerifier.create(response)
164                 .expectNext(expectedResponse)
165                 .expectComplete()
166                 .verify();
167     }
168
169     @Test
170     void subscriber_shouldExtractItemsFromResponse() {
171         //given
172         final String topic = "TOPIC3";
173         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
174         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
175
176         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
177                 "{\"differentMessage\":\"message2\"}");
178         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
179
180         //when
181         registerTopic(publishRequest, subscribeRequest);
182         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
183                 .thenMany(subscriber.getElements(subscribeRequest));
184
185         //then
186         StepVerifier.create(result)
187                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
188                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
189                 .expectComplete()
190                 .verify(TIMEOUT);
191     }
192
193     @Test
194     void subscriber_shouldSubscribeToTopic(){
195         //given
196         final String topic = "TOPIC4";
197         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
198         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
199
200         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
201                 "{\"differentMessage\":\"message2\"}");
202         final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
203         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
204
205         //when
206         registerTopic(publishRequest, subscribeRequest);
207         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
208                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
209
210         //then
211         StepVerifier.create(result.take(2))
212                 .expectNext(messages.get(0))
213                 .expectNext(messages.get(1))
214                 .expectComplete()
215                 .verify(TIMEOUT);
216     }
217
218     private static String getDockerComposeFilePath(String resourceName){
219         URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
220                 .getResource(resourceName);
221
222         if(resource != null) return resource.getFile();
223         else throw new DockerComposeNotFoundException(String
224                 .format("File %s does not exist", resourceName));
225     }
226
227     private static MessageRouterPublishRequest createMRPublishRequest(String topic){
228         MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
229                 .name("the topic")
230                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
231                 .build();
232
233         return ImmutableMessageRouterPublishRequest.builder()
234                 .sinkDefinition(sinkDefinition)
235                 .build();
236     }
237
238     private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
239         ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
240                 .name("the topic")
241                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
242                 .build();
243
244         return ImmutableMessageRouterSubscribeRequest
245                 .builder()
246                 .sourceDefinition(sourceDefinition)
247                 .consumerGroup(CONSUMER_GROUP)
248                 .consumerId(CONSUMER_ID)
249                 .build();
250     }
251
252     private void registerTopic(MessageRouterPublishRequest publishRequest,
253             MessageRouterSubscribeRequest subscribeRequest) {
254         final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
255                 "{\"differentMessage\":\"message2\"}");
256         final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
257
258         publisher.put(publishRequest, jsonMessageBatch).blockLast();
259         subscriber.get(subscribeRequest).block();
260     }
261
262     private static Flux<JsonObject> jsonBatch(List<String> messages){
263         return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
264     }
265
266     private JsonObject getAsJsonObject(String item){
267         return new Gson().fromJson(item, JsonObject.class);
268     }
269 }