Switch from dummy server to testcontainers in DMaapClient integration tests 84/89784/5
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Thu, 13 Jun 2019 07:46:31 +0000 (09:46 +0200)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Fri, 14 Jun 2019 11:19:58 +0000 (13:19 +0200)
Change-Id: I1f75a0e1f85a91401d99e6f57fa648259d057b81
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1616

rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java [new file with mode: 0644]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java [deleted file]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java [new file with mode: 0644]

diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
new file mode 100644 (file)
index 0000000..7e6b0d4
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import java.io.File;
+import java.net.URL;
+import org.testcontainers.containers.DockerComposeContainer;
+
+final class DMaapContainer {
+    private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+    private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
+            MR_COMPOSE_RESOURCE_NAME);
+    static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+    static final String DMAAP_SERVICE_NAME = "dmaap";
+
+    private DMaapContainer() {}
+
+    static DockerComposeContainer createContainerInstance(){
+        return new DockerComposeContainer(
+                new File(DOCKER_COMPOSE_FILE_PATH))
+                .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
+    }
+
+    private static String getDockerComposeFilePath(String resourceName){
+        URL resource = DMaapContainer.class.getClassLoader()
+                .getResource(resourceName);
+
+        if(resource != null) return resource.getFile();
+        else throw new DockerComposeNotFoundException(String
+                .format("File %s does not exist", resourceName));
+    }
+}
index 9fbd63c..c746bfe 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
 
 import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
 import java.time.Duration;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
 class MessageRouterPublisherIT {
-
-    private static final String ERROR_MESSAGE = "Something went wrong";
-    private static final String TEXT_PLAIN_CONTENT_TYPE = "text/plain";
-    private static final String JSON_CONTENT_TYPE = "application/json";
-    private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
-    private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
-    private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
-    private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
-    private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
-    private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
+    @Container
+    private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
-    private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
-            .map(JsonPrimitive::new);
-    private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
-    private static DummyHttpServer server;
-    private MessageRouterPublisher sut = DmaapClientFactory
+    private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
+            + "{"
+            + "\"mrstatus\":5007,"
+            + "\"helpURL\":\"http://onap.readthedocs.io\","
+            + "\"message\":\"Error while publishing data to topic.:%s."
+            + "Successfully published number of messages :0."
+            + "Expected { to start an object.\",\"status\":400"
+            + "}";
+    private static String EVENTS_PATH;
+    private final MessageRouterPublisher publisher = DmaapClientFactory
             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+    private MessageRouterSubscriber subscriber = DmaapClientFactory
+            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
     @BeforeAll
     static void setUp() {
-        server = DummyHttpServer.start(routes ->
-                routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
-                        .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
-                                sendError(resp, 400, ERROR_MESSAGE))
-                        .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
-                                sendError(resp, 401, ERROR_MESSAGE))
-                        .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
-                                sendError(resp, 403, ERROR_MESSAGE))
-                        .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
-                                sendError(resp, 404, ERROR_MESSAGE))
-                        .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
-                                sendError(resp, 500, ERROR_MESSAGE))
-        );
+        EVENTS_PATH = String.format("http://%s:%d/events",
+                CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+                CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
     }
 
     @Test
     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
         //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
-                TEXT_PLAIN_CONTENT_TYPE);
-        final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
-
+        final String topic = "TOPIC";
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
+        final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
 
         //when
-        final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
-        //then
-        StepVerifier.create(result)
-                .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void publisher_shouldHandleBadRequestError(){
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
-                JSON_CONTENT_TYPE);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "400 Bad Request\n%s", ERROR_MESSAGE);
-
-        //when
-        final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+        final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
 
         //then
         StepVerifier.create(result)
@@ -120,34 +91,17 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldHandleUnauthorizedError(){
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
-                TEXT_PLAIN_CONTENT_TYPE);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "401 Unauthorized\n%s", ERROR_MESSAGE);
-
-        //when
-        final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
-
-        //then
-        StepVerifier.create(result)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void publisher_shouldHandleForbiddenError(){
+    void publisher_shouldHandleBadRequestError(){
         //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
-                TEXT_PLAIN_CONTENT_TYPE);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "403 Forbidden\n%s", ERROR_MESSAGE);
+        final String topic = "TOPIC2";
+        final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
+        final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
+        final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
+        final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+                DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
 
         //when
