[DCAE/PM-Mapper] Utilize DMaaP-Client in PM-Mapper 76/120476/4
authortkogut <tomasz.kogut@nokia.com>
Tue, 13 Apr 2021 08:11:35 +0000 (10:11 +0200)
committertkogut <tomasz.kogut@nokia.com>
Wed, 14 Apr 2021 15:23:50 +0000 (17:23 +0200)
 - Bump mockserver libraries
 - Use dmaap-client for sending events to dmapp-mr
 - Extract Retry/Timeout configs to separate class
 - Extract logging utils to separate class

Issue-ID: DCAEGEN2-2732
Change-Id: I5d406e99fe1def078f102ff704df5312f5ae996b
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
14 files changed:
Changelog.md
pom.xml
src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java [new file with mode: 0644]
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java [new file with mode: 0644]
src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/mapping/MapperTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java [new file with mode: 0644]
src/test/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSenderTests.java

index 3b10d37..7990b6b 100644 (file)
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ## [1.6.0] - 25/03/2021
 ### Changed
+- Utilize DMaaP-Client in PM-Mapper
 - Switched Dockerfile to integration image (alpine-based)
 
 ## [1.5.2] - 18/03/2021
diff --git a/pom.xml b/pom.xml
index e978515..3b82491 100644 (file)
--- a/pom.xml
+++ b/pom.xml
         <logback.version>1.2.3</logback.version>
         <reactor.version>3.4.0</reactor.version>
         <undertow.version>2.2.3.Final</undertow.version>
-        <gson.version>2.8.5</gson.version>
+        <gson.version>2.8.6</gson.version>
         <freemarker.version>2.3.28</freemarker.version>
         <commons.io.version>2.8.0</commons.io.version>
         <xml.version>2.3.1</xml.version>
         <jaxb.version>2.3.0.1</jaxb.version>
+        <dmaap-clinet.version>1.8.2</dmaap-clinet.version>
         <!-- Testing Test Dependencies -->
         <junit.version>5.3.2</junit.version>
         <mockito.version>2.23.4</mockito.version>
         <mockito-ju5-ext.version>2.23.4</mockito-ju5-ext.version>
         <powermock.version>2.0.7</powermock.version>
-        <mockserver.version>3.10.8</mockserver.version>
+        <mockserver.version>5.11.2</mockserver.version>
         <junit4.version>4.12</junit4.version>
