Made DmaapMessageConsumer asynchronuous 88/113888/6
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 14 Oct 2020 12:14:06 +0000 (14:14 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 19 Oct 2020 06:34:58 +0000 (08:34 +0200)
Change-Id: Ib3d4951f3f9b2061353b5e50f427559a3781b10e
Issue-ID: CCSDK-2502
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java

index 8409f45..7f453a2 100644 (file)
@@ -188,7 +188,7 @@ public class AsyncRestClient {
             logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
                     exception.getResponseBodyAsString());
         } else {
-            logger.debug("{} HTTP error", traceTag, t);
+            logger.debug("{} HTTP error {}", traceTag, t.getMessage());
         }
     }
 
index 3a36517..f948e5f 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
 
-import com.google.common.collect.Iterables;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
@@ -37,15 +36,17 @@ import java.util.ServiceLoader;
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
-import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
 /**
  * The class fetches incoming requests from DMAAP. It uses the timeout parameter
  * that lets the MessageRouter keep the connection with the Kafka open until
@@ -74,60 +75,82 @@ public class DmaapMessageConsumer {
 
     private final AsyncRestClientFactory restClientFactory;
 
+    private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+
     @Value("${server.http-port}")
     private int localServerHttpPort;
 
+    private static class InfiniteFlux {
+        private FluxSink<Integer> sink;
+        private int counter = 0;
+
+        public synchronized Flux<Integer> start() {
+            stop();
+            return Flux.create(this::next).doOnRequest(this::onRequest);
+        }
+
+        public synchronized void stop() {
+            if (this.sink != null) {
+                this.sink.complete();
+                this.sink = null;
+            }
+        }
+
+        void onRequest(long no) {
+            logger.debug("InfiniteFlux.onRequest {}", no);
+            for (long i = 0; i < no; ++i) {
+                sink.next(counter++);
+            }
+        }
+
+        void next(FluxSink<Integer> sink) {
+            logger.debug("InfiniteFlux.next");
+            this.sink = sink;
+            sink.next(counter++);
+        }
+
+    }
+
     @Autowired
     public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
         this.applicationConfig = applicationConfig;
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
-        gson = gsonBuilder.create();
+        this.gson = gsonBuilder.create();
         this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
     }
 
     /**
-     * Starts the consumer. If there is a DMaaP configuration, it will start polling
-     * for messages. Otherwise it will check regularly for the configuration.
+     * Starts the DMAAP consumer. If there is a DMaaP configuration, it will start
+     * polling for messages. Otherwise it will check regularly for the
+     * configuration.
      *
-     * @return the running thread, for test purposes.
      */