-        final Flux<MessageRouterPublishResponse> result = sut
-                .put(mrRequest, messageBatch);
+        final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
 
         //then
         StepVerifier.create(result)
@@ -157,64 +111,51 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldHandleNotFoundError(){
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
-                TEXT_PLAIN_CONTENT_TYPE);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "404 Not Found\n%s", ERROR_MESSAGE);
+    void publisher_shouldSuccessfullyPublishSingleMessage(){
+        final String topic = "TOPIC3";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        final Flux<MessageRouterPublishResponse> result = sut
-                .put(mrRequest, messageBatch);
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
 
         //then
-        StepVerifier.create(result)
+        StepVerifier.create(response)
                 .expectNext(expectedResponse)
                 .expectComplete()
-                .verify(TIMEOUT);
+                .verify();
     }
 
     @Test
-    void publisher_shouldHandleInternalServerError(){
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
-                TEXT_PLAIN_CONTENT_TYPE);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "500 Internal Server Error\n%s", ERROR_MESSAGE);
+    void publisher_shouldSuccessfullyPublishMultipleMessages(){
+        final String topic = "TOPIC4";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        final Flux<MessageRouterPublishResponse> result = sut
-                .put(mrRequest, messageBatch);
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
 
         //then
-        StepVerifier.create(result)
+        StepVerifier.create(response)
                 .expectNext(expectedResponse)
                 .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-
-    private MessageRouterPublishRequest createMRRequest(String topicPath, String contentType){
-        final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
-                .name("the topic")
-                .topicUrl(String.format("http://%s:%d%s",
-                        server.host(),
-                        server.port(),
-                        topicPath)
-                )
-                .build();
-
-        return ImmutableMessageRouterPublishRequest.builder()
-                .sinkDefinition(sinkDefinition)
-                .contentType(contentType)
-                .build();
-    }
-
-    private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs){
-        return ImmutableMessageRouterPublishResponse
-                .builder()
-                .failReason(String.format(failReasonFormat, formatArgs))
-                .build();
+                .verify();
     }
 }
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
deleted file mode 100644 (file)
index 32b77a1..0000000
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.vavr.collection.List;
-import java.io.File;
-import java.net.URL;
-import java.time.Duration;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
-import org.testcontainers.containers.DockerComposeContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-@Testcontainers
-class MessageRouterSubscriberCIT {
-    private static final JsonParser parser = new JsonParser();
-    private static final Duration TIMEOUT = Duration.ofSeconds(10);
-    private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
-    private static final String CONSUMER_GROUP = "group1";
-    private static final String CONSUMER_ID = "consumer200";
-    private static final String DMAAP_SERVICE_NAME = "dmaap";
-    private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
-    private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
-            MR_COMPOSE_RESOURCE_NAME);
-    private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
-            "{" +
-            "\"mrstatus\":3001," +
-            "\"helpURL\":\"http://onap.readthedocs.io\"," +
-            "\"message\":\"No such topic exists.-[%s]\"," +
-            "\"status\":404" +
-            "}";
-
-    @Container
-    private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
-            new File(DOCKER_COMPOSE_FILE_PATH))
-            .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
-
-    private static String EVENTS_PATH;
-
-    private MessageRouterPublisher publisher = DmaapClientFactory
-            .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-    private MessageRouterSubscriber subscriber = DmaapClientFactory
-            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
-
-    @BeforeAll
-    static void setUp() {
-        EVENTS_PATH = String.format("http://%s:%d/events",
-                CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
-                CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
-    }
-
-    @Test
-    void subscriber_shouldHandleNoSuchTopicException() {
-        //given
-        final String topic = "newTopic";
-        final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
-        final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
-        final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
-                .builder()
-                .failReason(expectedFailReason)
-                .build();
-
-        //when
-        Mono<MessageRouterSubscribeResponse> response = subscriber
-                .get(mrSubscribeRequest);
-
-        //then
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriberShouldHandleSingleItemResponse(){
-        //given
-        final String topic = "TOPIC";
-        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
-        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
-        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
-        final List<JsonElement> expectedItems = singleJsonMessage.map(parser::parse);
-        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
-        final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
-                .builder()
-                .items(expectedItems)
-                .build();
-
-        //when
-        registerTopic(publishRequest, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, jsonMessageBatch)
-                .then(subscriber.get(subscribeRequest));
-
-        //then
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify();
-    }
-
-    @Test
-    void subscriber_shouldHandleMultipleItemsResponse() {
-        //given
-        final String topic = "TOPIC2";
-        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
-        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
-        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
-                "{\"differentMessage\":\"message2\"}");
-        final List<JsonElement> expectedElements = twoJsonMessages.map(parser::parse);
-        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-        final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
-                .builder()
-                .items(expectedElements)
-                .build();
-
-        //when
-        registerTopic(publishRequest, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, jsonMessageBatch)
-                .then(subscriber.get(subscribeRequest));
-
-        //then
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify();
-    }
-
-    @Test
-    void subscriber_shouldExtractItemsFromResponse() {
-        //given
-        final String topic = "TOPIC3";
-        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
-        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
-        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
-                "{\"differentMessage\":\"message2\"}");
-        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-
-        //when
-        registerTopic(publishRequest, subscribeRequest);
-        final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
-                .thenMany(subscriber.getElements(subscribeRequest));
-
-        //then
-        StepVerifier.create(result)
-                .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
-                .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldSubscribeToTopic(){
-        //given
-        final String topic = "TOPIC4";
-        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
-        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-
-        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
-                "{\"differentMessage\":\"message2\"}");
-        final List<JsonElement> messages = twoJsonMessages.map(parser::parse);
-        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
-
-        //when
-        registerTopic(publishRequest, subscribeRequest);
-        final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
-                .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
-
-        //then
-        StepVerifier.create(result.take(2))
-                .expectNext(messages.get(0))
-                .expectNext(messages.get(1))
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    private static String getDockerComposeFilePath(String resourceName){
-        URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
-                .getResource(resourceName);
-
-        if(resource != null) return resource.getFile();
-        else throw new DockerComposeNotFoundException(String
-                .format("File %s does not exist", resourceName));
-    }
-
-    private static MessageRouterPublishRequest createMRPublishRequest(String topic){
-        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
-                .name("the topic")
-                .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
-                .build();
-
-        return ImmutableMessageRouterPublishRequest.builder()
-                .sinkDefinition(sinkDefinition)
-                .build();
-    }
-
-    private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
-        ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
-                .name("the topic")
-                .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
-                .build();
-
-        return ImmutableMessageRouterSubscribeRequest
-                .builder()
-                .sourceDefinition(sourceDefinition)
-                .consumerGroup(CONSUMER_GROUP)
-                .consumerId(CONSUMER_ID)
-                .build();
-    }
-
-    private void registerTopic(MessageRouterPublishRequest publishRequest,
-            MessageRouterSubscribeRequest subscribeRequest) {
-        final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
-                "{\"differentMessage\":\"message2\"}");
-        final Flux<JsonObject> jsonMessageBatch = jsonBatch(sampleJsonMessages);
-
-        publisher.put(publishRequest, jsonMessageBatch).blockLast();
-        subscriber.get(subscribeRequest).block();
-    }
-
-    private static Flux<JsonObject> jsonBatch(List<String> messages){
-        return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
-    }
-
-    private JsonObject getAsJsonObject(String item){
-        return new Gson().fromJson(item, JsonObject.class);
-    }
-}
index 225d353..c2e96b5 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
 
 import com.google.gson.JsonElement;