-        <jsonschema.version>1.3.0</jsonschema.version>
+        <jsonschema.version>1.5.1</jsonschema.version>
+        <json.version>20210307</json.version>
+        <jackson.version>2.12.2</jackson.version>
         <xerces.version>2.11.0</xerces.version>
         <reactor.test>3.4.0</reactor.test>
         <!-- Plugin Versions -->
             <artifactId>jaxb-core</artifactId>
             <version>${jaxb.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>dmaap-client</artifactId>
+            <version>${dmaap-clinet.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.sun.xml.bind</groupId>
             <artifactId>jaxb-impl</artifactId>
             <version>${mockserver.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>${json.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
index 6b5c157..248debe 100644 (file)
@@ -1,6 +1,7 @@
 /*-\r
  * ============LICENSE_START=======================================================\r
  *  Copyright (C) 2019 Nordix Foundation.\r
+ *  Copyright (C) 2021 Nokia.\r
  * ================================================================================\r
  * Licensed under the Apache License, Version 2.0 (the "License");\r
  * you may not use this file except in compliance with the License.\r
@@ -21,8 +22,7 @@
 package org.onap.dcaegen2.services.pmmapper.exceptions;\r
 \r
 public class MRPublisherException extends RuntimeException{\r
-    public MRPublisherException(String message, Throwable cause) {\r
-        super(message, cause);\r
+    public MRPublisherException(String message) {\r
+        super(message);\r
     }\r
-\r
 }\r
index 6aaf1d6..9e0b87c 100644 (file)
@@ -1,6 +1,7 @@
 /*-\r
  * ============LICENSE_START=======================================================\r
  *  Copyright (C) 2019 Nordix Foundation.\r
+ *  Copyright (C) 2021 Nokia.\r
  * ================================================================================\r
  * Licensed under the Apache License, Version 2.0 (the "License");\r
  * you may not use this file except in compliance with the License.\r
 \r
 package org.onap.dcaegen2.services.pmmapper.messagerouter;\r
 \r
-import java.nio.charset.StandardCharsets;\r
-import java.util.Base64;\r
-import java.util.List;\r
-\r
 import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
 import org.onap.dcaegen2.services.pmmapper.model.Event;\r
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;\r
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;\r
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;\r
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;\r
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;\r
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;\r
 import org.onap.logging.ref.slf4j.ONAPLogAdapter;\r
 import org.slf4j.LoggerFactory;\r
-\r
 import reactor.core.publisher.Flux;\r
 \r
+import java.util.List;\r
+import java.util.stream.Collectors;\r
+\r
 public class VESPublisher {\r
     private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));\r
-    private RequestSender sender;\r
-    private MapperConfig config;\r
+    private final DmaapRequestSender sender;\r
+    private final MapperConfig config;\r
 \r
     public VESPublisher(MapperConfig config) {\r
-        this(config, new RequestSender());\r
+        this(config, new DmaapRequestSender());\r
     }\r
 \r
-    public VESPublisher(MapperConfig config, RequestSender sender) {\r
+    public VESPublisher(MapperConfig config, DmaapRequestSender sender) {\r
         this.sender = sender;\r
         this.config = config;\r
     }\r
 \r
     public Flux<Event> publish(List<Event> events) {\r
         logger.unwrap().info("Publishing VES events to messagerouter.");\r
-        Event event = events.get(0);\r
-        try {\r
-            events.forEach(e -> this.publish(e.getVes()));\r
-            logger.unwrap().info("Successfully published VES events to messagerouter.");\r
-        } catch (MRPublisherException e) {\r
-            logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", e);\r
-            return Flux.empty();\r
-        }\r
-        return Flux.just(event);\r
+        Event first = events.get(0);\r
+        List<String> vesEvents = minifiedVesEvents(events);\r
+        return publishEvents(vesEvents)\r
+                .filter(DmaapResponse::failed)\r
+                .takeLast(1)\r
+                .flatMap(this::toFluxError)\r
+                .defaultIfEmpty(first)\r
+                .doOnComplete(() -> logger.unwrap().info("Successfully published VES events to messagerouter."))\r
+                .onErrorResume(this::resume);\r
+    }\r
+\r
+    private List<String> minifiedVesEvents(List<Event> events) {\r
+        return events.stream()\r
+                .map(Event::getVes)\r
+                .map(vesEvent -> vesEvent.replace("\n", ""))\r
+                .collect(Collectors.toList());\r
+    }\r
+\r
+    private Flux<MessageRouterPublishResponse> publishEvents(List<String> vesEvents) {\r
+        String topicUrl = config.getPublisherTopicUrl();\r
+        AafCredentials credentials = aafCredentials();\r
+        return sender.send(topicUrl, vesEvents, credentials);\r
+    }\r
+\r
+    private Flux<Event> toFluxError(MessageRouterPublishResponse response) {\r
+        return Flux.error(new MRPublisherException(response.failReason()));\r
+    }\r
+\r
+    private Flux<Event> resume(Throwable t) {\r
+        logger.unwrap().error("Failed to publish VES event(s) to messagerouter.", t);\r
+        return Flux.empty();\r
     }\r
 \r
-    private void publish(String ves) {\r
-        try {\r
-            String topicUrl = config.getPublisherTopicUrl();\r
-            ves = ves.replaceAll("\n", "");\r
-            String userCredentials =  Base64.getEncoder()\r
-                .encodeToString((this.config.getPublisherUserName() + ":" +\r
-                    this.config.getPublisherPassword())\r
-                    .getBytes(StandardCharsets.UTF_8));\r
-            sender.send("POST", topicUrl, ves, userCredentials);\r
-        } catch (Exception e) {\r
-            throw new MRPublisherException(e.getMessage(), e);\r
-        }\r
+    private AafCredentials aafCredentials() {\r
+        return ImmutableAafCredentials.builder()\r
+                .username(config.getPublisherUserName())\r
+                .password(config.getPublisherPassword())\r
+                .build();\r
     }\r
 }\r
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSender.java
new file mode 100644 (file)
index 0000000..1a7c59e
--- /dev/null
@@ -0,0 +1,120 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import io.vavr.control.Try;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.MAX_RETRIES;
+import static org.onap.dcaegen2.services.pmmapper.utils.SendersConfig.RETRY_INTERVAL;
+
+public class DmaapRequestSender {
+    private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(DmaapRequestSender.class));
+
+    private static final DmaapRetryConfig RETRY_CONFIG = ImmutableDmaapRetryConfig.builder()
+            .retryCount(MAX_RETRIES)
+            .retryIntervalInSeconds((int) RETRY_INTERVAL.getSeconds())
+            .build();
+    private static final MessageRouterPublisherConfig CLIENT_CONFIGURATION =
+            ImmutableMessageRouterPublisherConfig.builder()
+                    .retryConfig(RETRY_CONFIG)
+                    .build();
+    private static final DmaapTimeoutConfig READ_TIMEOUT = ImmutableDmaapTimeoutConfig.builder()
+            .timeout(SendersConfig.READ_TIMEOUT)
+            .build();
+    private static final MessageRouterPublisher PUBLISHER =
+            DmaapClientFactory.createMessageRouterPublisher(CLIENT_CONFIGURATION);
+
+    /**
+     * Sends an http request to a given dmaap-mr topic.
+     *
+     * @param topicUrl    representing given topic
+     * @param vesEvents   of the requests as json
+     * @param credentials base64-encoded username password credentials
+     * @return dmaap-mr response
+     */
+    public Flux<MessageRouterPublishResponse> send(final String topicUrl, final List<String> vesEvents, final AafCredentials credentials) {
+        MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+                .contentType(ContentType.TEXT_PLAIN)
+                .sinkDefinition(sink(topicUrl, credentials))
+                .timeoutConfig(READ_TIMEOUT)
+                .diagnosticContext(diagnosticContext())
+                .build();
+
+        return PUBLISHER.put(request, jsonBatch(vesEvents));
+    }
+
+    private static MessageRouterSink sink(String topicUrl, AafCredentials credentials) {
+        return ImmutableMessageRouterSink.builder()
+                .aafCredentials(credentials)
+                .topicUrl(topicUrl)
+                .build();
+    }
+
+    private static RequestDiagnosticContext diagnosticContext() {
+        UUID invocationId = uuid(LoggingUtils.invocationID(LOGGER));
+        UUID requestId = uuid(LoggingUtils.requestID());
+        return ImmutableRequestDiagnosticContext.builder()
+                .invocationId(invocationId)
+                .requestId(requestId)
+                .build();
+    }
+
+    private static Flux<JsonElement> jsonBatch(List<String> events) {
+        return Flux.fromIterable(getAsJsonElements(events));
+    }
+
+    private static List<JsonElement> getAsJsonElements(List<String> events) {
+        return events.stream()
+                .map(JsonParser::parseString)
+                .collect(Collectors.toList());
+    }
+
+    private static UUID uuid(String s) {
+        return Try.of(() -> UUID.fromString(s))
+                .getOrElse(UUID::randomUUID);
+    }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/LoggingUtils.java
new file mode 100644 (file)
index 0000000..9e7fc8e
--- /dev/null
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.jboss.logging.MDC;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.onap.logging.ref.slf4j.ONAPLogConstants;
+
+import java.util.Optional;
+import java.util.UUID;
+
+final class LoggingUtils {
+
+    private LoggingUtils() {
+        throw new IllegalStateException("Utility class;shouldn't be constructed");
+    }
+
+    static String invocationID(ONAPLogAdapter logger) {
+        return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))
+                .orElseGet(()-> logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());
+    }
+
+    static String requestID() {
+        return Optional.ofNullable((String) MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))
+                .orElseGet(() -> UUID.randomUUID().toString());
+    }
+}
index 4993a10..3ab8ab6 100644 (file)
@@ -1,6 +1,7 @@
 /*-\r
  * ============LICENSE_START=======================================================\r
  *  Copyright (C) 2019 Nordix Foundation.\r
+ *  Copyright (C) 2021 Nokia.\r
  * ================================================================================\r
  * Licensed under the Apache License, Version 2.0 (the "License");\r
  * you may not use this file except in compliance with the License.\r
@@ -28,30 +29,23 @@ import java.net.HttpURLConnection;
 import java.net.URL;\r
 import java.nio.charset.StandardCharsets;\r
 import java.security.NoSuchAlgorithmException;\r
-import java.util.Optional;\r
-import java.util.UUID;\r
 import java.util.stream.Collectors;\r
 \r
 import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;\r
 import org.onap.dcaegen2.services.pmmapper.exceptions.ServerResponseException;\r
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;\r
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;\r
 import org.onap.logging.ref.slf4j.ONAPLogAdapter;\r
 import org.onap.logging.ref.slf4j.ONAPLogConstants;\r
 import org.slf4j.LoggerFactory;\r
 \r
 import javax.net.ssl.HttpsURLConnection;\r
 import javax.net.ssl.SSLContext;\r
-import org.jboss.logging.MDC;\r
 \r
 public class RequestSender {\r
-    private static final int MAX_RETRIES = 5;\r
-    private static final int RETRY_INTERVAL = 1000;\r
     private static final String SERVER_ERROR_MESSAGE = "Error on Server";\r
     private static final int ERROR_START_RANGE = 300;\r
-    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
-    public static final String DELETE = "DELETE";\r
-    public static final String DEFAULT_CONTENT_TYPE = "text/plain";\r
-    public static final int DEFAULT_READ_TIMEOUT = 5000;\r
+    private static final ONAPLogAdapter LOGGER = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));\r
 \r
     /**\r
      * Works just like {@link RequestSender#send(method,urlString)}, except {@code method }\r
@@ -91,20 +85,16 @@ public class RequestSender {
      */\r
     public String send(String method, final String urlString, final String body, final String encodedCredentials)\r
              throws InterruptedException {\r
-        String invocationID = Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.INVOCATION_ID))\r
-                 .orElse(logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS).toString());\r
-        String requestID =  Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))\r
-                .orElse(UUID.randomUUID().toString());\r
         String result = "";\r
         boolean status = false;\r
