2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonParser;
30 import java.time.Duration;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.List;
34 import org.junit.jupiter.api.BeforeAll;
35 import org.junit.jupiter.api.Test;
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
37 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
38 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
47 import org.testcontainers.containers.DockerComposeContainer;
48 import org.testcontainers.junit.jupiter.Container;
49 import org.testcontainers.junit.jupiter.Testcontainers;
50 import reactor.core.publisher.Flux;
51 import reactor.core.publisher.Mono;
52 import reactor.test.StepVerifier;
55 class MessageRouterSubscriberCIT {
56 private static final Gson gson = new Gson();
57 private static final JsonParser parser = new JsonParser();
58 private static final Duration TIMEOUT = Duration.ofSeconds(10);
59 private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
60 private static final String CONSUMER_GROUP = "group1";
61 private static final String CONSUMER_ID = "consumer200";
62 private static final String DMAAP_SERVICE_NAME = "dmaap";
63 private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
64 private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
65 MR_COMPOSE_RESOURCE_NAME);
66 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
68 "\"mrstatus\":3001," +
69 "\"helpURL\":\"http://onap.readthedocs.io\"," +
70 "\"message\":\"No such topic exists.-[%s]\"," +
75 private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
76 new File(DOCKER_COMPOSE_FILE_PATH))
77 .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
79 private static String EVENTS_PATH;
81 private MessageRouterPublisher publisher = DmaapClientFactory
82 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
83 private MessageRouterSubscriber subscriber = DmaapClientFactory
84 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
89 EVENTS_PATH = String.format("http://%s:%d/events",
90 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
91 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
95 void subscriber_shouldHandleNoSuchTopicException() {
97 final String topic = "newTopic";
98 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
99 final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
100 final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
102 .failReason(expectedFailReason)
106 Mono<MessageRouterSubscribeResponse> response = subscriber
107 .get(mrSubscribeRequest);
110 StepVerifier.create(response)
111 .expectNext(expectedResponse)
117 void subscriberShouldHandleSingleItemResponse(){
119 final String topic = "TOPIC";
120 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
121 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
123 final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
124 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
125 final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
126 final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
128 .items(expectedItems)
132 registerTopic(publishRequest, subscribeRequest);
133 Mono<MessageRouterSubscribeResponse> response = publisher
134 .put(publishRequest, jsonMessageBatch)
135 .then(subscriber.get(subscribeRequest));
138 StepVerifier.create(response)
139 .expectNext(expectedResponse)
145 void subscriber_shouldHandleMultipleItemsResponse() {
147 final String topic = "TOPIC2";
148 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
149 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
151 final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
152 "{\"differentMessage\":\"message2\"}");
153 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
154 final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
155 final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
157 .items(expectedItems)
161 registerTopic(publishRequest, subscribeRequest);
162 Mono<MessageRouterSubscribeResponse> response = publisher
163 .put(publishRequest, jsonMessageBatch)
164 .then(subscriber.get(subscribeRequest));
167 StepVerifier.create(response)
168 .expectNext(expectedResponse)
174 void subscriber_shouldExtractItemsFromResponse() {
176 final String topic = "TOPIC3";
177 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
178 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
180 final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
181 "{\"differentMessage\":\"message2\"}");
182 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
185 registerTopic(publishRequest, subscribeRequest);
186 final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
187 .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
190 StepVerifier.create(result)
191 .expectNext(twoJsonMessages.get(0))
192 .expectNext(twoJsonMessages.get(1))
198 void subscriber_shouldSubscribeToTopic(){
200 final String topic = "TOPIC4";
201 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
202 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
204 final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
205 "{\"differentMessage\":\"message2\"}");
206 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
209 registerTopic(publishRequest, subscribeRequest);
210 final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
211 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
212 .map(JsonElement::getAsString));
215 StepVerifier.create(result.take(2))
216 .expectNext(twoJsonMessages.get(0))
217 .expectNext(twoJsonMessages.get(1))
222 private static String getDockerComposeFilePath(String resourceName){
223 URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
224 .getResource(resourceName);
226 if(resource != null) return resource.getFile();
227 else throw new DockerComposeNotFoundException(String
228 .format("File %s does not exist", resourceName));
231 private static MessageRouterPublishRequest createMRPublishRequest(String topic){
232 MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
234 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
237 return ImmutableMessageRouterPublishRequest.builder()
238 .sinkDefinition(sinkDefinition)
242 private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
243 ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
245 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
248 return ImmutableMessageRouterSubscribeRequest
250 .sourceDefinition(sourceDefinition)
251 .consumerGroup(CONSUMER_GROUP)
252 .consumerId(CONSUMER_ID)
256 private void registerTopic(MessageRouterPublishRequest publishRequest,
257 MessageRouterSubscribeRequest subscribeRequest) {
258 final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
259 "{\"differentMessage\":\"message2\"}");
260 final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
261 .map(parser::parse).map(JsonElement::getAsJsonObject);
263 publisher.put(publishRequest, jsonMessageBatch).blockLast();
264 subscriber.get(subscribeRequest).block();
267 private JsonArray getAsJsonArray(List<String> list) {
268 String listsJsonString = gson.toJson(list);
269 return new JsonParser().parse(listsJsonString).getAsJsonArray();
272 private static Flux<JsonObject> jsonBatch(List<String> messages){
273 return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);