-    public Thread start() {
-        Thread thread = new Thread(this::messageHandlingLoop);
-        thread.start();
-        return thread;
+    public void start() {
+        infiniteSubmitter.stop();
+
+        createTask().subscribe(//
+                value -> logger.debug("DmaapMessageConsumer next: {}", value), //
+                throwable -> logger.error("DmaapMessageConsumer error: {}", throwable), //
+                () -> logger.warn("DmaapMessageConsumer stopped") //
+        );
     }
 
-    private void messageHandlingLoop() {
-        while (!isStopped()) {
-            try {
-                if (isDmaapConfigured()) {
-                    Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages();
-                    if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
-                        logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
-                        for (DmaapRequestMessage msg : dmaapMsgs) {
-                            processMsg(msg);
-                        }
-                    }
-                } else {
-                    sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
-                }
-            } catch (Exception e) {
-                logger.warn("{}", e.getMessage());
-                sleep(TIME_BETWEEN_DMAAP_RETRIES);
-            }
-        }
+    protected Flux<String> createTask() {
+        return infiniteFlux() //
+                .flatMap(notUsed -> fetchFromDmaap(), 1) //
+                .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
+                .flatMap(this::parseReceivedMessage, 1)//
+                .flatMap(this::handleDmaapMsg, 1) //
+                .onErrorResume(throwable -> Mono.empty());
     }
 
-    protected boolean isStopped() {
-        return false;
+    protected Flux<Integer> infiniteFlux() {
+        return infiniteSubmitter.start();
     }
 
-    protected boolean isDmaapConfigured() {
-        String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
-        String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
-        return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty()
-                && !consumerTopicUrl.isEmpty());
+    protected Mono<Object> delay() {
+        return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(o -> Mono.empty());
     }
 
     private <T> List<T> parseList(String jsonString, Class<T> clazz) {
@@ -146,7 +169,36 @@ public class DmaapMessageConsumer {
         return result;
     }
 
-    private void sendErrorResponse(String response) {
+    protected boolean isDmaapConfigured() {
+        String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+        String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+        return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty()
+                && !consumerTopicUrl.isEmpty());
+    }
+
+    protected Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) {
+        return getDmaapMessageHandler().handleDmaapMsg(dmaapRequestMessage);
+    }
+
+    protected Mono<String> getFromMessageRouter(String topicUrl) {
+        logger.trace("getFromMessageRouter {}", topicUrl);
+        AsyncRestClient c = restClientFactory.createRestClient("");
+        return c.get(topicUrl);
+    }
+
+    protected Flux<DmaapRequestMessage> parseReceivedMessage(String jsonString) {
+        try {
+            logger.trace("parseMessages {}", jsonString);
+            return Flux.fromIterable(parseList(jsonString, DmaapRequestMessage.class));
+        } catch (Exception e) {
+            logger.error("parseMessages error {}", jsonString);
+            return sendErrorResponse("Could not parse: " + jsonString) //
+                    .flatMapMany(s -> Flux.empty());
+        }
+    }
+
+    protected Mono<String> sendErrorResponse(String response) {
+        logger.debug("sendErrorResponse {}", response);
         DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() //
                 .apiVersion("") //
                 .correlationId("") //
@@ -158,37 +210,23 @@ public class DmaapMessageConsumer {
                 .timestamp("") //
                 .url("URL") //
                 .build();
-        getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block();
+        return getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST) //
+                .onErrorResume(e -> Mono.empty());
     }
 
-    List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException {
-        try {
-            return parseList(jsonString, DmaapRequestMessage.class);
-        } catch (Exception e) {
-            sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString);
-            throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage());
+    private Mono<String> fetchFromDmaap() {
+        if (!this.isDmaapConfigured()) {
+            logger.debug("fetchFromDmaap, no action DMAAP not configured");
+            return delay().flatMap(o -> Mono.empty());
         }
-    }
-
-    protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException {
+        logger.debug("fetchFromDmaap");
         String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
-        AsyncRestClient consumer = getMessageRouterConsumer();
-        ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
-        logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
-        if (response.getStatusCode().is2xxSuccessful()) {
-            return parseMessages(response.getBody());
-        } else {
-            throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString()
-                    + " " + response.getBody());
-        }
-    }
 
-    private void processMsg(DmaapRequestMessage msg) {
-        logger.debug("Message Reveived from DMAAP : {}", msg);
-        getDmaapMessageHandler().handleDmaapMsg(msg);
+        return getFromMessageRouter(topicUrl) //
+                .onErrorResume(throwable -> delay().flatMap(o -> Mono.empty()));
     }
 
-    protected DmaapMessageHandler getDmaapMessageHandler() {
+    private DmaapMessageHandler getDmaapMessageHandler() {
         if (this.dmaapMessageHandler == null) {
             String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort;
             AsyncRestClient pmsClient = restClientFactory.createRestClient(pmsBaseUrl);
@@ -199,16 +237,4 @@ public class DmaapMessageConsumer {
         return this.dmaapMessageHandler;
     }
 
-    protected void sleep(Duration duration) {
-        try {
-            Thread.sleep(duration.toMillis());
-        } catch (Exception e) {
-            logger.error("Failed to put the thread to sleep", e);
-        }
-    }
-
-    protected AsyncRestClient getMessageRouterConsumer() {
-        return restClientFactory.createRestClient("");
-    }
-
 }
index 967cab1..c77087a 100644 (file)
@@ -54,20 +54,13 @@ public class DmaapMessageHandler {
         this.dmaapClient = dmaapClient;
     }
 
-    public void handleDmaapMsg(DmaapRequestMessage msg) {
-        try {
-            String result = this.createTask(msg).block();
-            logger.debug("handleDmaapMsg: {}", result);
-        } catch (Exception throwable) {
-            logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
-        }
-    }
-
-    Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) {
+    public Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) {
         return this.invokePolicyManagementService(dmaapRequestMessage) //
                 .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
                 .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage,
-                        response.getStatusCode()));
+                        response.getStatusCode()))
+                .doOnError(t -> logger.warn("Failed to handle DMAAP message : {}", t.getMessage()))//
+                .onErrorResume(t -> Mono.empty());
     }
 
     private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,
