2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.Test;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
33 import org.testcontainers.containers.DockerComposeContainer;
34 import org.testcontainers.junit.jupiter.Container;
35 import org.testcontainers.junit.jupiter.Testcontainers;
36 import reactor.core.publisher.Flux;
37 import reactor.core.publisher.Mono;
38 import reactor.test.StepVerifier;
40 import java.time.Duration;
42 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
43 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
44 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
45 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
46 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
47 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
48 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
52 class MessageRouterSubscriberIT {
// Time limit constant. NOTE(review): not referenced in the lines visible here; presumably the
// StepVerifier verification timeout — confirm against the full file.
53 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Consumer group and consumer id handed to createMRSubscribeRequest by each test below.
54 private static final String CONSUMER_GROUP = "group1";
55 private static final String CONSUMER_ID = "consumer200";
// Format string passed, together with the topic name, to errorSubscribeResponse to build the
// expected response for a subscription to a topic that does not exist; the %s placeholder sits
// inside the "No such topic exists" message. NOTE(review): the concatenation looks incomplete in
// this view (some pieces of the literal appear to be missing) — confirm against the full file.
56 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
58 "\"mrstatus\":3001," +
59 "\"helpURL\":\"http://onap.readthedocs.io\"," +
60 "\"message\":\"No such topic exists.-[%s]\"," +
// Docker Compose environment obtained from DMaapContainer; queried below for the DMaaP service
// host and port. NOTE(review): declared with a raw type — DockerComposeContainer is generic in
// Testcontainers, consider DockerComposeContainer<?> — confirm createContainerInstance's return type.
65 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
// Base URL of the events endpoint; assigned once the container's host/port are known, which is
// why it is not final despite the constant-style name.
67 private static String EVENTS_PATH;
// Publisher and subscriber under test, both created from DmaapClientFactory with default
// configuration. NOTE(review): never reassigned in the visible code; could be final.
69 private MessageRouterPublisher publisher = DmaapClientFactory
70 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
71 private MessageRouterSubscriber subscriber = DmaapClientFactory
72 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Builds the base URL of the events endpoint ("http://<host>:<port>/events") from the host and
// port that CONTAINER reports for the DMaaP service; each test appends its topic name to it.
// NOTE(review): the enclosing method header is not visible here; presumably a @BeforeAll setup
// (BeforeAll is imported) — confirm.
77 EVENTS_PATH = String.format("http://%s:%d/events",
78 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
79 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
80 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
81 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
// Subscribes to a topic that this test neither registers nor publishes to and checks that the
// subscriber emits the expected error response instead of a success response.
85 void subscriber_shouldHandleNoSuchTopicException() {
// Topic name used only by this test; no registerTopic call is made for it here.
87 final String topic = "newTopic";
88 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
89 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
// Expected error response built from the 404 format string and the topic name.
90 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
91 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
// Single subscribe call against the missing topic.
94 Mono<MessageRouterSubscribeResponse> response = subscriber
95 .get(mrSubscribeRequest);
// The first emitted item must equal the expected error response.
// NOTE(review): the remainder of this verification chain is not visible in this view.
98 StepVerifier.create(response)
99 .expectNext(expectedResponse)
// Publishes one JSON message to a topic and checks that a following subscribe call returns a
// success response containing exactly that item.
// NOTE(review): sibling tests are named subscriber_should...; this name omits the underscore.
105 void subscriberShouldHandleSingleItemResponse(){
107 final String topic = "TOPIC";
108 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// Publish and subscribe requests target the same topic URL.
109 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
110 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
// The same message list drives both the published batch and the expected response items.
112 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
113 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
114 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
115 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Helper call made before publishing; presumably makes the topic and consumer known to the
// broker first — confirm in MessageRouterTestsUtils.
118 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) chains the subscribe Mono after the publish sequence completes.
119 Mono<MessageRouterSubscribeResponse> response = publisher
120 .put(publishRequest, jsonMessageBatch)
121 .then(subscriber.get(subscribeRequest));
// The first emitted item must equal the expected success response.
// NOTE(review): the remainder of this verification chain is not visible in this view.
124 StepVerifier.create(response)
125 .expectNext(expectedResponse)
// Publishes two JSON messages to a topic and checks that a following subscribe call returns a
// single success response containing both items.
131 void subscriber_shouldHandleMultipleItemsResponse() {
133 final String topic = "TOPIC2";
134 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// Publish and subscribe requests target the same topic URL.
135 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
136 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
// Two messages with different keys; the same list drives the published batch and the expectation.
138 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
139 "{\"differentMessage\":\"message2\"}");
140 final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
141 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
142 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
// Helper call made before publishing; presumably makes the topic and consumer known to the
// broker first — confirm in MessageRouterTestsUtils.
145 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// then(...) chains the subscribe Mono after the publish sequence completes.
146 Mono<MessageRouterSubscribeResponse> response = publisher
147 .put(publishRequest, jsonMessageBatch)
148 .then(subscriber.get(subscribeRequest));
// The first emitted item must equal the expected success response.
// NOTE(review): the remainder of this verification chain is not visible in this view.
151 StepVerifier.create(response)
152 .expectNext(expectedResponse)
// Publishes two JSON messages and checks that getElements emits them as individual JSON
// elements, in the order they were published, rather than as one response object.
158 void subscriber_shouldExtractItemsFromResponse() {
160 final String topic = "TOPIC3";
161 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
// Publish and subscribe requests target the same topic URL.
162 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
163 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
164 CONSUMER_GROUP, CONSUMER_ID);
166 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
167 "{\"differentMessage\":\"message2\"}");
168 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
// Helper call made before publishing; presumably makes the topic and consumer known to the
// broker first — confirm in MessageRouterTestsUtils.
171 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
// thenMany(...) chains the element stream after the publish sequence completes.
172 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
173 .thenMany(subscriber.getElements(subscribeRequest));
// Each published message is expected as its own emitted element, first message first.
// NOTE(review): the remainder of this verification chain is not visible in this view.
176 StepVerifier.create(result)
177 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
178 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
184 void subscriber_shouldSubscribeToTopic(){
186 final String topic = "TOPIC4";
187 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
188 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
190 CONSUMER_GROUP, CONSUMER_ID);
192 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
193 "{\"differentMessage\":\"message2\"}");
194 final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
195 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
198 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
199 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
200 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
203 StepVerifier.create(result.take(2))
204 .expectNext(messages.get(0))
205 .expectNext(messages.get(1))