Fix minor bugs in ves publisher 05/85505/7 1.0.0
authoremartin <ephraim.martin@est.tech>
Tue, 23 Apr 2019 14:49:01 +0000 (14:49 +0000)
committeremartin <ephraim.martin@est.tech>
Tue, 23 Apr 2019 14:49:01 +0000 (14:49 +0000)
* Publisher now uses existing RequestID
* Introduce basicAuth for publisher
* Clean up ves string by removing new lines
* Update pm-mapper topic name in blueprint
* Update reconfigure script to use https

Change-Id: Ib743127b04077b063c73415b1475be83e200ec3b
Issue-ID: DCAEGEN2-1444
Signed-off-by: emartin <ephraim.martin@est.tech>
dpo/blueprints/k8s-pm-mapper.yaml
src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
src/main/resources/reconfigure.sh
src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java

index ba8182a..eabe51e 100644 (file)
@@ -155,7 +155,7 @@ node_templates:
                 get_input: client_id
               topic_url:
                 { concat: [{ get_input: dmaap_mr_service_protocol },"://",{ get_input: dmaap_mr_service_host },
-                           ":",{ get_input: dmaap_mr_service_port },"/events/PM_MAPPER"]}
+                           ":",{ get_input: dmaap_mr_service_port },"/events/org.onap.dmaap.mr.PM_MAPPER"]}
               location:
                 get_input: dcae_location
       docker_config:
index 77b0545..46d40e4 100644 (file)
@@ -20,6 +20,8 @@
 \r
 package org.onap.dcaegen2.services.pmmapper.messagerouter;\r
 \r
+import java.nio.charset.StandardCharsets;\r
+import java.util.Base64;\r
 import java.util.List;\r
 \r
 import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;\r
