DMAAP Improvements 28/111728/4
authorPatrikBuhr <patrik.buhr@est.tech>
Tue, 25 Aug 2020 12:55:35 +0000 (14:55 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 27 Aug 2020 07:30:34 +0000 (09:30 +0200)
When a syntactically incorrect DMAAP is receiced, a response is generated
to simplify trouble shooting.

Change-Id: I949df0c13e056fa5713a4af6dfc870e1b7078c0a
Issue-ID: CCSDK-2498
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java

index 37a092b..a4da4bd 100644 (file)
 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
 
 import com.google.common.collect.Iterables;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import com.google.gson.TypeAdapterFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
 
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
@@ -36,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 
@@ -63,12 +69,17 @@ public class DmaapMessageConsumer {
 
     private DmaapMessageHandler dmaapMessageHandler = null;
 
+    private final Gson gson;
+
     @Value("${server.http-port}")
     private int localServerHttpPort;
 
     @Autowired
     public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
         this.applicationConfig = applicationConfig;
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+        gson = gsonBuilder.create();
     }
 
     /**
@@ -87,10 +98,10 @@ public class DmaapMessageConsumer {
         while (!isStopped()) {
             try {
                 if (isDmaapConfigured()) {
-                    Iterable<String> dmaapMsgs = fetchAllMessages();
+                    Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages();
                     if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
                         logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
-                        for (String msg : dmaapMsgs) {
+                        for (DmaapRequestMessage msg : dmaapMsgs) {
                             processMsg(msg);
                         }
                     }
@@ -115,21 +126,47 @@ public class DmaapMessageConsumer {
             && !consumerTopicUrl.isEmpty());
     }
 
-    private static List<String> parseMessages(String jsonString) {
-        JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
-        List<String> result = new ArrayList<>();
-        for (JsonElement element : arrayOfMessages) {
-            if (element.isJsonPrimitive()) {
-                result.add(element.getAsString());
+    private <T> List<T> parseList(String jsonString, Class<T> clazz) {
+        List<T> result = new ArrayList<>();
+        JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
+        for (JsonElement jsonElement : jsonArr) {
+            // The element can either be a JsonObject or a JsonString
+            if (jsonElement.isJsonPrimitive()) {
+                T json = gson.fromJson(jsonElement.getAsString(), clazz);
+                result.add(json);
             } else {
-                String messageAsString = element.toString();
-                result.add(messageAsString);
+                T json = gson.fromJson(jsonElement.toString(), clazz);
+                result.add(json);
             }
         }
         return result;
     }
 
-    protected Iterable<String> fetchAllMessages() throws ServiceException {
+    private void sendErrorResponse(String response) {
+        DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() //
+            .apiVersion("") //
+            .correlationId("") //
+            .operation(DmaapRequestMessage.Operation.PUT) //
+            .originatorId("") //
+            .payload(Optional.empty()) //
+            .requestId("") //
+            .target("") //
+            .timestamp("") //
+            .url("URL") //
+            .build();
+        getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block();
+    }
+
+    List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException {
+        try {
+            return parseList(jsonString, DmaapRequestMessage.class);
+        } catch (Exception e) {
+            sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString);
+            throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage());
+        }
+    }
+
+    protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException {
         String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
         AsyncRestClient consumer = getMessageRouterConsumer();
         ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
@@ -142,7 +179,7 @@ public class DmaapMessageConsumer {
         }
     }
 
-    private void processMsg(String msg) {
+    private void processMsg(DmaapRequestMessage msg) {
         logger.debug("Message Reveived from DMAAP : {}", msg);
         getDmaapMessageHandler().handleDmaapMsg(msg);
     }
index 040d8b3..2d7b506 100644 (file)
@@ -40,12 +40,12 @@ import reactor.core.publisher.Mono;
 /**
  * The class handles incoming requests from DMAAP.
  * <p>
- * That means: invoke a REST call towards this services and to send back a response though DMAAP
+ * That means: invoke a REST call towards this services and to send back a
+ * response though DMAAP
  */
 public class DmaapMessageHandler {
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
-    private static Gson gson = new GsonBuilder() //
-        .create(); //
+    private static Gson gson = new GsonBuilder().create();
     private final AsyncRestClient dmaapClient;
     private final AsyncRestClient pmsClient;
 
@@ -54,7 +54,7 @@ public class DmaapMessageHandler {
         this.dmaapClient = dmaapClient;
     }
 
-    public void handleDmaapMsg(String msg) {
+    public void handleDmaapMsg(DmaapRequestMessage msg) {
         try {
             String result = this.createTask(msg).block();
             logger.debug("handleDmaapMsg: {}", result);
@@ -63,17 +63,10 @@ public class DmaapMessageHandler {
         }
     }
 
-    Mono<String> createTask(String msg) {
-        try {
-            DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
-            return this.invokePolicyManagementService(dmaapRequestMessage) //
-                .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
-                .flatMap(
-                    response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
-        } catch (Exception e) {
-            String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
-            return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
-        }
+    Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) {
+        return this.invokePolicyManagementService(dmaapRequestMessage) //
+            .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
+            .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
     }
 
     private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,
@@ -95,6 +88,12 @@ public class DmaapMessageHandler {
             .flatMap(notUsed -> Mono.empty());
     }
 
+    public Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, HttpStatus status) {
+        return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
+            .flatMap(this::sendToDmaap) //
+            .onErrorResume(this::handleResponseCallError);
+    }
+
     private Mono<ResponseEntity<String>> invokePolicyManagementService(DmaapRequestMessage dmaapRequestMessage) {
         DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
         String uri = dmaapRequestMessage.url();
@@ -122,20 +121,13 @@ public class DmaapMessageHandler {
         }
     }
 
-    private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
-        HttpStatus status) {
-        return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
-            .flatMap(this::sendToDmaap) //
-            .onErrorResume(this::handleResponseCallError);
-    }
-
     private Mono<String> sendToDmaap(String body) {
         logger.debug("sendToDmaap: {} ", body);
         return dmaapClient.post("", "[" + body + "]");
     }
 
     private Mono<String> handleResponseCallError(Throwable t) {
-        logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
+        logger.warn("Failed to send response to DMaaP: {}", t.getMessage());
         return Mono.empty();
     }
 
@@ -152,6 +144,5 @@ public class DmaapMessageHandler {
             .build();
         String str = gson.toJson(dmaapResponseMessage);
         return Mono.just(str);
-
     }
 }
index 57d6969..248ba32 100644 (file)
 
 package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
 
-import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.google.gson.TypeAdapterFactory;
 
 import java.io.BufferedInputStream;
 import java.io.File;
@@ -34,7 +32,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.time.Duration;
 import java.util.Properties;
-import java.util.ServiceLoader;
 
 import javax.validation.constraints.NotNull;
 
@@ -239,9 +236,6 @@ public class RefreshConfigTask {
             return Flux.empty();
         }
 
-        GsonBuilder gsonBuilder = new GsonBuilder();
-        ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
-
         try (InputStream inputStream = createInputStream(filepath)) {
             JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
             ApplicationConfigParser appParser = new ApplicationConfigParser();
index 08e9984..1e9695c 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
-import static org.junit.Assert.fail;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyString;
index cc9efa0..2bd8c7f 100644 (file)
@@ -22,6 +22,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
 
 import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -34,8 +37,13 @@ import static org.mockito.Mockito.when;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+
 import java.time.Duration;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +54,8 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -62,6 +72,8 @@ class DmaapMessageConsumerTest {
 
     private DmaapMessageConsumer messageConsumerUnderTest;
 
+    private Gson gson = new GsonBuilder().create();
+
     @AfterEach
     void resetLogging() {
         LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
@@ -147,17 +159,7 @@ class DmaapMessageConsumerTest {
         setUpMrConfig();
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        String message = "{\"apiVersion\":\"1.0\"," //
-            + "\"operation\":\"GET\"," //
-            + "\"correlationId\":\"1592341013115594000\"," //
-            + "\"originatorId\":\"849e6c6b420\"," //
-            + "\"payload\":{}," //
-            + "\"requestId\":\"23343221\", " //
-            + "\"target\":\"policy-management-service\"," //
-            + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
-            + "\"type\":\"request\"," //
-            + "\"url\":\"/rics\"}";
-        String messages = "[" + message + "]";
+        String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT)));
 
         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
@@ -169,37 +171,84 @@ class DmaapMessageConsumerTest {
 
         messageConsumerUnderTest.start().join();
 
-        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class);
         verify(messageHandlerMock).handleDmaapMsg(captor.capture());
-        String messageAfterJsonParsing = captor.getValue();
-        assertThat(messageAfterJsonParsing).contains("apiVersion");
+        DmaapRequestMessage messageAfterJsonParsing = captor.getValue();
+        assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty();
 
         verifyNoMoreInteractions(messageHandlerMock);
     }
 
     @Test
-    void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
-        // The message from MR is here an array of String (which is the case when the MR
-        // simulator is used)
-        setUpMrConfig();
-        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
-        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+    void testMessageParsing() throws ServiceException {
+        messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
+        String json = gson.toJson(dmaapRequestMessage(Operation.PUT));
+        {
+            String jsonArrayOfObject = jsonArray(json);
+            List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject);
+            assertNotNull(parsedMessage);
+            assertTrue(parsedMessage.get(0).payload().isPresent());
+        }
+        {
+            String jsonArrayOfString = jsonArray(quote(json));
+            List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString);
+            assertNotNull(parsedMessage);
+            assertTrue(parsedMessage.get(0).payload().isPresent());
+        }
 
-        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
-        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+    }
 