index b6d3cc0..72ca84a 100644 (file)
 
 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
 
-import static ch.qos.logback.classic.Level.WARN;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
 
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.ArrayList;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
@@ -57,8 +47,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
@@ -79,151 +69,113 @@ class DmaapMessageConsumerTest {
         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
     }
 
-    @Test
-    void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
-        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
-        doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
-        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
-
-        messageConsumerUnderTest.start().join();
-
-        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
-        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
-        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+    private void setTaskNumberOfLoops(int number) {
+        ArrayList<Integer> l = new ArrayList<>();
+        for (int i = 0; i < number; ++i) {
+            l.add(i);
+        }
+        Flux<Integer> f = Flux.fromIterable(l);
+        doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
     }
 
-    @Test
-    void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
-        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
-        doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
-        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
-
-        messageConsumerUnderTest.start().join();
-
-        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
-        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
-        orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+    private void disableTaskDelay() {
+        doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
     }
 
     @Test
-    void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
-        setUpMrConfig();
-
+    void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        Mono<ResponseEntity<String>> response = Mono.empty();
+        setTaskNumberOfLoops(3);
+        disableTaskDelay();
 
-        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-        doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
+        when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
+        doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
+                .getFromMessageRouter(anyString());
 
-        messageConsumerUnderTest.start().join();
+        doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
 
-        verify(messageRouterConsumerMock).getForEntity(any());
-        verifyNoMoreInteractions(messageRouterConsumerMock);
+        String s = messageConsumerUnderTest.createTask().blockLast();
+        assertEquals("responseFromHandler", s);
+        verify(messageConsumerUnderTest, times(2)).delay();
+        verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
     }
 
     @Test
-    void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
-        setUpMrConfig();
-
+    void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
-        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-
-        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
-        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+        setTaskNumberOfLoops(2);
+        disableTaskDelay();
+        setUpMrConfig();
 
-        final ListAppender<ILoggingEvent> logAppender =
-                LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
+        {
+            Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
+            Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
+            doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
+        }
 
-        messageConsumerUnderTest.start().join();
+        doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
 
-        assertThat(logAppender.list.get(0).getFormattedMessage())
-                .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
+        String s = messageConsumerUnderTest.createTask().blockLast();
 
-        verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+        verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
+        verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
+        verify(messageConsumerUnderTest, times(1)).delay();
+        verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
+        assertEquals("response1", s);
     }
 
     @Test
-    void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
-        // The message from MR is here an array of Json objects
-        setUpMrConfig();
+    void unParsableMessage_thenSendResponseAndContinue() throws Exception {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+        setTaskNumberOfLoops(2);
+        setUpMrConfig();
 
-        String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT)));
-
-        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-
-        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
-        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+        {
+            Mono<String> dmaapError = Mono.just("Non valid JSON \"");
+            Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
+            doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
+        }
 
-        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+        doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
 
-        messageConsumerUnderTest.start().join();
+        String s = messageConsumerUnderTest.createTask().blockLast();
+        assertEquals("response1", s);
 
-        ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class);
-        verify(messageHandlerMock).handleDmaapMsg(captor.capture());
-        DmaapRequestMessage messageAfterJsonParsing = captor.getValue();
-        assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty();
+        verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
+        verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
+        verify(messageConsumerUnderTest, times(0)).delay();
+        verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
+    }
 
-        verifyNoMoreInteractions(messageHandlerMock);
+    private String dmaapRequestMessageString() {
+        String json = gson.toJson(dmaapRequestMessage());
+        return jsonArray(json);
     }
 
     @Test
     void testMessageParsing() throws ServiceException {
         messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
-        String json = gson.toJson(dmaapRequestMessage(Operation.PUT));
+        String json = gson.toJson(dmaapRequestMessage());
         {
             String jsonArrayOfObject = jsonArray(json);
-            List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject);
+            DmaapRequestMessage parsedMessage =
+                    messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
             assertNotNull(parsedMessage);
-            assertTrue(parsedMessage.get(0).payload().isPresent());
+            assertTrue(parsedMessage.payload().isPresent());
         }
         {
             String jsonArrayOfString = jsonArray(quote(json));
-            List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString);
+            DmaapRequestMessage parsedMessage =
+                    messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
             assertNotNull(parsedMessage);