-import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonObject;
 import io.vavr.collection.List;
 import java.time.Duration;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.netty.http.server.HttpServerRoutes;
 import reactor.test.StepVerifier;
 
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since May 2019
- */
+@Testcontainers
 class MessageRouterSubscriberIT {
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
-    private static final String ERROR_MESSAGE = "Something went wrong";
     private static final String CONSUMER_GROUP = "group1";
-    private static final String SUCCESS_CONSUMER_ID = "consumer200";
-    private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
-    private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
-    private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
-    private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
-    private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
-
-    private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
-
-    private static final String SUCCESS_RESP_PATH = String
-            .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
-    private static final String FAILING_WITH_401_RESP_PATH = String
-            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
-    private static final String FAILING_WITH_403_RESP_PATH = String
-            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
-    private static final String FAILING_WITH_409_RESP_PATH = String
-            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
-    private static final String FAILING_WITH_429_RESP_PATH = String
-            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
-    private static final String FAILING_WITH_500_RESP_PATH = String
-            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
-
-    private static MessageRouterSubscribeRequest mrSuccessRequest;
-    private static MessageRouterSubscribeRequest mrFailingRequest;
-    private MessageRouterSubscriber sut = DmaapClientFactory
+    private static final String CONSUMER_ID = "consumer200";
+    private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
+            "{" +
+            "\"mrstatus\":3001," +
+            "\"helpURL\":\"http://onap.readthedocs.io\"," +
+            "\"message\":\"No such topic exists.-[%s]\"," +
+            "\"status\":404" +
+            "}";
+
+    @Container
+    private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
+
+    private static String EVENTS_PATH;
+
+    private MessageRouterPublisher publisher = DmaapClientFactory
+            .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+    private MessageRouterSubscriber subscriber = DmaapClientFactory
             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-    private static MessageRouterSource sourceDefinition;
 
 
     @BeforeAll
     static void setUp() {
-        DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
-
-        sourceDefinition = createMessageRouterSource(server);
-
-        mrSuccessRequest = createSuccessRequest();
-
-        mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
-    }
-
-    @Test
-    void subscriber_shouldGetCorrectResponse(){
-        Mono<MessageRouterSubscribeResponse> response = sut
-                .get(mrSuccessRequest);
-
-        List<String> expectedItems = List.of("I", "like", "pizza");
-
-        MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
-                .builder()
-                .items(expectedItems.map(JsonPrimitive::new))
-                .build();
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
+        EVENTS_PATH = String.format("http://%s:%d/events",
+                CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
+                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
+                CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
+                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
     }
 
     @Test
-    void subscriber_shouldGetUnauthorizedErrorResponse(){
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("401 Unauthorized\n%s", ERROR_MESSAGE));
-
+    void subscriber_shouldHandleNoSuchTopicException() {
+        //given
+        final String topic = "newTopic";
+        final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
+                String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
+        final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+                DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
+
+        //when
+        Mono<MessageRouterSubscribeResponse> response = subscriber
+                .get(mrSubscribeRequest);
+
+        //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
                 .expectComplete()
@@ -123,151 +94,111 @@ class MessageRouterSubscriberIT {
     }
 
     @Test
-    void subscriber_shouldGetForbiddenErrorResponse(){
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("403 Forbidden\n%s", ERROR_MESSAGE));
-
+    void subscriberShouldHandleSingleItemResponse(){
+        //given
+        final String topic = "TOPIC";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
                 .expectComplete()
-                .verify(TIMEOUT);
+                .verify();
     }
 
     @Test
-    void subscriber_shouldGetConflictErrorResponse(){
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("409 Conflict\n%s", ERROR_MESSAGE));
-
+    void subscriber_shouldHandleMultipleItemsResponse() {
+        //given
+        final String topic = "TOPIC2";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
                 .expectComplete()
-                .verify(TIMEOUT);
+                .verify();
     }
 
     @Test
-    void subscriber_shouldGetTooManyRequestsErrorResponse(){
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldGetInternalServerErrorResponse(){
-        Mono<MessageRouterSubscribeResponse> response = sut
-                .get(mrFailingRequest);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldParseCorrectResponse() {
-        final Flux<String> result = sut
-                .getElements(mrSuccessRequest)
-                .map(JsonElement::getAsString);
-
+    void subscriber_shouldExtractItemsFromResponse() {
+        //given
+        final String topic = "TOPIC3";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+                CONSUMER_GROUP, CONSUMER_ID);
+
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+                .thenMany(subscriber.getElements(subscribeRequest));
+
+        //then
         StepVerifier.create(result)
-                .expectNext("I", "like", "pizza")
+                .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
+                .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
                 .expectComplete()
                 .verify(TIMEOUT);
     }
 
     @Test
-    void subscriber_shouldParseErrorResponse(){
-        Flux<String> result = sut
-                .getElements(mrFailingRequest)
-                .map(JsonElement::getAsString);
-
-        StepVerifier.create(result)
-                .expectError(IllegalStateException.class)
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldSubscribeCorrectly(){
-        Flux<String> subscriptionForElements = sut
-                .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
-                .map(JsonElement:: getAsString);
-
-        StepVerifier.create(subscriptionForElements.take(2))
-                .expectNext("I", "like")
+    void subscriber_shouldSubscribeToTopic(){
+        //given
+        final String topic = "TOPIC4";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
+                CONSUMER_GROUP, CONSUMER_ID);
+
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
+                .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
+
+        //then
+        StepVerifier.create(result.take(2))
+                .expectNext(messages.get(0))
+                .expectNext(messages.get(1))
                 .expectComplete()
                 .verify(TIMEOUT);
     }
 
-    @Test
-    void subscriber_shouldParseErrorWhenSubscribed(){
-        Flux<String> subscriptionForElements = sut
-                .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
-                .map(JsonElement:: getAsString);
-
-        StepVerifier.create(subscriptionForElements.take(2))
-                .expectError(IllegalStateException.class)
-                .verify(TIMEOUT);
-    }
 
-    private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
-        return routes
-                .get(SUCCESS_RESP_PATH, (req, resp) ->
-                        sendResource(resp, "/sample-mr-subscribe-response.json"))
-                .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
-                        sendError(resp, 401, ERROR_MESSAGE))
-                .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
-                        sendError(resp, 403, ERROR_MESSAGE))
-                .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
-                        sendError(resp, 409, ERROR_MESSAGE))
-                .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
-                        sendError(resp, 429, ERROR_MESSAGE))
-                .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
-                        sendError(resp, 500, ERROR_MESSAGE));
-    }
 
