2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonElement;
25 import com.google.gson.JsonObject;
26 import com.google.gson.JsonParser;
27 import io.vavr.collection.List;
30 import java.time.Duration;
31 import org.junit.jupiter.api.BeforeAll;
32 import org.junit.jupiter.api.Test;
33 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
34 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
44 import org.testcontainers.containers.DockerComposeContainer;
45 import org.testcontainers.junit.jupiter.Container;
46 import org.testcontainers.junit.jupiter.Testcontainers;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49 import reactor.test.StepVerifier;
52 class MessageRouterSubscriberCIT {
53 private static final JsonParser parser = new JsonParser();
54 private static final Duration TIMEOUT = Duration.ofSeconds(10);
55 private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
56 private static final String CONSUMER_GROUP = "group1";
57 private static final String CONSUMER_ID = "consumer200";
58 private static final String DMAAP_SERVICE_NAME = "dmaap";
59 private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
60 private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
61 MR_COMPOSE_RESOURCE_NAME);
62 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
64 "\"mrstatus\":3001," +
65 "\"helpURL\":\"http://onap.readthedocs.io\"," +
66 "\"message\":\"No such topic exists.-[%s]\"," +
71 private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
72 new File(DOCKER_COMPOSE_FILE_PATH))
73 .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
75 private static String EVENTS_PATH;
77 private MessageRouterPublisher publisher = DmaapClientFactory
78 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
79 private MessageRouterSubscriber subscriber = DmaapClientFactory
80 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
85 EVENTS_PATH = String.format("http://%s:%d/events",
86 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
87 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
91 void subscriber_shouldHandleNoSuchTopicException() {
93 final String topic = "newTopic";
94 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
95 final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
96 final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
98 .failReason(expectedFailReason)
102 Mono<MessageRouterSubscribeResponse> response = subscriber
103 .get(mrSubscribeRequest);
106 StepVerifier.create(response)
107 .expectNext(expectedResponse)
113 void subscriberShouldHandleSingleItemResponse(){
115 final String topic = "TOPIC";
116 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
117 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
119 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
120 final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
121 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
122 final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
124 .items(expectedItems)
128 registerTopic(publishRequest, subscribeRequest);
129 Mono<MessageRouterSubscribeResponse> response = publisher
130 .put(publishRequest, jsonMessageBatch)
131 .then(subscriber.get(subscribeRequest));
134 StepVerifier.create(response)
135 .expectNext(expectedResponse)
141 void subscriber_shouldHandleMultipleItemsResponse() {
143 final String topic = "TOPIC2";
144 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
145 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
147 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
148 "{\"differentMessage\":\"message2\"}");
149 final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
150 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
151 final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
153 .items(expectedElements)
157 registerTopic(publishRequest, subscribeRequest);
158 Mono<MessageRouterSubscribeResponse> response = publisher
159 .put(publishRequest, jsonMessageBatch)
160 .then(subscriber.get(subscribeRequest));
163 StepVerifier.create(response)
164 .expectNext(expectedResponse)
170 void subscriber_shouldExtractItemsFromResponse() {
172 final String topic = "TOPIC3";
173 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
174 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
176 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
177 "{\"differentMessage\":\"message2\"}");
178 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
181 registerTopic(publishRequest, subscribeRequest);
182 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
183 .thenMany(subscriber.getElements(subscribeRequest));
186 StepVerifier.create(result)
187 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
188 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
194 void subscriber_shouldSubscribeToTopic(){
196 final String topic = "TOPIC4";
197 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
198 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
200 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
201 "{\"differentMessage\":\"message2\"}");
202 final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
203 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
206 registerTopic(publishRequest, subscribeRequest);
207 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
208 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
211 StepVerifier.create(result.take(2))
212 .expectNext(messages.get(0))
213 .expectNext(messages.get(1))
218 private static String getDockerComposeFilePath(String resourceName){
219 URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
220 .getResource(resourceName);
222 if(resource != null) return resource.getFile();
223 else throw new DockerComposeNotFoundException(String
224 .format("File %s does not exist", resourceName));
227 private static MessageRouterPublishRequest createMRPublishRequest(String topic){
228 MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
230 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
233 return ImmutableMessageRouterPublishRequest.builder()
234 .sinkDefinition(sinkDefinition)
238 private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
239 ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
241 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
244 return ImmutableMessageRouterSubscribeRequest
246 .sourceDefinition(sourceDefinition)
247 .consumerGroup(CONSUMER_GROUP)
248 .consumerId(CONSUMER_ID)
252 private void registerTopic(MessageRouterPublishRequest publishRequest,
253 MessageRouterSubscribeRequest subscribeRequest) {
254 final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
255 "{\"differentMessage\":\"message2\"}");
256 final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
258 publisher.put(publishRequest, jsonMessageBatch).blockLast();
259 subscriber.get(subscribeRequest).block();
262 private static Flux<JsonObject> jsonBatch(List<String> messages){
263 return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
266 private JsonObject getAsJsonObject(String item){
267 return new Gson().fromJson(item, JsonObject.class);