-            assertTrue(parsedMessage.get(0).payload().isPresent());
+            assertTrue(parsedMessage.payload().isPresent());
         }
 
     }
 
-    @Test
-    void incomingUnparsableRequest_thenSendResponse() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
-        doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any());
-        Exception actualException =
-                assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
-        assertThat(actualException.getMessage())
-                .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException");
-
-        verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
-    }
-
-    @Test
-    void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception {
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
-        doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(),
-                any(), any());
-        Exception actualException =
-                assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
-        assertThat(actualException.getMessage()).contains("Sending response failed");
-
-        verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
-    }
-
     private void setUpMrConfig() {
         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
@@ -237,11 +189,11 @@ class DmaapMessageConsumerTest {
         return "\"" + s.replace("\"", "\\\"") + "\"";
     }
 
-    private DmaapRequestMessage dmaapRequestMessage(Operation operation) {
+    private DmaapRequestMessage dmaapRequestMessage() {
         return ImmutableDmaapRequestMessage.builder() //
                 .apiVersion("apiVersion") //
                 .correlationId("correlationId") //
-                .operation(operation) //
+                .operation(Operation.PUT) //
                 .originatorId("originatorId") //
                 .payload(new JsonObject()) //
                 .requestId("requestId") //
index 9946881..df84ae0 100644 (file)
@@ -110,7 +110,7 @@ class DmaapMessageHandlerTest {
         DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
 
         StepVerifier //
-                .create(testedObject.createTask(message)) //
+                .create(testedObject.handleDmaapMsg(message)) //
                 .expectSubscription() //
                 .expectNext("OK") //
                 .verifyComplete(); //
@@ -130,7 +130,7 @@ class DmaapMessageHandlerTest {
 
         DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
         StepVerifier //
-                .create(testedObject.createTask(message)) //
+                .create(testedObject.handleDmaapMsg(message)) //
                 .expectSubscription() //
                 .expectNext("OK") //
                 .verifyComplete(); //
@@ -152,7 +152,7 @@ class DmaapMessageHandlerTest {
 
         DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
         StepVerifier //
-                .create(testedObject.createTask(message)) //
+                .create(testedObject.handleDmaapMsg(message)) //
                 .expectSubscription() //
                 .verifyComplete(); //
 
@@ -170,7 +170,7 @@ class DmaapMessageHandlerTest {
 
         DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
         StepVerifier //
-                .create(testedObject.createTask(message)) //
+                .create(testedObject.handleDmaapMsg(message)) //
                 .expectSubscription() //
                 .expectNext("OK") //
                 .verifyComplete(); //
@@ -189,7 +189,7 @@ class DmaapMessageHandlerTest {
 
         DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
         StepVerifier //
-                .create(testedObject.createTask(message)) //
+                .create(testedObject.handleDmaapMsg(message)) //
                 .expectSubscription() //
                 .expectNext("OK") //
                 .verifyComplete(); //
@@ -208,7 +208,7 @@ class DmaapMessageHandlerTest {
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
         DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
-        testedObject.createTask(message).block();
+        testedObject.handleDmaapMsg(message).block();
 
         verify(pmsClient).putForEntity(anyString(), anyString());
         verifyNoMoreInteractions(pmsClient);
@@ -239,7 +239,8 @@ class DmaapMessageHandlerTest {
         final ListAppender<ILoggingEvent> logAppender =
                 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
 
-        testedObject.handleDmaapMsg(message);
+        doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
+        testedObject.handleDmaapMsg(message).block();
 
         assertThat(logAppender.list.get(0).getFormattedMessage())
                 .startsWith("Expected payload in message from DMAAP: ");
index 2e96b68..4cc2360 100644 (file)
@@ -133,17 +133,6 @@ class RefreshConfigTaskTest {
         return obj;
     }
 
-    @Test
-    void stop_thenTaskIsDisposed() throws Exception {
-        refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_DOES_NOT_EXIST, null, null, false);
-        refreshTaskUnderTest.systemEnvironment = new Properties();
-
-        refreshTaskUnderTest.start();
-        refreshTaskUnderTest.stop();
-
-        assertThat(refreshTaskUnderTest.getRefreshTask().isDisposed()).as("Refresh task is disposed").isTrue();
-    }
-
     @Test
     void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception {
         refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_EXISTS);