+    @Test
+    void incomingUnparsableRequest_thenSendResponse() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+        doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any());
+        Exception actualException =
+            assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
+        assertThat(actualException.getMessage())
+            .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException");
 
-        messageConsumerUnderTest.start().join();
+        verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
+    }
 
-        verify(messageHandlerMock).handleDmaapMsg("aMessage");
-        verifyNoMoreInteractions(messageHandlerMock);
+    @Test
+    void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+        doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(),
+            any(), any());
+        Exception actualException =
+            assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
+        assertThat(actualException.getMessage()).contains("Sending response failed");
+
+        verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
     }
 
     private void setUpMrConfig() {
         when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
         when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
     }
+
+    private String jsonArray(String s) {
+        return "[" + s + "]";
+    }
+
+    private String quote(String s) {
+        return "\"" + s.replace("\"", "\\\"") + "\"";
+    }
+
+    private DmaapRequestMessage dmaapRequestMessage(Operation operation) {
+        return ImmutableDmaapRequestMessage.builder() //
+            .apiVersion("apiVersion") //
+            .correlationId("correlationId") //
+            .operation(operation) //
+            .originatorId("originatorId") //
+            .payload(new JsonObject()) //
+            .requestId("requestId") //
+            .target("target") //
+            .timestamp("timestamp") //
+            .url("URL") //
+            .build();
+    }
+
 }