-        int attempts = 1;\r
+        int attempts = 0;\r
         try {\r
-            while (!status && attempts <= MAX_RETRIES) {\r
-                if(attempts != 1) {\r
-                    Thread.sleep(RETRY_INTERVAL);\r
+            while (!status && attempts <= SendersConfig.MAX_RETRIES) {\r
+                if(attempts != 0) {\r
+                    Thread.sleep(SendersConfig.RETRY_INTERVAL.toMillis());\r
                 }\r
                 final URL url = new URL(urlString);\r
-                final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);\r
+                final HttpURLConnection connection = getHttpURLConnection(method, url);\r
 \r
                 if ("https".equalsIgnoreCase(url.getProtocol())) {\r
                     HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory());\r
@@ -122,26 +112,25 @@ public class RequestSender {
                 attempts++;\r
             }\r
         } catch (IOException | NoSuchAlgorithmException ex) {\r
-            logger.unwrap().warn("Request failure", ex);\r
+            LOGGER.unwrap().warn("Request failure", ex);\r
             throw new RequestFailure(ex);\r
         }\r
         return result;\r
     }\r
 \r
-    private HttpURLConnection getHttpURLConnection(String method, URL url, String invocationID, String requestID)\r
+    private HttpURLConnection getHttpURLConnection(String method, URL url)\r
                 throws IOException {\r
         HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
-        connection.setReadTimeout(DEFAULT_READ_TIMEOUT);\r
-        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID);\r
-        connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID);\r
+        connection.setReadTimeout((int) SendersConfig.READ_TIMEOUT.toMillis());\r
+        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, LoggingUtils.requestID());\r
+        connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, LoggingUtils.invocationID(LOGGER));\r
         connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
         connection.setRequestMethod(method);\r
