2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.JsonElement;
 
  25 import com.google.gson.JsonObject;
 
  26 import com.google.gson.JsonParser;
 
  27 import io.vavr.collection.List;
 
  30 import java.time.Duration;
 
  31 import org.junit.jupiter.api.BeforeAll;
 
  32 import org.junit.jupiter.api.Test;
 
  33 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 
  34 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 
  35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
  44 import org.testcontainers.containers.DockerComposeContainer;
 
  45 import org.testcontainers.junit.jupiter.Container;
 
  46 import org.testcontainers.junit.jupiter.Testcontainers;
 
  47 import reactor.core.publisher.Flux;
 
  48 import reactor.core.publisher.Mono;
 
  49 import reactor.test.StepVerifier;
 
  52 class MessageRouterSubscriberCIT {
 
  53     private static final JsonParser parser = new JsonParser();
 
  54     private static final Duration TIMEOUT = Duration.ofSeconds(10);
 
  55     private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
 
  56     private static final String CONSUMER_GROUP = "group1";
 
  57     private static final String CONSUMER_ID = "consumer200";
 
  58     private static final String DMAAP_SERVICE_NAME = "dmaap";
 
  59     private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
 
  60     private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
 
  61             MR_COMPOSE_RESOURCE_NAME);
 
  62     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
 
  64             "\"mrstatus\":3001," +
 
  65             "\"helpURL\":\"http://onap.readthedocs.io\"," +
 
  66             "\"message\":\"No such topic exists.-[%s]\"," +
 
  71     private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
 
  72             new File(DOCKER_COMPOSE_FILE_PATH))
 
  73             .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
 
  75     private static String EVENTS_PATH;
 
  77     private MessageRouterPublisher publisher = DmaapClientFactory
 
  78             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
 
  79     private MessageRouterSubscriber subscriber = DmaapClientFactory
 
  80             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
  85         EVENTS_PATH = String.format("http://%s:%d/events",
 
  86                 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
 
  87                 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
 
  91     void subscriber_shouldHandleNoSuchTopicException() {
 
  93         final String topic = "newTopic";
 
  94         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
 
  95         final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
 
  96         final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
 
  98                 .failReason(expectedFailReason)
 
 102         Mono<MessageRouterSubscribeResponse> response = subscriber
 
 103                 .get(mrSubscribeRequest);
 
 106         StepVerifier.create(response)
 
 107                 .expectNext(expectedResponse)
 
 113     void subscriberShouldHandleSingleItemResponse(){
 
 115         final String topic = "TOPIC";
 
 116         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
 
 117         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
 
 119         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
 120         final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
 
 121         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
 122         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
 
 124                 .items(expectedItems)
 
 128         registerTopic(publishRequest, subscribeRequest);
 
 129         Mono<MessageRouterSubscribeResponse> response = publisher
 
 130                 .put(publishRequest, jsonMessageBatch)
 
 131                 .then(subscriber.get(subscribeRequest));
 
 134         StepVerifier.create(response)
 
 135                 .expectNext(expectedResponse)
 
 141     void subscriber_shouldHandleMultipleItemsResponse() {
 
 143         final String topic = "TOPIC2";
 
 144         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
 
 145         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
 
 147         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 148                 "{\"differentMessage\":\"message2\"}");
 
 149         final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
 
 150         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 151         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
 
 153                 .items(expectedElements)
 
 157         registerTopic(publishRequest, subscribeRequest);
 
 158         Mono<MessageRouterSubscribeResponse> response = publisher
 
 159                 .put(publishRequest, jsonMessageBatch)
 
 160                 .then(subscriber.get(subscribeRequest));
 
 163         StepVerifier.create(response)
 
 164                 .expectNext(expectedResponse)
 
 170     void subscriber_shouldExtractItemsFromResponse() {
 
 172         final String topic = "TOPIC3";
 
 173         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
 
 174         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
 
 176         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 177                 "{\"differentMessage\":\"message2\"}");
 
 178         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 181         registerTopic(publishRequest, subscribeRequest);
 
 182         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
 
 183                 .thenMany(subscriber.getElements(subscribeRequest));
 
 186         StepVerifier.create(result)
 
 187                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
 
 188                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
 
 194     void subscriber_shouldSubscribeToTopic(){
 
 196         final String topic = "TOPIC4";
 
 197         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
 
 198         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
 
 200         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 201                 "{\"differentMessage\":\"message2\"}");
 
 202         final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
 
 203         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 206         registerTopic(publishRequest, subscribeRequest);
 
 207         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
 
 208                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
 
 211         StepVerifier.create(result.take(2))
 
 212                 .expectNext(messages.get(0))
 
 213                 .expectNext(messages.get(1))
 
 218     private static String getDockerComposeFilePath(String resourceName){
 
 219         URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
 
 220                 .getResource(resourceName);
 
 222         if(resource != null) return resource.getFile();
 
 223         else throw new DockerComposeNotFoundException(String
 
 224                 .format("File %s does not exist", resourceName));
 
 227     private static MessageRouterPublishRequest createMRPublishRequest(String topic){
 
 228         MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
 
 230                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
 
 233         return ImmutableMessageRouterPublishRequest.builder()
 
 234                 .sinkDefinition(sinkDefinition)
 
 238     private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
 
 239         ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
 
 241                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
 
 244         return ImmutableMessageRouterSubscribeRequest
 
 246                 .sourceDefinition(sourceDefinition)
 
 247                 .consumerGroup(CONSUMER_GROUP)
 
 248                 .consumerId(CONSUMER_ID)
 
 252     private void registerTopic(MessageRouterPublishRequest publishRequest,
 
 253             MessageRouterSubscribeRequest subscribeRequest) {
 
 254         final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
 
 255                 "{\"differentMessage\":\"message2\"}");
 
 256         final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
 
 258         publisher.put(publishRequest, jsonMessageBatch).blockLast();
 
 259         subscriber.get(subscribeRequest).block();
 
 262     private static Flux<JsonObject> jsonBatch(List<String> messages){
 
 263         return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
 
 266     private JsonObject getAsJsonObject(String item){
 
 267         return new Gson().fromJson(item, JsonObject.class);