index 2405e46..3656ec1 100644 (file)
@@ -22,9 +22,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
 
 import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -66,19 +63,18 @@ class DmaapMessageHandlerTest {
     private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
     private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
     private DmaapMessageHandler testedObject;
-    private static Gson gson = new GsonBuilder() //
-        .create(); //
+    private Gson gson = new GsonBuilder().create(); //
 
     @BeforeEach
     private void setUp() throws Exception {
         testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
     }
 
-    static JsonObject payloadAsJson() {
+    JsonObject payloadAsJson() {
         return gson.fromJson(payloadAsString(), JsonObject.class);
     }
 
-    static String payloadAsString() {
+    String payloadAsString() {
         return "{\"param\":\"value\"}";
     }
 
@@ -99,10 +95,6 @@ class DmaapMessageHandlerTest {
             .build();
     }
 
-    private String dmaapInputMessage(Operation operation) {
-        return gson.toJson(dmaapRequestMessage(operation));
-    }
-
     private Mono<ResponseEntity<String>> okResponse() {
         ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
         return Mono.just(entity);
@@ -113,40 +105,12 @@ class DmaapMessageHandlerTest {
         return Mono.just(entity);
     }
 
-    @Test
-    void testMessageParsing() {
-        String message = dmaapInputMessage(Operation.DELETE);
-        logger.info(message);
-        DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
-        assertNotNull(parsedMessage);
-        assertFalse(parsedMessage.payload().isPresent());
-
-        message = dmaapInputMessage(Operation.PUT);
-        logger.info(message);
-        parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
-        assertNotNull(parsedMessage);
-        assertTrue(parsedMessage.payload().isPresent());
-    }
-
-    @Test
-    void unparseableMessage_thenWarning() {
-        final ListAppender<ILoggingEvent> logAppender =
-            LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
-
-        String msg = "bad message";
-        testedObject.handleDmaapMsg(msg);
-
-        assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
-            "handleDmaapMsg failure org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException: Received unparsable "
-                + "message from DMAAP: \"" + msg + "\", reason: ");
-    }
-
     @Test
     void successfulDelete() throws IOException {
         doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
-        String message = dmaapInputMessage(Operation.DELETE);
+        DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
 
         StepVerifier //
             .create(testedObject.createTask(message)) //
@@ -167,8 +131,9 @@ class DmaapMessageHandlerTest {
         doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
+        DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
         StepVerifier //
-            .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+            .create(testedObject.createTask(message)) //
             .expectSubscription() //
             .expectNext("OK") //
             .verifyComplete(); //
@@ -188,8 +153,9 @@ class DmaapMessageHandlerTest {
         doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
+        DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
         StepVerifier //
-            .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+            .create(testedObject.createTask(message)) //
             .expectSubscription() //
             .verifyComplete(); //
 
@@ -205,8 +171,9 @@ class DmaapMessageHandlerTest {
         doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
+        DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
         StepVerifier //
-            .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+            .create(testedObject.createTask(message)) //
             .expectSubscription() //
             .expectNext("OK") //
             .verifyComplete(); //
@@ -223,8 +190,9 @@ class DmaapMessageHandlerTest {
         doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
+        DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
         StepVerifier //
-            .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
+            .create(testedObject.createTask(message)) //
             .expectSubscription() //
             .expectNext("OK") //
             .verifyComplete(); //
@@ -237,12 +205,13 @@ class DmaapMessageHandlerTest {
     }
 
     @Test
-    void exceptionWhenCallingPms_thenNotFoundResponse() throws IOException {
+    void exceptionWhenCallingPms_thenErrorResponse() throws IOException {
 
         doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
         doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
-        testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
+        DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
+        testedObject.createTask(message).block();
 
         verify(pmsClient).putForEntity(anyString(), anyString());
         verifyNoMoreInteractions(pmsClient);
@@ -256,25 +225,19 @@ class DmaapMessageHandlerTest {
         verifyNoMoreInteractions(dmaapClient);
     }
 
-    @Test
-    void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
-        String message = dmaapInputMessage(Operation.PUT).toString();
-        String badOperation = "BAD";
-        message = message.replace(Operation.PUT.toString(), badOperation);
-
-        testedObject.handleDmaapMsg(message);
-
-        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
-        verify(dmaapClient).post(anyString(), captor.capture());
-        String actualMessage = captor.getValue();
-        assertThat(actualMessage).contains("Not implemented operation") //
-            .contains("BAD_REQUEST");
-    }
-
     @Test
     void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
-        String message = dmaapInputMessage(Operation.PUT).toString();
-        message = message.replace("payload", "junk");
+        DmaapRequestMessage message = ImmutableDmaapRequestMessage.builder() //
+            .apiVersion("apiVersion") //
+            .correlationId("correlationId") //
+            .operation(DmaapRequestMessage.Operation.PUT) //
+            .originatorId("originatorId") //
+            .payload(Optional.empty()) //
+            .requestId("requestId") //
+            .target("target") //
+            .timestamp("timestamp") //
+            .url(URL) //
+            .build();
 
         final ListAppender<ILoggingEvent> logAppender =
             LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
@@ -284,4 +247,5 @@ class DmaapMessageHandlerTest {
         assertThat(logAppender.list.get(0).getFormattedMessage())
             .startsWith("Expected payload in message from DMAAP: ");
     }
+
 }