@@ -61,7 +63,13 @@ public class VESPublisher {
     private void publish(String ves) {\r
         try {\r
             String topicUrl = config.getPublisherTopicUrl();\r
-            sender.send("POST", topicUrl, ves);\r
+            ves = ves.replaceAll("\n", "");\r
+            String userCredentials =  topicUrl.startsWith("https") ? Base64.getEncoder()\r
+                .encodeToString((this.config.getPublisherUserName() + ":" +\r
+                    this.config.getPublisherPassword())\r
+                    .getBytes(StandardCharsets.UTF_8))\r
+                : "";\r
+            sender.send("POST", topicUrl, ves, userCredentials);\r
         } catch (Exception e) {\r
             throw new MRPublisherException(e.getMessage(), e);\r
         }\r
index 390fa0d..0630d53 100644 (file)
@@ -118,6 +118,12 @@ public class MapperConfig implements Configurable{
         @GSONRequired
         @SerializedName("dmaap_info")
         DmaapInfo dmaapInfo;
+
+        @SerializedName("aaf_username")
+        private String aafUsername;
+
+        @SerializedName("aaf_password")
+        private String aafPassword;
     }
 
     @Getter
@@ -133,12 +139,6 @@ public class MapperConfig implements Configurable{
         @SerializedName("subscriber_id")
         private String subscriberId;
 
-        @SerializedName("aaf_username")
-        private String aafUsername;
-
-        @SerializedName("aaf_password")
-        private String aafPassword;
-
         @SerializedName("client_role")
         private String clientRole;
 
@@ -149,6 +149,14 @@ public class MapperConfig implements Configurable{
         private String topicUrl;
     }
 
+    public String getPublisherUserName() {
+        return this.getStreamsPublishes().getDmaapPublisher().getAafUsername();
+    }
+
+    public String getPublisherPassword() {
+        return this.getStreamsPublishes().getDmaapPublisher().getAafPassword();
+    }
+
     @Override
     public void reconfigure(MapperConfig mapperConfig) {
         if(!this.equals(mapperConfig)) {
index fdbae59..c8e29f3 100644 (file)
@@ -27,6 +27,8 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;\r
 import java.net.URL;\r
 import java.nio.charset.StandardCharsets;\r
+import java.util.Base64;\r
+import java.util.Optional;\r
 import java.util.UUID;\r
 import java.util.stream.Collectors;\r
 \r
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
 \r
 import javax.net.ssl.HttpsURLConnection;\r
 import javax.net.ssl.SSLContext;\r
+import org.jboss.logging.MDC;\r
 \r
 public class RequestSender {\r
     private static final int MAX_RETRIES = 5;\r
@@ -67,17 +70,28 @@ public class RequestSender {
        return send(method,urlString,"");\r
     }\r
 \r
+    /**\r
+     * Works just like {@link RequestSender#send(method,urlString,body, basicAuth)}, except {@code basicAuth }\r
+     * is set to empty String by default.\r
+     * @see RequestSender#send(String,String,String,String)\r
+     */\r
+    public String send(String method, final String urlString, final String body) throws Exception {\r
+        return send(method,urlString,body,"");\r
+    }\r
+\r
     /**\r
      * Sends an http request to a given endpoint.\r
      * @param method of the outbound request\r
      * @param urlString representing given endpoint\r
      * @param body of the request as json\r
+     * @param encodedCredentials base64-encoded username password credentials\r
      * @return http response body\r
      * @throws Exception\r
      */\r
-    public String send(String method, final String urlString, final String body) throws Exception {\r
+    public String send(String method, final String urlString, final String body, final String encodedCredentials) throws Exception {\r
         final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);\r
-        final UUID requestID = UUID.randomUUID();\r
+        String requestID =  Optional.ofNullable((String)MDC.get(ONAPLogConstants.MDCs.REQUEST_ID))\r
+            .orElse( UUID.randomUUID().toString());\r
         String result = "";\r
 \r
         for (int i = 1; i <= MAX_RETRIES; i++) {\r
@@ -89,6 +103,10 @@ public class RequestSender {
                 HttpsURLConnection.setDefaultSSLSocketFactory(SSLContext.getDefault().getSocketFactory());\r
             }\r
 \r
+            if(!encodedCredentials.isEmpty()) {\r
+                connection.setRequestProperty("Authorization", "Basic " + encodedCredentials);\r
+            }\r
+\r
             if(!body.isEmpty()) {\r
                 setMessageBody(connection, body);\r
             }\r
@@ -116,10 +134,10 @@ public class RequestSender {
         return result;\r
     }\r
 \r
-    private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws IOException {\r
+    private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, String requestID) throws IOException {\r
         HttpURLConnection connection = (HttpURLConnection) url.openConnection();\r
         connection.setReadTimeout(DEFAULT_READ_TIMEOUT);\r
-        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());\r
+        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID);\r
         connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());\r
         connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);\r
         connection.setRequestMethod(method);\r
index ac6f940..79aea06 100644 (file)
@@ -2,5 +2,5 @@
 while true
 do
     sleep 60
-    echo $(wget -S --spider localhost:8081/reconfigure 2>&1) >> /var/log/ONAP/dcaegen2/services/pm-mapper/reconfigure.log
+    echo $(wget -S --spider --no-check-certificate https://localhost:8443/reconfigure 2>&1) >> /var/log/ONAP/dcaegen2/services/pm-mapper/reconfigure.log
 done
index 69d34f8..2244d2d 100644 (file)
@@ -67,7 +67,7 @@ public class VESPublisherTest {
 \r
         Flux<Event> flux = sut.publish(events);\r
 \r
-        verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString());\r
+        verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString(), Mockito.anyString());\r
         StepVerifier.create(flux)\r
             .expectNextMatches(event::equals)\r
             .expectComplete()\r
@@ -79,11 +79,12 @@ public class VESPublisherTest {
         Event event = mock(Event.class);\r
         List<Event> events  = Arrays.asList(event,event,event);\r
         when(event.getVes()).thenReturn(ves);\r
-        when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class);\r
+        when(sender.send("POST",topicURL,ves,"base64encoded")).thenThrow(Exception.class);\r
 \r
         Flux<Event> flux = sut.publish(events);\r
 \r
         StepVerifier.create(flux)\r
+        .expectNext(events.get(0))\r
             .verifyComplete();\r
     }\r
 }\r