2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import io.vavr.collection.List;
28 import java.time.Duration;
29 import org.junit.jupiter.api.BeforeAll;
30 import org.junit.jupiter.api.Test;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
36 import org.testcontainers.containers.DockerComposeContainer;
37 import org.testcontainers.junit.jupiter.Container;
38 import org.testcontainers.junit.jupiter.Testcontainers;
39 import reactor.core.publisher.Flux;
40 import reactor.core.publisher.Mono;
41 import reactor.test.StepVerifier;
44 class MessageRouterSubscriberIT {
45 private static final Duration TIMEOUT = Duration.ofSeconds(10);
46 private static final String CONSUMER_GROUP = "group1";
47 private static final String CONSUMER_ID = "consumer200";
48 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
50 "\"mrstatus\":3001," +
51 "\"helpURL\":\"http://onap.readthedocs.io\"," +
52 "\"message\":\"No such topic exists.-[%s]\"," +
57 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
59 private static String EVENTS_PATH;
61 private MessageRouterPublisher publisher = DmaapClientFactory
62 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
63 private MessageRouterSubscriber subscriber = DmaapClientFactory
64 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
69 EVENTS_PATH = String.format("http://%s:%d/events",
70 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
71 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
72 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
73 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
77 void subscriber_shouldHandleNoSuchTopicException() {
79 final String topic = "newTopic";
80 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
81 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
82 final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
83 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
86 Mono<MessageRouterSubscribeResponse> response = subscriber
87 .get(mrSubscribeRequest);
90 StepVerifier.create(response)
91 .expectNext(expectedResponse)
97 void subscriberShouldHandleSingleItemResponse(){
99 final String topic = "TOPIC";
100 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
101 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
102 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
104 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
105 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
106 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
107 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
110 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
111 Mono<MessageRouterSubscribeResponse> response = publisher
112 .put(publishRequest, jsonMessageBatch)
113 .then(subscriber.get(subscribeRequest));
116 StepVerifier.create(response)
117 .expectNext(expectedResponse)
123 void subscriber_shouldHandleMultipleItemsResponse() {
125 final String topic = "TOPIC2";
126 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
127 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
128 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
130 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
131 "{\"differentMessage\":\"message2\"}");
132 final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
133 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
134 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
137 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
138 Mono<MessageRouterSubscribeResponse> response = publisher
139 .put(publishRequest, jsonMessageBatch)
140 .then(subscriber.get(subscribeRequest));
143 StepVerifier.create(response)
144 .expectNext(expectedResponse)
150 void subscriber_shouldExtractItemsFromResponse() {
152 final String topic = "TOPIC3";
153 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
154 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
155 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
156 CONSUMER_GROUP, CONSUMER_ID);
158 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
159 "{\"differentMessage\":\"message2\"}");
160 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
163 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
164 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
165 .thenMany(subscriber.getElements(subscribeRequest));
168 StepVerifier.create(result)
169 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
170 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
176 void subscriber_shouldSubscribeToTopic(){
178 final String topic = "TOPIC4";
179 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
180 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
181 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
182 CONSUMER_GROUP, CONSUMER_ID);
184 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
185 "{\"differentMessage\":\"message2\"}");
186 final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
187 final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
190 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
191 final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
192 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
195 StepVerifier.create(result.take(2))
196 .expectNext(messages.get(0))
197 .expectNext(messages.get(1))