-\r
         return connection;\r
     }\r
 \r
     private void setMessageBody(HttpURLConnection connection, String body) throws IOException {\r
-        connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);\r
+        connection.setRequestProperty("Content-Type", ContentType.TEXT_PLAIN.toString());\r
         connection.setDoOutput(true);\r
         OutputStream outputStream = connection.getOutputStream();\r
         outputStream.write(body.getBytes(StandardCharsets.UTF_8));\r
@@ -150,7 +139,7 @@ public class RequestSender {
     }\r
 \r
     private boolean retryLimitReached(final int retryCount) {\r
-        return retryCount >= MAX_RETRIES;\r
+        return retryCount >= SendersConfig.MAX_RETRIES;\r
     }\r
 \r
     private boolean isWithinErrorRange(final int responseCode) {\r
@@ -158,18 +147,18 @@ public class RequestSender {
     }\r
 \r
     private String getResult(int attemptNumber, HttpURLConnection connection) throws IOException {\r
-        logger.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());\r
+        LOGGER.unwrap().info("Sending {} request to {}.", connection.getRequestMethod(), connection.getURL());\r
         String result = "";\r
         try (InputStream is = connection.getInputStream();\r
                 BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {\r
             result = reader.lines().collect(Collectors.joining("\n"));\r
             int responseCode = connection.getResponseCode();\r
             if (!(isWithinErrorRange(responseCode))) {\r
-                logger.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);\r
+                LOGGER.unwrap().info("Response code: {}, Server Response Received:\n{}", responseCode, result);\r
             }\r
         } catch (Exception e) {\r
             if (retryLimitReached(attemptNumber)) {\r
-                logger.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);\r
+                LOGGER.unwrap().error("Execution error: {}", connection.getResponseMessage(), e);\r
                 throw new ServerResponseException(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);\r
             }\r
         }\r
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/SendersConfig.java
new file mode 100644 (file)
index 0000000..c0c972d
--- /dev/null
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.time.Duration;
+
+final class SendersConfig {
+
+    private SendersConfig() {
+        throw new IllegalStateException("SendersConfig class;shouldn't be constructed");
+    }
+
+    static final int MAX_RETRIES = 4;
+    static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);
+    static final Duration READ_TIMEOUT = Duration.ofSeconds(5);
+}
index e5c5af4..47e09e9 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 Nordix Foundation.
+ *  Copyright (C) 2021 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * ============LICENSE_END=========================================================
  */
 package org.onap.dcaegen2.pmmapper.messagerouter;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.onap.dcaegen2.services.pmmapper.exceptions.RequestFailure;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import reactor.test.StepVerifier;
