2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
  23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonObject;
 
  27 import io.vavr.collection.List;
 
  28 import java.time.Duration;
 
  29 import org.junit.jupiter.api.BeforeAll;
 
  30 import org.junit.jupiter.api.Test;
 
  31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 
  32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
  36 import org.testcontainers.containers.DockerComposeContainer;
 
  37 import org.testcontainers.junit.jupiter.Container;
 
  38 import org.testcontainers.junit.jupiter.Testcontainers;
 
  39 import reactor.core.publisher.Flux;
 
  40 import reactor.core.publisher.Mono;
 
  41 import reactor.test.StepVerifier;
 
  44 class MessageRouterSubscriberIT {
 
  45     private static final Duration TIMEOUT = Duration.ofSeconds(10);
 
  46     private static final String CONSUMER_GROUP = "group1";
 
  47     private static final String CONSUMER_ID = "consumer200";
 
  48     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
 
  50             "\"mrstatus\":3001," +
 
  51             "\"helpURL\":\"http://onap.readthedocs.io\"," +
 
  52             "\"message\":\"No such topic exists.-[%s]\"," +
 
  57     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
 
  59     private static String EVENTS_PATH;
 
  61     private MessageRouterPublisher publisher = DmaapClientFactory
 
  62             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
 
  63     private MessageRouterSubscriber subscriber = DmaapClientFactory
 
  64             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
  69         EVENTS_PATH = String.format("http://%s:%d/events",
 
  70                 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
 
  71                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
 
  72                 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
 
  73                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
 
  77     void subscriber_shouldHandleNoSuchTopicException() {
 
  79         final String topic = "newTopic";
 
  80         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
 
  81                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
 
  82         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
 
  83                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
 
  86         Mono<MessageRouterSubscribeResponse> response = subscriber
 
  87                 .get(mrSubscribeRequest);
 
  90         StepVerifier.create(response)
 
  91                 .expectNext(expectedResponse)
 
  97     void subscriberShouldHandleSingleItemResponse(){
 
  99         final String topic = "TOPIC";
 
 100         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 101         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 102         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
 
 104         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
 105         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
 106         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
 107         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
 110         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
 111         Mono<MessageRouterSubscribeResponse> response = publisher
 
 112                 .put(publishRequest, jsonMessageBatch)
 
 113                 .then(subscriber.get(subscribeRequest));
 
 116         StepVerifier.create(response)
 
 117                 .expectNext(expectedResponse)
 
 123     void subscriber_shouldHandleMultipleItemsResponse() {
 
 125         final String topic = "TOPIC2";
 
 126         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 127         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 128         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
 
 130         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 131                 "{\"differentMessage\":\"message2\"}");
 
 132         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
 
 133         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 134         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
 
 137         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
 138         Mono<MessageRouterSubscribeResponse> response = publisher
 
 139                 .put(publishRequest, jsonMessageBatch)
 
 140                 .then(subscriber.get(subscribeRequest));
 
 143         StepVerifier.create(response)
 
 144                 .expectNext(expectedResponse)
 
 150     void subscriber_shouldExtractItemsFromResponse() {
 
 152         final String topic = "TOPIC3";
 
 153         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 154         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 155         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
 
 156                 CONSUMER_GROUP, CONSUMER_ID);
 
 158         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 159                 "{\"differentMessage\":\"message2\"}");
 
 160         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 163         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
 164         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
 
 165                 .thenMany(subscriber.getElements(subscribeRequest));
 
 168         StepVerifier.create(result)
 
 169                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
 
 170                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
 
 176     void subscriber_shouldSubscribeToTopic(){
 
 178         final String topic = "TOPIC4";
 
 179         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 180         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 181         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
 
 182                 CONSUMER_GROUP, CONSUMER_ID);
 
 184         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 185                 "{\"differentMessage\":\"message2\"}");
 
 186         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
 
 187         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 190         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
 191         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
 
 192                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
 
 195         StepVerifier.create(result.take(2))
 
 196                 .expectNext(messages.get(0))
 
 197                 .expectNext(messages.get(1))