Update DataRouter Subscriber 54/79454/1
authorJoeOLeary <joseph.o.leary@est.tech>
Fri, 1 Mar 2019 14:09:52 +0000 (14:09 +0000)
committerJoeOLeary <joseph.o.leary@est.tech>
Fri, 1 Mar 2019 14:09:52 +0000 (14:09 +0000)
*Update metadata header key to match new datarouter specification
*Update subscriber to be a privileged subscriber
*Update subscriber to improve logging & remove sonar smells
*Update delivery end point to match datarouter specification
*Update event to include the publish id provided by datarouter
*Add datarouter event processed utility

Issue-ID: DCAEGEN2-1038
Change-Id: Iafce544f31f888de53547de8b280faebd8075d4c
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
pom.xml
src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java [new file with mode: 0644]
src/test/java/utils/EventUtils.java

diff --git a/pom.xml b/pom.xml
index 2b59034..7dff143 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                         <goals>
                             <goal>prepare-agent</goal>
                         </goals>
+                        <configuration>
+                            <excludes>
+                                <exclude>*</exclude>
+                            </excludes>
+                        </configuration>
                     </execution>
                     <execution>
                         <id>report</id>
                         </goals>
                     </execution>
                 </executions>
-                <configuration>
-                    <excludes>
-                        <exclude>**/*App.*</exclude>
-                    </excludes>
-                </configuration>
             </plugin>
         </plugins>
     </build>
index e083466..11767e6 100644 (file)
@@ -74,7 +74,7 @@ public class App {
 
         Undertow.builder()
                 .addHttpListener(8081, "0.0.0.0")
-                .setHandler(Handlers.routing().add("put", "/delivery", dataRouterSubscriber)
+                .setHandler(Handlers.routing().add("put", "/delivery/{filename}", dataRouterSubscriber)
                         .add("get", "/healthcheck", healthCheckHandler))
                 .build().start();
     }
index 4dcad3e..2f2ab4d 100644 (file)
@@ -61,12 +61,14 @@ import java.util.UUID;
  */
 @Data
 public class DataRouterSubscriber implements HttpHandler {
+    public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+    public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
     private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
     private static final int NUMBER_OF_ATTEMPTS = 5;
     private static final int DEFAULT_TIMEOUT = 2000;
     private static final int MAX_JITTER = 50;
 
-    private static final String METADATA_HEADER = "X-ATT-DR-META";
     private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
     private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
 
@@ -95,10 +97,11 @@ public class DataRouterSubscriber implements HttpHandler {
      */
     public void start(MapperConfig config) throws TooManyTriesException, InterruptedException {
         try {
-            logger.unwrap().info(ONAPLogConstants.Markers.ENTRY, "Starting subscription to DataRouter");
+            logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
             subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+            logger.unwrap().info("Successfully started DR Subscriber");
         } finally {
-            logger.unwrap().info(ONAPLogConstants.Markers.EXIT, "");
+            logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
         }
     }
 
@@ -128,6 +131,7 @@ public class DataRouterSubscriber implements HttpHandler {
         subscriberObj.addProperty("lastMod", Instant.now().toString());
         subscriberObj.addProperty("username", config.getBusControllerUserName());
         subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
+        subscriberObj.addProperty("privilegedSubscriber", true);
         return subscriberObj;
     }
 
@@ -183,9 +187,11 @@ public class DataRouterSubscriber implements HttpHandler {
 
                     Map<String,String> mdc = MDC.getCopyOfContextMap();
                     EventMetadata metadata = getMetadata(httpServerExchange);
+                    String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
                     httpServerExchange.getRequestReceiver()
                             .receiveFullString((callbackExchange, body) ->
-                                httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc)))
+                                httpServerExchange.dispatch(() ->
+                                        eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
                             );
                 } catch (NoMetadataException exception) {
                     logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/ProcessEventException.java
new file mode 100644 (file)
index 0000000..e8a2f11
--- /dev/null
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class ProcessEventException extends RuntimeException{
+    public ProcessEventException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
index c2cacaa..7a7cb1f 100644 (file)
@@ -38,4 +38,6 @@ public class Event {
     private EventMetadata metadata;
     @NonNull
     private Map<String, String> mdc;
+    @NonNull
+    private String publishIdentity;
 }
index 40327db..0412ece 100644 (file)
@@ -78,6 +78,9 @@ public class MapperConfig {
         return new URL(this.getBusControllerSubscriptionEndpoint());\r
     }\r
 \r
+    public String getSubscriberIdentity(){\r
+        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();\r
+    }\r
     @Getter\r
     @EqualsAndHashCode\r
     private class StreamsSubscribes {\r
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
new file mode 100644 (file)
index 0000000..9525ec7
--- /dev/null
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+public class DataRouterUtils {
+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterUtils.class));
+
+    private DataRouterUtils(){
+        throw new IllegalStateException("Utility class;shouldn't be constructed");
+    }
+
+    /**
+     * Sends Delete to DR required as part of the new guaranteed delivery mechanism.
+     * @param config used to determine subscriber id and target endpoint
+     * @param event event to be processed
+     */
+    public static String processEvent(MapperConfig config, Event event){
+        logger.unwrap().info("Sending processed to DataRouter");
+        String baseDelete = config.getDmaapDRDeleteEndpoint();
+        String subscriberIdentity = config.getSubscriberIdentity();
+        String delete = String.format("https://%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
+        try {
+            return new RequestSender().send("DELETE", delete);
+        } catch (Exception exception) {
+            logger.unwrap().error("Process event failure", exception);
+            throw new ProcessEventException("Process event failure", exception);
+        }
+    }
+}
index 25519a0..3380aca 100644 (file)
@@ -32,14 +32,13 @@ import org.onap.logging.ref.slf4j.ONAPLogAdapter;
 import org.onap.logging.ref.slf4j.ONAPLogConstants;\r
 import org.slf4j.LoggerFactory;\r
 \r
-import lombok.extern.slf4j.Slf4j;\r
-\r
 public class RequestSender {\r
     private static final int MAX_RETRIES = 5;\r
     private static final int RETRY_INTERVAL = 1000;\r
     private static final String SERVER_ERROR_MESSAGE = "Error on Server";\r
     private static final int ERROR_START_RANGE = 300;\r
     private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
+    public static final String DELETE = "DELETE";\r
 \r
     /**\r
      * Sends an Http GET request to a given endpoint.\r
@@ -50,6 +49,18 @@ public class RequestSender {
      */\r
 \r
     public String send(final String urlString) throws Exception {\r
+        return send("GET", urlString);\r
+    }\r
+\r
+\r
+    /**\r
+     * Sends a request to a given endpoint.\r
+     * @param method of the outbound request\r
+     * @param urlString representing given endpoint\r
+     * @return http response body\r
+     * @throws Exception\r
+     */\r
+    public String send(String method, final String urlString) throws Exception {\r
         final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);\r
         final UUID requestID = UUID.randomUUID();\r
         String result = "";\r
@@ -60,6 +71,7 @@ public class RequestSender {
             connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
             connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
             connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
+            connection.setRequestMethod(method);\r
             logger.unwrap()\r
                     .info("Sending:\n{}", connection.getRequestProperties());\r
 \r
index ad73b63..fdc1bf6 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.onap.dcaegen2.services.pmmapper.datarouter;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
@@ -29,19 +31,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
+
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import io.undertow.io.Receiver;
 import io.undertow.io.Sender;
 import io.undertow.server.HttpServerExchange;
 import io.undertow.util.HeaderMap;
 import io.undertow.util.StatusCodes;
-import utils.LoggingUtils;
 
 import java.io.IOException;
 import java.net.HttpURLConnection;
@@ -56,12 +56,13 @@ import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
 import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import utils.LoggingUtils;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(DataRouterSubscriber.class)
@@ -218,8 +219,9 @@ public class DataRouterSubscriberTest {
         String testString = "MESSAGE BODY";
         JsonObject metadata = new JsonParser().parse(
                 new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject();
-        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
+        when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt()))
                 .thenReturn(metadata.toString());
+        when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn("");
         doAnswer((Answer<Void>) invocationOnMock -> {
             Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
             callback.handle(httpServerExchange, testString);
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
new file mode 100644 (file)
index 0000000..73967c2
--- /dev/null
@@ -0,0 +1,116 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+import utils.EventUtils;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RequestSender.class)
+public class DataRouterUtilsTest {
+
+    @Test
+    public void processEventSuccessful() throws Exception {
+        String serviceResponse = "I'm a service response ;)";
+        String publishIdentity = "12";
+        PowerMockito.mockStatic(Thread.class);
+        MapperConfig mockMapperConfig = mock(MapperConfig.class);
+        URL mockURL = mock(URL.class);
+        HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(mockConnection.getResponseCode()).thenReturn(200);
+        when(mockConnection.getInputStream()).thenReturn(new ByteArrayInputStream(serviceResponse.getBytes()));
+
+        when(mockURL.openConnection()).thenReturn(mockConnection);
+        when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/");
+        when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12");
+
+        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+
+        Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
+        assertEquals(serviceResponse,  DataRouterUtils.processEvent(mockMapperConfig, testEvent));
+        verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+    }
+
+    @Test
+    public void testNegativeResponse() throws Exception {
+        String serviceResponse = "I'm a negative service response ;)";
+        String publishIdentity = "12";
+        PowerMockito.mockStatic(Thread.class);
+        MapperConfig mockMapperConfig = mock(MapperConfig.class);
+        URL mockURL = mock(URL.class);
+        HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(mockConnection.getResponseCode()).thenReturn(503);
+        when(mockConnection.getInputStream())
+                .thenAnswer(invocationOnMock -> new ByteArrayInputStream(serviceResponse.getBytes()));
+
+        when(mockURL.openConnection()).thenReturn(mockConnection);
+        when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/");
+        when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12");
+
+        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+        Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
+        assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
+        verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE);
+    }
+
+    @Test
+    public void testConstructionException() {
+        assertThrows(IllegalStateException.class, () -> Whitebox.invokeConstructor(DataRouterUtils.class));
+    }
+
+    @Test
+    public void testProcessEventFailure() throws Exception {
+        PowerMockito.mockStatic(Thread.class);
+        MapperConfig mockMapperConfig = mock(MapperConfig.class);
+        URL mockURL = mock(URL.class);
+        HttpURLConnection mockConnection = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(mockConnection.getResponseCode()).thenReturn(503);
+
+        when(mockURL.openConnection()).thenReturn(mockConnection);
+        when(mockMapperConfig.getDmaapDRDeleteEndpoint()).thenReturn("dmaap-dr-node/delete/");
+        when(mockMapperConfig.getSubscriberIdentity()).thenReturn("12");
+
+        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
+        Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class));
+        assertThrows(ProcessEventException.class, () -> DataRouterUtils.processEvent(mockMapperConfig, testEvent));
+    }
+}
index 90317c2..a6b131c 100644 (file)
@@ -75,11 +75,26 @@ public class EventUtils {
     }
 
     /**
+     * Makes an event with a mock http server exchange, empty mdc and publish identity
      * @param body body for the event.
      * @param eventMetadata metadata for the event.
      * @return event with mock HttpServerExchange
      */
     public static Event makeMockEvent(String body, EventMetadata eventMetadata) {
-        return new Event(mock(HttpServerExchange.class, RETURNS_DEEP_STUBS), body, eventMetadata, new HashMap<>());
+        return new Event(mock(HttpServerExchange.class, RETURNS_DEEP_STUBS), body, eventMetadata, new HashMap<>(), "");
     }
+
+
+    /**
+     * Makes an event with a mock http server exchange and empty mdc
+     * @param body body for the event.
+     * @param eventMetadata metadata for the event.
+     * @return event with mock HttpServerExchange
+     */
+    public static Event makeMockEvent(String body, EventMetadata eventMetadata, String publishIdentity) {
+        HttpServerExchange mockHttpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        return new Event(mockHttpServerExchange, body, eventMetadata, new HashMap<>(), publishIdentity);
+    }
+
+
 }