-import java.util.Arrays;
-import java.util.List;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
-import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.DmaapRequestSender;
+import org.onap.dcaegen2.services.pmmapper.utils.EnvironmentConfig;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(EnvironmentConfig.class)
 @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
 public class VESPublisherTest {
 
-    private static String topicURL = "http://mr/topic";
-    private static RequestSender sender;
-    private static MapperConfig config;
+    private static final String TOPIC_URL = "http://mr/topic";
+    private static final String VES = "{}";
+    private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder()
+            .username("")
+            .password("")
+            .build();
+
+    private static final MessageRouterPublishResponse SUCCESSFUL =
+            ImmutableMessageRouterPublishResponse.builder().build();
+    private static final MessageRouterPublishResponse FAILED =
+            ImmutableMessageRouterPublishResponse.builder()
+                    .failReason("failReason")
+                    .build();
+
+    private DmaapRequestSender sender;
     private VESPublisher sut;
-    private String ves = "{}";
 
     @Before
-    public void before() throws Exception {
-        config = mock(MapperConfig.class);
-        sender = mock(RequestSender.class);
+    public void before() {
+        MapperConfig config = mock(MapperConfig.class);
+        when(config.getPublisherTopicUrl()).thenReturn(TOPIC_URL);
+        when(config.getPublisherUserName()).thenReturn("");
+        when(config.getPublisherPassword()).thenReturn("");
+        sender = mock(DmaapRequestSender.class);
         sut = new VESPublisher(config, sender);
-        when(config.getPublisherTopicUrl()).thenReturn(topicURL);
     }
 
     @Test
-    public void publish_multiple_success() throws Exception {
+    public void publish_multiple_success() {
+        Event event = mock(Event.class);
+        List<Event> events = Arrays.asList(event, event, event);
+        when(event.getVes()).thenReturn(VES);
+        MessageRouterPublishResponse successfulResponse = ImmutableMessageRouterPublishResponse.builder().build();
+        when(sender.send(any(), any(), any())).thenReturn(Flux.just(successfulResponse, successfulResponse));
+
+        Flux<Event> flux = sut.publish(events);
+
+        verify(sender, times(1)).send(anyString(), any(), any());
+        StepVerifier.create(flux)
+                .expectNextMatches(event::equals)
+                .verifyComplete();
+    }
+
+    @Test
+    public void publish_multiple_fail_sender_exceptions() {
+        Event event = mock(Event.class);
+        List<Event> events = Arrays.asList(event, event, event);
+        when(event.getVes()).thenReturn(VES);
+        when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+                .thenReturn(Flux.error(new RuntimeException()));
+
+        Flux<Event> flux = sut.publish(events);
+
+        StepVerifier.create(flux)
+                .verifyComplete();
+    }
+
+    @Test
+    public void publish_multiple_fail_mr_responses_failed() {
         Event event = mock(Event.class);
-        List<Event> events  = Arrays.asList(event,event,event);
-        when(event.getVes()).thenReturn(ves);
+        List<Event> events = Arrays.asList(event, event, event);
+        when(event.getVes()).thenReturn(VES);
+        when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+                .thenReturn(Flux.just(FAILED, FAILED, FAILED));
 
         Flux<Event> flux = sut.publish(events);
 
-        verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
         StepVerifier.create(flux)
-            .expectNextMatches(event::equals)
-            .expectComplete()
-            .verify();
+                .verifyComplete();
     }
 
     @Test
-    public void publish_multiple_fail() throws Exception {
+    public void publish_multiple_fail_and_multiple_success() {
         Event event = mock(Event.class);
-        List<Event> events  = Arrays.asList(event,event,event);
-        when(event.getVes()).thenReturn(ves);
-        when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(RequestFailure.class);
+        when(event.getVes()).thenReturn(VES);
+        List<Event> events = Arrays.asList(event, event, event, event);
+        when(sender.send(eq(TOPIC_URL), any(), eq(AAF_CREDENTIALS)))
+                .thenReturn(Flux.just(SUCCESSFUL, FAILED, SUCCESSFUL, FAILED));
 
         Flux<Event> flux = sut.publish(events);
 
         StepVerifier.create(flux)
-        .expectNext(events.get(0))
-            .verifyComplete();
+                .verifyComplete();
     }
 }
index db45029..617cbd1 100644 (file)
@@ -62,7 +62,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockserver.client.server.MockServerClient;
+import org.mockserver.client.MockServerClient;
 import org.mockserver.integration.ClientAndServer;
 import org.mockserver.model.HttpRequest;
 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
index 7ddc929..26cb648 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019-2020 Nordix Foundation.
+ *  Copyright (C) 2021 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -42,6 +43,7 @@ import java.util.List;
 
 import org.everit.json.schema.Schema;
 import org.everit.json.schema.loader.SchemaLoader;
+import org.json.JSONException;
 import org.json.JSONObject;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -79,7 +81,7 @@ class MapperTest {
 
 
     @BeforeAll
-    static void classSetup() throws IOException {
+    static void classSetup() throws IOException, JSONException {
         JSONObject ves = new JSONObject(new String(Files.readAllBytes(schema)));
         vesSchema = SchemaLoader.load(ves);
 
@@ -97,7 +99,7 @@ class MapperTest {
 
     @ParameterizedTest
     @MethodSource("getValidEvents")
-    void testValidEvent(Event testEvent) {
+    void testValidEvent(Event testEvent) throws JSONException {
         vesSchema.validate(new JSONObject(objUnderTest.map(testEvent)));
     }
 
index 0451543..1c6d850 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019 - 2020 Nordix Foundation.
+ *  Copyright (C) 2021 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -65,6 +66,7 @@ public class DataRouterUtilsTest {
     private static MapperConfig validConfig;
     private SSLContextFactory sslContextFactory;
     private static final Path validConfigPath = Paths.get("src/test/resources/valid_mapper_config.json");
+    private static final String DELETE = "DELETE";
 
     @Test
     public void processEventSuccessful() throws Exception {
@@ -94,7 +96,7 @@ public class DataRouterUtilsTest {
 
         Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
         assertEquals(serviceResponse,  DataRouterUtils.processEvent(mockMapperConfig, testEvent));
-        verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+        verify(mockConnection, times(1)).setRequestMethod(DELETE);
     }
 
     @Test
@@ -117,7 +119,7 @@ public class DataRouterUtilsTest {
 
         Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
         assertEquals(serviceResponse,  DataRouterUtils.processEvent(mockMapperConfig, testEvent));
-        verify(mockConnection, times(1)).setRequestMethod(RequestSender.DELETE);
+        verify(mockConnection, times(1)).setRequestMethod(DELETE);
     }
 
     @Test
@@ -148,7 +150,7 @@ public class DataRouterUtilsTest {
         PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
         Event testEvent = EventUtils.makeMockEvent("", mock(EventMetadata.class), publishIdentity);
         assertEquals(serviceResponse, DataRouterUtils.processEvent(mockMapperConfig, testEvent));
-        verify(mockConnection, times(5)).setRequestMethod(RequestSender.DELETE);
+        verify(mockConnection, times(5)).setRequestMethod(DELETE);
     }
 
     @Test
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DmaapRequestSenderTest.java
new file mode 100644 (file)
index 0000000..50abb5f
--- /dev/null
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2021 Nokia.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpStatusCode;
+import org.mockserver.verify.VerificationTimes;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class DmaapRequestSenderTest {
+
+    private static final ImmutableAafCredentials AAF_CREDENTIALS = ImmutableAafCredentials.builder()
+            .username("")
+            .password("")
+            .build();
+    private static final List<String> SINGLE = Collections.singletonList("any");
+
+    private static ClientAndServer mockServer;
+    private final MockServerClient client = mockClient();
+
+    @BeforeClass
+    public static void setup() {
+        mockServer = startClientAndServer(35454);
+    }
+
+    @AfterClass
+    public static void teardown() {
+        mockServer.stop();
+    }
+
+    @Before
+    public void setUp() {
+        client.reset();
+    }
+
+    @Test
+    public void send_success() {
+        client.when(request()).respond(response()
+                .withStatusCode(HttpStatusCode.OK_200.code())
+                .withBody("ResponseBody"));
+
+        Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+                .send("http://127.0.0.1:35454/once", SINGLE, AAF_CREDENTIALS);
+
+        StepVerifier.create(result)
+                .expectNextMatches(DmaapResponse::successful)
+                .verifyComplete();
+        client.verify(request(), VerificationTimes.once());
+    }
+
+    @Test
+    public void host_unavailable_retry_mechanism() {
+        client.when(request())
+                .respond(response().withStatusCode(HttpStatusCode.SERVICE_UNAVAILABLE_503.code()));
+
+        Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+                .send("http://127.0.0.1:35454/anypath", SINGLE, AAF_CREDENTIALS);
+
+        StepVerifier.create(result)
+                .expectNextMatches(DmaapResponse::failed)
+                .verifyComplete();
+        client.verify(request(), VerificationTimes.exactly(5));
+    }
+
+    @Test
+    public void host_unknown() {
+        Flux<MessageRouterPublishResponse> result = new DmaapRequestSender()
+                .send("http://unknown-host:35454/host-is-unknown", SINGLE, AAF_CREDENTIALS);
+
+        StepVerifier.create(result)
+                .verifyError();
+        client.verify(request(), VerificationTimes.exactly(0));
+    }
+
+    private MockServerClient mockClient() {
+        return new MockServerClient("127.0.0.1", 35454);
+    }
+}
index 08faf4a..8541824 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019-2020 Nordix Foundation.
+ *  Copyright (C) 2021 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,15 +31,15 @@ import java.net.URL;
 import java.net.UnknownHostException;
 
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockserver.client.server.MockServerClient;
+import org.mockserver.client.MockServerClient;
 import org.mockserver.integration.ClientAndServer;
 import org.mockserver.model.HttpRequest;
 import org.mockserver.model.HttpStatusCode;
 import org.mockserver.verify.VerificationTimes;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
 import org.onap.logging.ref.slf4j.ONAPLogConstants;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -53,7 +54,7 @@ import utils.LoggingUtils;
 @PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*"})
 public class RequestSenderTests {
     private static ClientAndServer mockServer;
-    private MockServerClient client = mockClient();
+    private final MockServerClient client = mockClient();
 
     @BeforeClass
     public static void setup() {
@@ -65,6 +66,11 @@ public class RequestSenderTests {
         mockServer.stop();
     }
 
+    @Before
+    public void setUp() {
+        client.reset();
+    }
+
     @Test
     public void send_success() throws Exception {
         String url = "http://127.0.0.1:35454/once";
@@ -84,11 +90,10 @@ public class RequestSenderTests {
         assertTrue(logAppender.list.get(1).getMessage().contains("Sending"));
         assertTrue(logAppender.list.get(2).getMessage().contains("Received"));
         logAppender.stop();
-        client.clear(req);
     }
 
     @Test
-    public void host_unavailable_retry_mechanism() throws Exception {
+    public void host_unavailable_retry_mechanism() {
         PowerMockito.mockStatic(Thread.class);
 
         client.when(request())
@@ -99,7 +104,6 @@ public class RequestSenderTests {
         });
 
         client.verify(request(), VerificationTimes.exactly(5));
-        client.clear(request());
     }
 
     @Test
@@ -114,11 +118,10 @@ public class RequestSenderTests {
         });
 
         client.verify(request(), VerificationTimes.exactly(0));
-        client.clear(request());
     }
 
     private MockServerClient mockClient() {
         return new MockServerClient("127.0.0.1", 35454);
     }
 
-}
\ No newline at end of file
+}