Publish VES event to MessageRouter via http 80/82980/2
authoremartin <ephraim.martin@est.tech>
Fri, 22 Mar 2019 16:45:31 +0000 (16:45 +0000)
committeremartin <ephraim.martin@est.tech>
Fri, 22 Mar 2019 16:45:31 +0000 (16:45 +0000)
Change-Id: Ic5ed1fad1182e7343f213488c4015d2683ab8ddc
Issue-ID: DCAEGEN2-1273
Signed-off-by: emartin <ephraim.martin@est.tech>
pom.xml
src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index ca6c426..31572a3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
         <junit4.version>4.12</junit4.version>
         <jsonschema.version>1.3.0</jsonschema.version>
         <xerces.version>2.11.0</xerces.version>
+        <reactor.test>3.1.0.RELEASE</reactor.test>
         <!-- Plugin Versions -->
         <shade.plugin.version>3.2.0</shade.plugin.version>
         <jacoco.version>0.8.2</jacoco.version>
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <version>${reactor.test}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.everit.json</groupId>
             <artifactId>org.everit.json.schema</artifactId>
index 9abe086..03d42d5 100644 (file)
@@ -38,6 +38,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
 import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
 import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
@@ -73,6 +74,7 @@ public class App {
         Mapper mapper = new Mapper(mappingTemplate);
         MeasSplitter splitter = new MeasSplitter(measConverter);
         XMLValidator validator = new XMLValidator(xmlSchema);
+        VESPublisher vesPublisher = new VESPublisher(mapperConfig);
 
         flux.onBackpressureDrop(App::handleBackPressure)
                 .doOnNext(App::receiveRequest)
@@ -86,6 +88,7 @@ public class App {
                 .concatMap(event -> App.split(splitter,event, mapperConfig))
                 .filter(events -> App.filter(filterHandler, events, mapperConfig))
                 .concatMap(events -> App.map(mapper, events, mapperConfig))
+                .concatMap(vesPublisher::publish)
                 .subscribe(events -> logger.unwrap().info("Event Processed"));
 
         DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
index f37bcd3..19a4750 100644 (file)
@@ -135,7 +135,7 @@ public class DataRouterSubscriber implements HttpHandler, Configurable {
 
     private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
         JsonObject subscriberObj = new JsonObject();
-        subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+        subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
         subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
         subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
         subscriberObj.addProperty("lastMod", Instant.now().toString());
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
new file mode 100644 (file)
index 0000000..6b5c157
--- /dev/null
@@ -0,0 +1,28 @@
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ *  Copyright (C) 2019 Nordix Foundation.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ *\r
+ * SPDX-License-Identifier: Apache-2.0\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.dcaegen2.services.pmmapper.exceptions;\r
+\r
+public class MRPublisherException extends RuntimeException{\r
+    public MRPublisherException(String message, Throwable cause) {\r
+        super(message, cause);\r
+    }\r
+\r
+}\r
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
new file mode 100644 (file)
index 0000000..77b0545
--- /dev/null
@@ -0,0 +1,69 @@
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ *  Copyright (C) 2019 Nordix Foundation.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ *\r
+ * SPDX-License-Identifier: Apache-2.0\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.dcaegen2.services.pmmapper.messagerouter;\r
+\r
+import java.util.List;\r
+\r
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
+import org.onap.dcaegen2.services.pmmapper.model.Event;\r
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;\r
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import reactor.core.publisher.Flux;\r
+\r
+public class VESPublisher {\r
+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));\r
+    private RequestSender sender;\r
+    private MapperConfig config;\r
+\r
+    public VESPublisher(MapperConfig config) {\r
+        this(config, new RequestSender());\r
+    }\r
+\r
+    public VESPublisher(MapperConfig config, RequestSender sender) {\r
+        this.sender = sender;\r
+        this.config = config;\r
+    }\r
+\r
+    public Flux<Event> publish(List<Event> events) {\r
+        logger.unwrap().info("Publishing VES events to messagerouter.");\r
+        Event event = events.get(0);\r
+        try {\r
+            events.forEach(e -> this.publish(e.getVes()));\r
+            logger.unwrap().info("Successfully published VES events to messagerouter.");\r
+        } catch(MRPublisherException e) {\r
+            logger.unwrap().error("Failed to publish VES event(s) to messagerouter. {}", e.getMessage());\r
+            return Flux.empty();\r
+        }\r
+        return Flux.just(event);\r
+    }\r
+\r
+    private void publish(String ves) {\r
+        try {\r
+            String topicUrl = config.getPublisherTopicUrl();\r
+            sender.send("POST", topicUrl, ves);\r
+        } catch (Exception e) {\r
+            throw new MRPublisherException(e.getMessage(), e);\r
+        }\r
+    }\r
+}\r
index d28d850..ffb09ba 100644 (file)
@@ -82,6 +82,14 @@ public class MapperConfig {
         return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();\r
     }\r
 \r
+    public String getSubscriberDcaeLocation() {\r
+        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();\r
+    }\r
+\r
+    public String getPublisherTopicUrl() {\r
+        return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();\r
+    }\r
+\r
     public boolean dmaapInfoEquals(MapperConfig mapperConfig){\r
         return this\r
                 .getStreamsSubscribes()\r
index 3380aca..658f820 100644 (file)
 package org.onap.dcaegen2.services.pmmapper.utils;\r
 \r
 import java.io.BufferedReader;\r
+import java.io.IOException;\r
 import java.io.InputStream;\r
 import java.io.InputStreamReader;\r
+import java.io.OutputStream;\r
 import java.net.HttpURLConnection;\r
 import java.net.URL;\r
+import java.nio.charset.StandardCharsets;\r
 import java.util.UUID;\r
 import java.util.stream.Collectors;\r
 \r
@@ -39,41 +42,47 @@ public class RequestSender {
     private static final int ERROR_START_RANGE = 300;\r
     private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
     public static final String DELETE = "DELETE";\r
+    public static final String DEFAULT_CONTENT_TYPE = "text/plain";\r
+    public static final int DEFAULT_READ_TIMEOUT = 5000;\r
 \r
     /**\r
-     * Sends an Http GET request to a given endpoint.\r
-     *\r
-     * @return http response body\r
-     * @throws Exception\r
-     * @throws InterruptedException\r
+     * Works just like {@link RequestSender#send(method,urlString)}, except {@code method }\r
+     * is set to {@code GET} by default.\r
+     * @see RequestSender#send(String,String,String)\r
      */\r
-\r
     public String send(final String urlString) throws Exception {\r
         return send("GET", urlString);\r
     }\r
 \r
+    /**\r
+     * Works just like {@link RequestSender#send(method,urlString,body)}, except {@code body }\r
+     * is set to empty String by default.\r
+     * @see RequestSender#send(String,String,String)\r
+     */\r
+    public String send(String method, final String urlString) throws Exception {\r
+       return send(method,urlString,"");\r
+    }\r
 \r
     /**\r
-     * Sends a request to a given endpoint.\r
+     * Sends an http request to a given endpoint.\r
      * @param method of the outbound request\r
      * @param urlString representing given endpoint\r
+     * @param body of the request as json\r
      * @return http response body\r
      * @throws Exception\r
      */\r
-    public String send(String method, final String urlString) throws Exception {\r
+    public String send(String method, final String urlString, final String body) throws Exception {\r
         final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);\r
         final UUID requestID = UUID.randomUUID();\r
         String result = "";\r
 \r
         for (int i = 1; i <= MAX_RETRIES; i++) {\r
-            URL url = new URL(urlString);\r
-            HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
-            connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
-            connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
-            connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
-            connection.setRequestMethod(method);\r
-            logger.unwrap()\r
-                    .info("Sending:\n{}", connection.getRequestProperties());\r
+            final URL url = new URL(urlString);\r
+            final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);\r
+            if(!body.isEmpty()) {\r
+                setMessageBody(connection, body);\r
+            }\r
+            logger.unwrap().info("Sending {} request to {}.", method, urlString);\r
 \r
             try (InputStream is = connection.getInputStream();\r
                     BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {\r
@@ -81,15 +90,12 @@ public class RequestSender {
                         .collect(Collectors.joining("\n"));\r
                 int responseCode = connection.getResponseCode();\r
                 if (!(isWithinErrorRange(responseCode))) {\r
-                    logger.unwrap()\r
-                            .info("Received:\n{}", result);\r
+                    logger.unwrap().info("Server Response Received:\n{}", result);\r
                     break;\r
                 }\r
-\r
             } catch (Exception e) {\r
                 if (retryLimitReached(i)) {\r
-                    logger.unwrap()\r
-                            .error("Execution error: "+connection.getResponseMessage(), e);\r
+                    logger.unwrap().error("Execution error: "+connection.getResponseMessage(), e);\r
                     throw new Exception(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);\r
                 }\r
             }\r
@@ -99,6 +105,26 @@ public class RequestSender {
         return result;\r
     }\r
 \r
+    private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws Exception {\r
+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
+        connection.setReadTimeout(DEFAULT_READ_TIMEOUT);\r
+        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
+        connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
+        connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
+        connection.setRequestMethod(method);\r
+\r
+        return connection;\r
+    }\r
+\r
+    private void setMessageBody(HttpURLConnection connection, String body) throws IOException {\r
+        connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);\r
+        connection.setDoOutput(true);\r
+        OutputStream outputStream = connection.getOutputStream();\r
+        outputStream.write(body.getBytes(StandardCharsets.UTF_8));\r
+        outputStream.flush();\r
+        outputStream.close();\r
+    }\r
+\r
     private boolean retryLimitReached(final int retryCount) {\r
         return retryCount >= MAX_RETRIES;\r
     }\r
diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
new file mode 100644 (file)
index 0000000..69d34f8
--- /dev/null
@@ -0,0 +1,89 @@
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ *  Copyright (C) 2019 Nordix Foundation.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ *\r
+ * SPDX-License-Identifier: Apache-2.0\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+package org.onap.dcaegen2.pmmapper.messagerouter;\r
+import static org.junit.jupiter.api.Assertions.assertThrows;\r
+import static org.mockito.Mockito.mock;\r
+import static org.mockito.Mockito.times;\r
+import static org.mockito.Mockito.verify;\r
+import static org.mockito.Mockito.when;\r
+import reactor.test.StepVerifier;\r
+import java.util.Arrays;\r
+import java.util.List;\r
+\r
+import org.junit.Before;\r
+import org.junit.Test;\r
+import org.junit.runner.RunWith;\r
+import org.mockito.Mockito;\r
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;\r
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;\r
+import org.onap.dcaegen2.services.pmmapper.model.Event;\r
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;\r
+import org.powermock.core.classloader.annotations.PrepareForTest;\r
+import org.powermock.modules.junit4.PowerMockRunner;\r
+import reactor.core.publisher.Flux;\r
+\r
+@RunWith(PowerMockRunner.class)\r
+@PrepareForTest(EnvironmentConfig.class)\r
+public class VESPublisherTest {\r
+\r
+    private static String topicURL = "http://mr/topic";\r
+    private static RequestSender sender;\r
+    private static MapperConfig config;\r
+    private VESPublisher sut;\r
+    private String ves = "{}";\r
+\r
+    @Before\r
+    public void before() throws Exception {\r
+        config = mock(MapperConfig.class);\r
+        sender = mock(RequestSender.class);\r
+        sut = new VESPublisher(config, sender);\r
+        when(config.getPublisherTopicUrl()).thenReturn(topicURL);\r
+    }\r
+\r
+    @Test\r
+    public void publish_multiple_success() throws Exception {\r
+        Event event = mock(Event.class);\r
+        List<Event> events  = Arrays.asList(event,event,event);\r
+        when(event.getVes()).thenReturn(ves);\r
+\r
+        Flux<Event> flux = sut.publish(events);\r
+\r
+        verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString());\r
+        StepVerifier.create(flux)\r
+            .expectNextMatches(event::equals)\r
+            .expectComplete()\r
+            .verify();\r
+    }\r
+\r
+    @Test\r
+    public void publish_multiple_fail() throws Exception {\r
+        Event event = mock(Event.class);\r
+        List<Event> events  = Arrays.asList(event,event,event);\r
+        when(event.getVes()).thenReturn(ves);\r
+        when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class);\r
+\r
+        Flux<Event> flux = sut.publish(events);\r
+\r
+        StepVerifier.create(flux)\r
+            .verifyComplete();\r
+    }\r
+}\r