Use correct Content Type from DMaaP 41/68441/2
authorelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 19 Sep 2018 06:49:28 +0000 (08:49 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Fri, 21 Sep 2018 14:46:38 +0000 (16:46 +0200)
Add the Content Type to the header the DMaaP consumer uses to get the fileReady event from DMAaaP to prevent formatting problems.

Change-Id: Iedf38b7542e5709a78f383d31c75e7b95aa56cfe
Issue-ID: DCAEGEN2-825
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClient.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/consumer/DmaapConsumerReactiveHttpClientTest.java

index 72e7d49..e4afd3a 100644 (file)
@@ -63,11 +63,11 @@ public class DmaapConsumerJsonParser {
     /**
      * Extract info from string and create @see {@link FileData}.
      *
-     * @param monoMessage - results from DMaaP
+     * @param rawMessage - results from DMaaP
      * @return reactive Mono with an array of FileData
      */
-    public Mono<List<FileData>> getJsonObject(Mono<String> monoMessage) {
-        return monoMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+    public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
+        return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
index ad9e6fe..f32b22c 100644 (file)
@@ -2,31 +2,28 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.service.consumer;
 
 import java.net.URI;
-import java.net.URISyntaxException;
+import java.util.function.Consumer;
 
-import org.apache.http.client.utils.URIBuilder;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.core.publisher.Mono;
 
@@ -36,8 +33,6 @@ import reactor.core.publisher.Mono;
  */
 public class DmaapConsumerReactiveHttpClient {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
     private WebClient webClient;
     private final String dmaapHostName;
     private final String dmaapProtocol;
@@ -45,6 +40,7 @@ public class DmaapConsumerReactiveHttpClient {
     private final String dmaapTopicName;
     private final String consumerGroup;
     private final String consumerId;
+    private final String contentType;
 
     /**
      * Constructor of DmaapConsumerReactiveHttpClient.
@@ -58,6 +54,7 @@ public class DmaapConsumerReactiveHttpClient {
         this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
         this.consumerGroup = consumerConfiguration.consumerGroup();
         this.consumerId = consumerConfiguration.consumerId();
+        this.contentType = consumerConfiguration.dmaapContentType();
     }
 
     /**
@@ -66,21 +63,16 @@ public class DmaapConsumerReactiveHttpClient {
      * @return reactive response from DMaaP in string format
      */
     public Mono<String> getDmaapConsumerResponse() {
-        try {
-            return webClient
-                .get()
-                .uri(getUri())
-                .retrieve()
-                .onStatus(HttpStatus::is4xxClientError, clientResponse ->
-                    Mono.error(new Exception("HTTP 400"))
-                )
-                .onStatus(HttpStatus::is5xxServerError, clientResponse ->
-                    Mono.error(new Exception("HTTP 500")))
+        return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
+                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
+                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
                 .bodyToMono(String.class);
-        } catch (URISyntaxException e) {
-            logger.error("Unable to parse URI in message from xNF.", e);
-            return Mono.error(e);
-        }
+    }
+
+    private Consumer<HttpHeaders> getHeaders() {
+        return httpHeaders -> {
+            httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+        };
     }
 
     private String createRequestPath() {
@@ -92,8 +84,8 @@ public class DmaapConsumerReactiveHttpClient {
         return this;
     }
 
-    URI getUri() throws URISyntaxException {
-        return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
-            .setPath(createRequestPath()).build();
+    URI getUri() {
+        return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+                .path(createRequestPath()).build();
     }
 }
index 4f96a90..4568bdd 100644 (file)
@@ -51,8 +51,8 @@ class DmaapConsumerReactiveHttpClientTest {
     private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
     private Mono<String> expectedResult = Mono.empty();
     private WebClient webClient;
-    private RequestHeadersUriSpec requestHeadersSpec;
-    private ResponseSpec responseSpec;
+    private RequestHeadersUriSpec requestHeadersSpecMock;
+    private ResponseSpec responseSpecMock;
 
 
     @BeforeEach
@@ -73,8 +73,8 @@ class DmaapConsumerReactiveHttpClientTest {
             .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
                 consumerConfigurationMock.dmaapUserPassword()))
             .build());
-        requestHeadersSpec = mock(RequestHeadersUriSpec.class);
-        responseSpec = mock(ResponseSpec.class);
+        requestHeadersSpecMock = mock(RequestHeadersUriSpec.class);
+        responseSpecMock = mock(ResponseSpec.class);
     }
 
 
@@ -85,8 +85,9 @@ class DmaapConsumerReactiveHttpClientTest {
 
         //when
         mockDependantObjects();
-        doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+        doReturn(expectedResult).when(responseSpecMock).bodyToMono(String.class);
         dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
+
         Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse();
 
         //then
@@ -98,24 +99,17 @@ class DmaapConsumerReactiveHttpClientTest {
     }
 
     @Test
-    void getHttpResponse_whenUriSyntaxExceptionHasBeenThrown() throws URISyntaxException {
-        //given
-        dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
-        //when
-        when(webClient.get()).thenReturn(requestHeadersSpec);
-        dmaapConsumerReactiveHttpClient.createDmaapWebClient(webClient);
-        when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
-
-        //then
-        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).expectSubscription()
-            .expectError(Exception.class).verify();
+    void getAppropriateUri_whenPassingCorrectedUriData() throws URISyntaxException {
+        Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+            URI.create("https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDCAE-c12/c12"));
     }
 
     private void mockDependantObjects() {
-        when(webClient.get()).thenReturn(requestHeadersSpec);
-        when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
-        when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
-        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+        when(webClient.get()).thenReturn(requestHeadersSpecMock);
+        when(requestHeadersSpecMock.uri((URI) any())).thenReturn(requestHeadersSpecMock);
+        when(requestHeadersSpecMock.headers(any())).thenReturn(requestHeadersSpecMock);
+        when(requestHeadersSpecMock.retrieve()).thenReturn(responseSpecMock);
+        doReturn(responseSpecMock).when(responseSpecMock).onStatus(any(), any());
     }
 
 }
\ No newline at end of file