Add tests for MR Subscriber 24/87624/4
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Tue, 14 May 2019 07:05:49 +0000 (09:05 +0200)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Tue, 14 May 2019 12:50:57 +0000 (14:50 +0200)
Change-Id: I705bcd5c0a75d9ac637afdb8cd6ac17b0af55c78
Issue-ID: DCAEGEN2-1422
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java

index ab51bfe..a2c000f 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
 
+import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import java.time.Duration;
 import org.junit.jupiter.api.BeforeAll;
@@ -30,9 +32,13 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.server.HttpServerRoutes;
 import reactor.test.StepVerifier;
 
 /**
@@ -40,36 +46,230 @@ import reactor.test.StepVerifier;
  * @since May 2019
  */
 class MessageRouterSubscriberIT {
-    private MessageRouterSubscriber sut = DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-    private static DummyHttpServer server;
+    private static final Duration TIMEOUT = Duration.ofSeconds(10);
+    private static final String ERROR_MESSAGE = "Something went wrong";
+    private static final String CONSUMER_GROUP = "group1";
+    private static final String SUCCESS_CONSUMER_ID = "consumer200";
+    private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
+    private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
+    private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
+    private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
+    private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
+
+    private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
+
+    private static final String SUCCESS_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
+    private static final String FAILING_WITH_401_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
+    private static final String FAILING_WITH_403_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
+    private static final String FAILING_WITH_409_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
+    private static final String FAILING_WITH_429_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
+    private static final String FAILING_WITH_500_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
+
+    private static MessageRouterSubscribeRequest mrSuccessRequest;
+    private static MessageRouterSubscribeRequest mrFailingRequest;
+    private MessageRouterSubscriber sut = DmaapClientFactory
+            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
     private static MessageRouterSource sourceDefinition;
 
+
     @BeforeAll
     static void setUp() {
-        server = DummyHttpServer.start(routes ->
-                routes.get("/events/TOPIC/group1/consumer8", (req, resp) -> sendResource(resp, "/sample-mr-subscribe-response.json"))
-        );
-        sourceDefinition = ImmutableMessageRouterSource.builder()
-                .name("the topic")
-                .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
-                .build();
+        DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
+
+        sourceDefinition = createMessageRouterSource(server);
+
+        mrSuccessRequest = createSuccessRequest();
+
+        mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
     }
 
     @Test
-    void testStub() {
-        final MessageRouterSubscribeRequest mrRequest = ImmutableMessageRouterSubscribeRequest.builder()
-                .sourceDefinition(sourceDefinition)
-                .consumerGroup("group1")
-                .consumerId("consumer8")
+    void subscriber_shouldGetCorrectResponse(){
+        Mono<MessageRouterSubscribeResponse> response = sut
+                .get(mrSuccessRequest);
+
+        JsonArray expectedItems = new JsonArray();
+        expectedItems.add("I");
+        expectedItems.add("like");
+        expectedItems.add("pizza");
+
+        MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .items(expectedItems)
                 .build();
 
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldGetUnauthorizedErrorResponse(){
+        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
+        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+                .format("401 Unauthorized\n%s", ERROR_MESSAGE));
+
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldGetForbiddenErrorResponse(){
+        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
+        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+                .format("403 Forbidden\n%s", ERROR_MESSAGE));
+
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldGetConflictErrorResponse(){
+        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
+        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+                .format("409 Conflict\n%s", ERROR_MESSAGE));
+
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldGetTooManyRequestsErrorResponse(){
+        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
+        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+                .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
+
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldGetInternalServerErrorResponse(){
+        Mono<MessageRouterSubscribeResponse> response = sut
+                .get(mrFailingRequest);
+
+        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+                .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
+
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldParseCorrectResponse() {
         final Flux<String> result = sut
-                .getElements(mrRequest)
+                .getElements(mrSuccessRequest)
                 .map(JsonElement::getAsString);
 
         StepVerifier.create(result)
                 .expectNext("I", "like", "pizza")
                 .expectComplete()
-                .verify(Duration.ofSeconds(10));
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldParseErrorResponse(){
+        Flux<String> result = sut
+                .getElements(mrFailingRequest)
+                .map(JsonElement::getAsString);
+
+        StepVerifier.create(result)
+                .expectError(IllegalStateException.class)
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldSubscribeCorrectly(){
+        Flux<String> subscriptionForElements = sut
+                .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
+                .map(JsonElement:: getAsString);
+
+        StepVerifier.create(subscriptionForElements.take(2))
+                .expectNext("I", "like")
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldParseErrorWhenSubscribed(){
+        Flux<String> subscriptionForElements = sut
+                .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
+                .map(JsonElement:: getAsString);
+
+        StepVerifier.create(subscriptionForElements.take(2))
+                .expectError(IllegalStateException.class)
+                .verify(TIMEOUT);
+    }
+
+    private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
+        return routes
+                .get(SUCCESS_RESP_PATH, (req, resp) ->
+                        sendResource(resp, "/sample-mr-subscribe-response.json"))
+                .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
+                        sendError(resp, 401, ERROR_MESSAGE))
+                .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
+                        sendError(resp, 403, ERROR_MESSAGE))
+                .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
+                        sendError(resp, 409, ERROR_MESSAGE))
+                .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
+                        sendError(resp, 429, ERROR_MESSAGE))
+                .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
+                        sendError(resp, 500, ERROR_MESSAGE));
+    }
+
+    private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
+       return ImmutableMessageRouterSource.builder()
+                .name("the topic")
+                .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+                .build();
+    }
+
+    private static MessageRouterSubscribeRequest createSuccessRequest(){
+        return ImmutableMessageRouterSubscribeRequest.builder()
+                .sourceDefinition(sourceDefinition)
+                .consumerGroup(CONSUMER_GROUP)
+                .consumerId(SUCCESS_CONSUMER_ID)
+                .build();
+    }
+
+    private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
+        return ImmutableMessageRouterSubscribeRequest
+                .builder()
+                .sourceDefinition(sourceDefinition)
+                .consumerGroup(CONSUMER_GROUP)
+                .consumerId(consumerId)
+                .build();
+    }
+
+    private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
+        return ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .failReason(failReason)
+                .build();
     }
 }
index 2a1ba7a..4795b00 100644 (file)
@@ -67,6 +67,10 @@ public class DummyHttpServer {
         return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath)));
     }
 
+    public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){
+        return sendString(httpServerResponse.status(statusCode), Mono.just(message));
+    }
+
     public static Publisher<Void> sendString(HttpServerResponse httpServerResponse, Publisher<String> content) {
         return httpServerResponse.sendString(content);
     }