-    private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
-       return ImmutableMessageRouterSource.builder()
-                .name("the topic")
-                .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
-                .build();
-    }
-
-    private static MessageRouterSubscribeRequest createSuccessRequest(){
-        return ImmutableMessageRouterSubscribeRequest.builder()
-                .sourceDefinition(sourceDefinition)
-                .consumerGroup(CONSUMER_GROUP)
-                .consumerId(SUCCESS_CONSUMER_ID)
-                .build();
-    }
-
-    private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
-        return ImmutableMessageRouterSubscribeRequest
-                .builder()
-                .sourceDefinition(sourceDefinition)
-                .consumerGroup(CONSUMER_GROUP)
-                .consumerId(consumerId)
-                .build();
-    }
-
-    private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
-        return ImmutableMessageRouterSubscribeResponse
-                .builder()
-                .failReason(failReason)
-                .build();
-    }
 }
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java
new file mode 100644 (file)
index 0000000..8695b72
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import io.vavr.collection.List;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import reactor.core.publisher.Flux;
+
+final class MessageRouterTestsUtils {
+    private static final JsonParser parser = new JsonParser();
+    private MessageRouterTestsUtils() {}
+
+    static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+                .name("the topic")
+                .topicUrl(topicUrl)
+                .build();
+
+        return ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(sinkDefinition)
+                .build();
+    }
+
+    static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+            String consumerGroup, String consumerId) {
+        ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+                .name("the topic")
+                .topicUrl(topicUrl)
+                .build();
+
+        return ImmutableMessageRouterSubscribeRequest
+                .builder()
+                .sourceDefinition(sourceDefinition)
+                .consumerGroup(consumerGroup)
+                .consumerId(consumerId)
+                .build();
+    }
+
+    static List<JsonElement> getAsJsonElements(List<String> messages){
+        return messages.map(parser::parse);
+    }
+
+    static JsonObject getAsJsonObject(String item){
+        return new Gson().fromJson(item, JsonObject.class);
+    }
+
+    static Flux<JsonObject> jsonBatch(List<String> messages){
+        return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+    }
+
+    static Flux<JsonPrimitive> plainBatch(List<String> messages){
+        return Flux.fromIterable(messages).map(JsonPrimitive::new);
+    }
+
+    static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+        return ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .failReason(String.format(failReasonFormat, formatArgs))
+                .build();
+    }
+
+    static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+        return ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .items(items)
+                .build();
+    }
+
+    static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+        return ImmutableMessageRouterPublishResponse
+                .builder()
+                .failReason(String.format(failReasonFormat, formatArgs))
+                .build();
+    }
+
+    static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+        return ImmutableMessageRouterPublishResponse
+                .builder()
+                .items(items)
+                .build();
+    }
+
+    static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
+            MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
+        final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
+
+        publisher.put(publishRequest, jsonMessageBatch).blockLast();
+        subscriber.get(subscribeRequest).block();
+    }
+}