Fix delivery to DataRouter 23/69123/5
authorelinuxhenrik <henrik.b.andersson@est.tech>
Wed, 26 Sep 2018 14:03:45 +0000 (16:03 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Thu, 27 Sep 2018 12:30:56 +0000 (14:30 +0200)
The messages to the DataRouter was not actually sent.

Change-Id: I5748ee0cc19a5049ca4d965caefb5cdf2204419f
Issue-ID: DCAEGEN2-841
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
21 files changed:
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java [moved from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java with 62% similarity]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java [deleted file]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java [new file with mode: 0644]
datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelForUnitTest.java
datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java

index a3f7582..89f5bbf 100644 (file)
@@ -17,7 +17,6 @@
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -44,6 +43,8 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
     private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
 
     public boolean collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
+        logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
+                remoteFile, localFile);
         boolean result = true;
         try {
             FTPSClient ftps = new FTPSClient("TLS");
@@ -56,9 +57,10 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
                 closeDownConnection(ftps);
             }
         } catch (IOException ex) {
-            logger.error("Unable to collect file from xNF. " + fileServerData, ex);
+            logger.error("Unable to collect file from xNF. Data: {}", fileServerData, ex);
             result = false;
         }
+        logger.trace("collectFile left with result: {}", result);
         return result;
     }
 
@@ -71,7 +73,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
 
             if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
                 ftps.logout();
-                logger.error("Unable to log in to xNF. " + fileServerData);
+                logger.error("Unable to log in to xNF. {}", fileServerData);
                 success = false;
             }
 
@@ -79,13 +81,13 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
                 int reply = ftps.getReplyCode();
                 if (!FTPReply.isPositiveCompletion(reply)) {
                     ftps.disconnect();
-                    logger.error("Unable to connect in to xNF. " + fileServerData);
+                    logger.error("Unable to connect in to xNF. {}", fileServerData);
                     success = false;
                 }
                 ftps.enterLocalPassiveMode();
             }
         } catch (Exception ex) {
-            logger.error("Unable to connect to xNF." + fileServerData, ex);
+            logger.error("Unable to connect to xNF. Data: {}", fileServerData, ex);
             success = false;
         }
 
@@ -93,7 +95,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
     }
 
     private void getFile(String remoteFile, String localFile, FTPSClient ftps)
-            throws IOException, FileNotFoundException {
+            throws IOException {
         OutputStream output;
         File outfile = new File(localFile);
         outfile.createNewFile();
@@ -103,7 +105,7 @@ public class FtpsClient { // TODO: Should be final but needs PowerMock or Mockit
         ftps.retrieveFile(remoteFile, output);
 
         output.close();
-        logger.debug("File " + outfile.getName() + " Download Successfull from xNF");
+        logger.debug("File {} Download Successfull from xNF", outfile.getName());
     }
 
     private void closeDownConnection(FTPSClient ftps) {
index e4afd3a..7226dfa 100644 (file)
@@ -28,12 +28,13 @@ import java.util.stream.StreamSupport;
 
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -66,8 +67,8 @@ public class DmaapConsumerJsonParser {
      * @param rawMessage - results from DMaaP
      * @return reactive Mono with an array of FileData
      */
-    public Mono<List<FileData>> getJsonObject(Mono<String> rawMessage) {
-        return rawMessage.flatMap(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
+    public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
+        return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
@@ -75,12 +76,12 @@ public class DmaapConsumerJsonParser {
             : Mono.fromSupplier(() -> new JsonParser().parse(message));
     }
 
-    private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
+    private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
         return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
             : getFileDataFromJsonArray(jsonElement);
     }
 
-    private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
+    private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
         return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
             .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
     }
@@ -89,13 +90,13 @@ public class DmaapConsumerJsonParser {
         return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
     }
 
-    private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
-        return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
-            ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+    private Flux<FileData> create(Mono<JsonObject> jsonObject) {
+        return jsonObject.flatMapMany(monoJsonP -> !containsHeader(monoJsonP)
+            ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
             : transform(monoJsonP));
     }
 
-    private Mono<List<FileData>> transform(JsonObject jsonObject) {
+    private Flux<FileData> transform(JsonObject jsonObject) {
         if (containsHeader(jsonObject, EVENT, NOTIFICATION_FIELDS)) {
             JsonObject notificationFields = jsonObject.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
             String changeIdentifier = getValueFromJson(notificationFields, CHANGE_IDENTIFIER);
@@ -105,26 +106,25 @@ public class DmaapConsumerJsonParser {
 
             if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
                 && arrayOfNamedHashMap != null) {
-                Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
-                return res;
+                return getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
             }
 
             if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
-                return Mono.error(
+                return Flux.error(
                     new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
             } else if (arrayOfNamedHashMap != null) {
-                return Mono.error(
+                return Flux.error(
                     new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
             }
-            return Mono.error(
+            return Flux.error(
                 new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
         }
-        return Mono.error(
+        return Flux.error(
             new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
 
     }
 
-    private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType,
+    private Flux<FileData> getAllFileDataFromJson(String changeIdentifier, String changeType,
         JsonArray arrayOfAdditionalFields) {
         List<FileData> res = new ArrayList<>();
         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
@@ -135,11 +135,11 @@ public class DmaapConsumerJsonParser {
                 if (fileData != null) {
                     res.add(fileData);
                 } else {
-                    logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
+                    logger.error("Unable to collect file from xNF. File information wrong. Data: {}", fileInfo);
                 }
             }
         }
-        return Mono.just(res);
+        return Flux.fromIterable(res);
     }
 
     private FileData getFileDataFromJson(JsonObject fileInfo, String changeIdentifier, String changeType) {
@@ -154,7 +154,7 @@ public class DmaapConsumerJsonParser {
 
         if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
             && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
-            fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
+            fileData = ImmutableFileData.builder().name(name).changeIdentifier(changeIdentifier).changeType(changeType)
                 .location(location).compression(compression).fileFormatType(fileFormatType)
                 .fileFormatVersion(fileFormatVersion).build();
         }
index 0c76fc1..32fdbdc 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import java.util.List;
-
 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.springframework.web.reactive.function.client.WebClient;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -37,7 +34,7 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapConsumerTask {
 
-    abstract Mono<List<FileData>> consume(Mono<String> message) throws DmaapNotFoundException;
+    abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
 
     abstract DmaapConsumerReactiveHttpClient resolveClient();
 
@@ -45,7 +42,7 @@ abstract class DmaapConsumerTask {
 
     protected abstract DmaapConsumerConfiguration resolveConfiguration();
 
-    protected abstract Mono<List<ConsumerDmaapModel>> execute(String object) throws DatafileTaskException;
+    protected abstract Flux<FileData> execute(String object);
 
     WebClient buildWebClient() {
         return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
index 839e03c..7ec474c 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import java.util.List;
-
 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -45,42 +42,32 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     private Config datafileAppConfig;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
     private DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
-    FileCollector fileCollector;
 
     @Autowired
-    public DmaapConsumerTaskImpl(AppConfig datafileAppConfig, FileCollector fileCollector) {
+    public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
         this.datafileAppConfig = datafileAppConfig;
         this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
-        this.fileCollector = fileCollector;
     }
 
     protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
-        DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
-        DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
+            DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+            DmaapConsumerJsonParser dmaapConsumerJsonParser) {
         this.datafileAppConfig = datafileAppConfig;
         this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
-        this.fileCollector = fileCollector;
     }
 
     @Override
-    Mono<List<FileData>> consume(Mono<String> message) {
-        logger.trace("Method called with arg {}", message);
+    Flux<FileData> consume(Mono<String> message) {
+        logger.trace("consume called with arg {}", message.toString());
         return dmaapConsumerJsonParser.getJsonObject(message);
     }
 
-    private Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
-        Mono<List<ConsumerDmaapModel>> filesFromSender = fileCollector.getFilesFromSender(listOfFileData);
-        return filesFromSender;
-    }
-
     @Override
-    protected Mono<List<ConsumerDmaapModel>> execute(String object) {
+    protected Flux<FileData> execute(String object) {
         dmaaPConsumerReactiveHttpClient = resolveClient();
-        logger.trace("Method called with arg {}", object);
-        Mono<List<FileData>> consumerResult =
-            consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
-        return consumerResult.flatMap(this::getFilesFromSender);
+        logger.trace("execute called with arg {}", object);
+        return consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
     }
 
     @Override
index 716b52c..0b81df5 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import java.util.List;
-
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
 import org.springframework.web.reactive.function.client.WebClient;
 
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -33,14 +30,13 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapPublisherTask {
 
-    abstract Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModel) throws DatafileTaskException;
+    abstract Flux<String> publish(ConsumerDmaapModel consumerDmaapModel);
 
     abstract DmaapProducerReactiveHttpClient resolveClient();
 
     protected abstract DmaapPublisherConfiguration resolveConfiguration();
 
-    protected abstract Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModel)
-            throws DatafileTaskException;
+    protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
 
     WebClient buildWebClient() {
         return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
index 5779051..b4ee3a9 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import java.util.List;
-
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
 import org.slf4j.Logger;
@@ -30,7 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -49,20 +45,16 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
     }
 
     @Override
-    public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) {
-        logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels);
-        return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels);
+    public Flux<String> publish(ConsumerDmaapModel consumerDmaapModel) {
+        logger.trace("Publishing on DMaaP DataRouter {}", consumerDmaapModel);
+        return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
     }
 
     @Override
-    public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
-        throws DatafileTaskException {
-        if (consumerDmaapModels == null) {
-            throw new DmaapNotFoundException("Invoked null object to DMaaP task");
-        }
+    public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
         dmaapProducerReactiveHttpClient = resolveClient();
-        logger.trace("Method called with arg {}", consumerDmaapModels);
-        return publish(consumerDmaapModels);
+        logger.trace("Method called with arg {}", consumerDmaapModel);
+        return publish(consumerDmaapModel);
     }
 
     @Override
index 14085bb..c263c95 100644 (file)
@@ -2,35 +2,29 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import java.util.List;
-import java.util.concurrent.Callable;
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import reactor.core.publisher.Flux;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -42,17 +36,21 @@ public class ScheduledTasks {
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
 
     private final DmaapConsumerTask dmaapConsumerTask;
+    private final XnfCollectorTask xnfCollectorTask;
     private final DmaapPublisherTask dmaapProducerTask;
 
     /**
      * Constructor for task registration in Datafile Workflow.
      *
      * @param dmaapConsumerTask - fist task
-     * @param dmaapPublisherTask - second task
+     * @param xnfCollectorTask - second task
+     * @param dmaapPublisherTask - third task
      */
     @Autowired
-    public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask) {
+    public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
+            DmaapPublisherTask dmaapPublisherTask) {
         this.dmaapConsumerTask = dmaapConsumerTask;
+        this.xnfCollectorTask = xnfCollectorTask;
         this.dmaapProducerTask = dmaapPublisherTask;
     }
 
@@ -62,12 +60,10 @@ public class ScheduledTasks {
     public void scheduleMainDatafileEventTask() {
         logger.trace("Execution of tasks was registered");
 
-        Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDmaapMessage())
-            .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
-            .flatMap(this::publishToDmaapConfiguration)
-            .subscribeOn(Schedulers.elastic());
-
-        dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+        consumeFromDmaapMessage()
+                .doOnError(DmaapEmptyResponseException.class, error -> logger.error("Nothing to consume from DMaaP"))
+                .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+                .subscribe(this::onSuccess, this::onError, this::onComplete);
     }
 
     private void onComplete() {
@@ -84,18 +80,16 @@ public class ScheduledTasks {
         }
     }
 
-    private Callable<Mono<List<ConsumerDmaapModel>>> consumeFromDmaapMessage() {
-        return () -> {
-            dmaapConsumerTask.initConfigs();
-            return dmaapConsumerTask.execute("");
-        };
+    private Flux<FileData> consumeFromDmaapMessage() {
+        dmaapConsumerTask.initConfigs();
+        return dmaapConsumerTask.execute("");
     }
 
-    private Mono<String> publishToDmaapConfiguration(Mono<List<ConsumerDmaapModel>> monoModel) {
-        try {
-            return dmaapProducerTask.execute(monoModel);
-        } catch (DatafileTaskException e) {
-            return Mono.error(e);
-        }
+    private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
+        return xnfCollectorTask.execute(fileData);
+    }
+
+    private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
+        return dmaapProducerTask.execute(monoModel);
     }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
new file mode 100644 (file)
index 0000000..66d59ae
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface XnfCollectorTask {
+    Flux<ConsumerDmaapModel> execute(FileData fileData);
+}
@@ -1,73 +1,71 @@
-/*-
+/*
  * ============LICENSE_START======================================================================
- *  Copyright (C) 2018 Ericsson. All rights reserved.
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.ftp;
+package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import java.io.File;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.commons.io.FilenameUtils;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
 
 /**
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
  */
 @Component
-public class FileCollector { // TODO: Should be final, but that means adding PowerMock or Mockito
-                             // 2.x for testing so it is left for later improvement.
+public class XnfCollectorTaskImpl implements XnfCollectorTask {
+
     private static final String FTPES = "ftpes";
     private static final String FTPS = "ftps";
     private static final String SFTP = "sftp";
 
-    private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+    private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
 
     private final FtpsClient ftpsClient;
     private final SftpClient sftpClient;
 
     @Autowired
-    protected FileCollector(FtpsClient ftpsCleint, SftpClient sftpClient) {
+    protected XnfCollectorTaskImpl(FtpsClient ftpsCleint, SftpClient sftpClient) {
         this.ftpsClient = ftpsCleint;
         this.sftpClient = sftpClient;
     }
 
-    public Mono<List<ConsumerDmaapModel>> getFilesFromSender(List<FileData> listOfFileData) {
-        List<ConsumerDmaapModel> consumerModels = new ArrayList<ConsumerDmaapModel>();
-        for (FileData fileData : listOfFileData) {
-            String localFile = collectFile(fileData);
+    @Override
+    public Flux<ConsumerDmaapModel> execute(FileData fileData) {
+        logger.trace("Entering execute with {}", fileData);
+        String localFile = collectFile(fileData);
 
-            if (localFile != null) {
-                ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
-                consumerModels.add(consumerDmaapModel);
-            }
+        if (localFile != null) {
+            ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
+            logger.trace("Exiting execute with {}", consumerDmaapModel);
+            return Flux.just(consumerDmaapModel);
         }
-        return Mono.just(consumerModels);
+        logger.trace("Exiting execute with empty");
+        return Flux.empty();
     }
 
     private String collectFile(FileData fileData) {
@@ -78,7 +76,7 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
                 .userId(userInfo != null ? userInfo[0] : "").password(userInfo != null ? userInfo[1] : "")
                 .port(uri.getPort()).build();
         String remoteFile = uri.getPath();
-        String localFile = "target/" + FilenameUtils.getName(remoteFile);
+        String localFile = "target" + File.separator + fileData.name();
         String scheme = uri.getScheme();
 
         boolean fileDownloaded = false;
@@ -88,8 +86,8 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
             fileDownloaded = sftpClient.collectFile(fileServerData, remoteFile, localFile);
         } else {
 
-            logger.error("DFC does not support protocol {}. Supported protocols are " + FTPES + ", " + FTPS + ", and "
-                    + SFTP + ". " + fileData);
+            logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
+                    FTPES, FTPS, SFTP, fileData);
             localFile = null;
         }
         if (!fileDownloaded) {
@@ -107,11 +105,12 @@ public class FileCollector { // TODO: Should be final, but that means adding Pow
     }
 
     private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
+        String name = fileData.name();
         String compression = fileData.compression();
         String fileFormatType = fileData.fileFormatType();
         String fileFormatVersion = fileData.fileFormatVersion();
 
-        return ImmutableConsumerDmaapModel.builder().location(localFile).compression(compression)
+        return ImmutableConsumerDmaapModel.builder().name(name).location(localFile).compression(compression)
                 .fileFormatType(fileFormatType).fileFormatVersion(fileFormatVersion).build();
     }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
deleted file mode 100644 (file)
index 2f61ac9..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class FileCollectorTest {
-
-    private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
-    private static final String FILE_READY_CHANGE_TYPE = "FileReady";
-    private static final String FTPES_SCHEME = "ftpes://";
-    private static final String SFTP_SCHEME = "sftp://";
-    private static final String SERVER_ADDRESS = "192.168.0.101";
-    private static final int PORT_22 = 22;
-    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
-    private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
-    private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
-    private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
-    private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
-    private static final String GZIP_COMPRESSION = "gzip";
-    private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
-    private static final String FILE_FORMAT_VERSION = "V10";
-
-    private FtpsClient ftpsClientMock = mock(FtpsClient.class);
-
-    private SftpClient sftpClientMock = mock(SftpClient.class);
-
-    private FileCollector fileCollectorUndetTest = new FileCollector(ftpsClientMock, sftpClientMock);
-
-    @Test
-    public void whenSingleFtpesFile_returnCorrectResponse() {
-        List<FileData> listOfFileData = new ArrayList<FileData>();
-        listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
-            .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
-            .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
-
-        FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
-            .userId("").password("").build();
-        when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
-
-        Mono<List<ConsumerDmaapModel>> consumerModelsMono =
-            fileCollectorUndetTest.getFilesFromSender(listOfFileData);
-
-        List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
-        assertEquals(1, consumerModels.size());
-        ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0);
-        assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression());
-        assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
-        assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
-        assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
-        FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
-            .userId("").password("").port(PORT_22).build();
-        verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
-        verifyNoMoreInteractions(ftpsClientMock);
-    }
-
-    @Test
-    public void whenSingleSftpFile_returnCorrectResponse() {
-        List<FileData> listOfFileData = new ArrayList<FileData>();
-        listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
-            .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
-            .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
-
-        FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
-            .userId("").password("").build();
-        when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
-
-        Mono<List<ConsumerDmaapModel>> consumerModelsMono =
-            fileCollectorUndetTest.getFilesFromSender(listOfFileData);
-
-        List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
-        assertEquals(1, consumerModels.size());
-        ConsumerDmaapModel consumerDmaapModel = consumerModels.get(0);
-        assertEquals(GZIP_COMPRESSION, consumerDmaapModel.getCompression());
-        assertEquals(MEAS_COLLECT_FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
-        assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
-        assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
-        FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
-            .userId("").password("").port(PORT_22).build();
-        verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
-        verifyNoMoreInteractions(ftpsClientMock);
-    }
-}
index 8c36a51..b5457b8 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.spy;
 
-import java.util.List;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
 import java.util.Optional;
 
 import org.junit.jupiter.api.Test;
@@ -31,9 +31,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -42,19 +39,27 @@ import reactor.test.StepVerifier;
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 class DmaapConsumerJsonParserTest {
+    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+    private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+    private static final String GZIP_COMPRESSION = "gzip";
+    private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+    private static final String FILE_FORMAT_VERSION = "V10";
+    private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+    private static final String CHANGE_TYPE = "FileReady";
+    private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
 
     @Test
     void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-
-                .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
-                .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField).build();
 
-        FileData expectedFileData = ImmutableFileData.builder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
-                .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
+        FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -63,19 +68,18 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
-        assertNotNull(fileDataResult);
-        assertEquals(expectedFileData, fileDataResult.get(0));
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).verifyComplete();
     }
 
     @Test
     void whenPassingCorrectJsonWihoutName_noFileData() {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
-                .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+        AdditionalField additionalField =
+                new JsonMessage.AdditionalFieldBuilder().location(LOCATION).compression(GZIP_COMPRESSION)
+                        .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField).build();
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -84,18 +88,18 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
-        assertNotNull(fileDataResult);
-        assertEquals(0, fileDataResult.size());
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNextCount(0).verifyComplete();
     }
 
     @Test
     void whenPassingCorrectJsonWihoutLocation_noFileData() {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-                .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+        AdditionalField additionalField =
+                new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).compression(GZIP_COMPRESSION)
+                        .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField).build();
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -104,19 +108,17 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
-        assertNotNull(fileDataResult);
-        assertEquals(0, fileDataResult.size());
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNextCount(0).verifyComplete();
     }
 
     @Test
     void whenPassingCorrectJsonWihoutCompression_noFileData() {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-                .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz")
-                .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+                .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField).build();
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -125,19 +127,17 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
-        assertNotNull(fileDataResult);
-        assertEquals(0, fileDataResult.size());
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNextCount(0).verifyComplete();
     }
 
     @Test
     void whenPassingCorrectJsonWihoutFileFormatType_noFileData() {
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-                .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
-                .fileFormatVersion("V10").build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalField).build();
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField).build();
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -146,24 +146,24 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
-        assertNotNull(fileDataResult);
-        assertEquals(0, fileDataResult.size());
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNextCount(0).verifyComplete();
     }
 
     @Test
     void whenPassingOneCorrectJsonWihoutFileFormatVersionAndOneCorrect_oneFileData() {
-        AdditionalField additionalFaultyField =
-                new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-                        .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
-                        .fileFormatType("org.3GPP.32.435#measCollec").build();
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
-                .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
-                .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
-                .changeType("FileReady").notificationFieldsVersion("1.0").addAdditionalField(additionalFaultyField)
-                .addAdditionalField(additionalField).build();
+        AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME)
+                .location(LOCATION).compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).build();
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name(PM_FILE_NAME).location(LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalFaultyField).addAdditionalField(additionalField).build();
+
+        FileData expectedFileData = ImmutableFileData.builder().changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE).name(PM_FILE_NAME).location(LOCATION).compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
 
         String messageString = message.toString();
         String parsedString = message.getParsed();
@@ -172,10 +172,8 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        List<FileData> fileDataResult = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageString))).block();
-
-        assertNotNull(fileDataResult);
-        assertEquals(1, fileDataResult.size());
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).verifyComplete();
     }
 
     @Test
index e681845..43502b4 100644 (file)
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
@@ -35,16 +34,16 @@ import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfig
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -53,7 +52,7 @@ import reactor.test.StepVerifier;
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 class DmaapConsumerTaskImplTest {
-    private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
+    private static final String PM_MEAS_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
     private static final String FILE_READY_CHANGE_TYPE = "FileReady";
     private static final String FTPES_SCHEME = "ftpes://";
     private static final String SFTP_SCHEME = "sftp://";
@@ -75,13 +74,11 @@ class DmaapConsumerTaskImplTest {
     private DmaapConsumerTaskImpl dmaapConsumerTask;
     private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
 
-    private static FileCollector fileCollectorMock;
-
     private static String ftpesMessage;
-    private static List<FileData> ftpesFileDataAfterConsume = new ArrayList<FileData>();
+    private static FileData ftpesFileData;
 
     private static String sftpMessage;
-    private static List<FileData> sftpFileDataAfterConsume = new ArrayList<FileData>();
+    private static FileData sftpFileData;
 
     @BeforeAll
     public static void setUp() {
@@ -95,42 +92,38 @@ class DmaapConsumerTaskImplTest {
         AdditionalField ftpesAdditionalField =
                 new JsonMessage.AdditionalFieldBuilder().location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
                         .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+        JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
                 .changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0")
                 .addAdditionalField(ftpesAdditionalField).build();
         ftpesMessage = ftpesJsonMessage.toString();
-        FileData ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        ftpesFileDataAfterConsume.add(ftpesFileData);
+        ftpesFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
+                .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION).build();
 
         AdditionalField sftpAdditionalField =
                 new JsonMessage.AdditionalFieldBuilder().location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
                         .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+        JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
                 .changeType(FILE_READY_CHANGE_TYPE).notificationFieldsVersion("1.0")
                 .addAdditionalField(sftpAdditionalField).build();
         sftpMessage = sftpJsonMessage.toString();
-        FileData sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
-        sftpFileDataAfterConsume.add(sftpFileData);
+        sftpFileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
+                .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION).build();
 
 
-        ImmutableConsumerDmaapModel consumerDmaapModel =
-                ImmutableConsumerDmaapModel.builder().location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
-                        .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+                .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
         listOfConsumerDmaapModel.add(consumerDmaapModel);
-
-        fileCollectorMock = mock(FileCollector.class);
     }
 
     @Test
     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
-        // given
-        prepareMocksForDmaapConsumer("", new ArrayList<FileData>());
+        prepareMocksForDmaapConsumer("", null);
 
-        // then
         StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
                 .expectError(DmaapEmptyResponseException.class).verify();
 
@@ -139,51 +132,39 @@ class DmaapConsumerTaskImplTest {
 
     @Test
     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
-        // given
-        prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileDataAfterConsume);
-        // when
-        final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block();
-        // then
+        prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+
+        StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
-        verify(fileCollectorMock, times(1)).getFilesFromSender(ftpesFileDataAfterConsume);
-        verifyNoMoreInteractions(fileCollectorMock);
-        Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse);
-
     }
 
     @Test
     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
-        // given
-        prepareMocksForDmaapConsumer(sftpMessage, sftpFileDataAfterConsume);
-        // when
-        final List<ConsumerDmaapModel> arrayOfResponse = dmaapConsumerTask.execute("Sample input").block();
-        // then
+        prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+
+        StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaapConsumerResponse();
         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
-        verify(fileCollectorMock, times(1)).getFilesFromSender(sftpFileDataAfterConsume);
-        verifyNoMoreInteractions(fileCollectorMock);
-        Assertions.assertEquals(listOfConsumerDmaapModel, arrayOfResponse);
-
     }
 
-    private void prepareMocksForDmaapConsumer(String message, List<FileData> fileDataAfterConsume) {
+    private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
         Mono<String> messageAsMono = Mono.just(message);
         DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
         dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
         when(dmaapConsumerReactiveHttpClient.getDmaapConsumerResponse()).thenReturn(messageAsMono);
 
         if (!message.isEmpty()) {
-            when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Mono.just(fileDataAfterConsume));
+            when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
         } else {
             when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
-                    .thenReturn(Mono.error(new DmaapEmptyResponseException()));
+                    .thenReturn(Flux.error(new DmaapEmptyResponseException()));
         }
-        when(fileCollectorMock.getFilesFromSender(fileDataAfterConsume))
-                .thenReturn(Mono.just(listOfConsumerDmaapModel));
 
-        dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient,
-                dmaapConsumerJsonParserMock, fileCollectorMock));
+        dmaapConsumerTask =
+                spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
         when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
         doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
     }
index 4f7787e..c124e98 100644 (file)
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -27,32 +25,27 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.function.Executable;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
 import org.springframework.http.HttpStatus;
 
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 class DmaapPublisherTaskImplTest {
+    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
 
     private static ConsumerDmaapModel consumerDmaapModel;
-    private static List<ConsumerDmaapModel> listOfConsumerDmaapModel;
     private static DmaapPublisherTaskImpl dmaapPublisherTask;
     private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
     private static AppConfig appConfig;
@@ -64,48 +57,17 @@ class DmaapPublisherTaskImplTest {
                 new ImmutableDmaapPublisherConfiguration.Builder().dmaapContentType("application/json")
                         .dmaapHostName("54.45.33.2").dmaapPortNumber(1234).dmaapProtocol("https").dmaapUserName("DFC")
                         .dmaapUserPassword("DFC").dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT").build();
-        consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location("target/A20161224.1030-1045.bin.gz")
+        consumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME).location("target/" + PM_FILE_NAME)
                 .compression("gzip").fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
-        listOfConsumerDmaapModel = new ArrayList<ConsumerDmaapModel>();
-        listOfConsumerDmaapModel.add(consumerDmaapModel);
         appConfig = mock(AppConfig.class);
     }
 
     @Test
-    public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
-        // given
-        when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
-        dmaapPublisherTask = new DmaapPublisherTaskImpl(appConfig);
-
-        // when
-        Executable executableFunction = () -> dmaapPublisherTask.execute(null);
-
-        // then
-        Assertions.assertThrows(DatafileTaskException.class, executableFunction,
-                "The specified parameter is incorrect");
-    }
-
-    @Test
-    public void whenPassedObjectFits_ReturnsCorrectStatus() throws DatafileTaskException {
-        // given
+    public void whenPassedObjectFits_ReturnsCorrectStatus() {
         prepareMocksForTests(HttpStatus.OK.value());
 
-        // when
-        dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel));
-
-        // then
-        verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
-        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
-    }
-
-    @Test
-    public void whenPassedObjectFits_ReturnsNoContent() throws DatafileTaskException {
-        // given
-        prepareMocksForTests(HttpStatus.NO_CONTENT.value());
-
-        dmaapPublisherTask.execute(Mono.just(listOfConsumerDmaapModel));
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
 
-        // then
         verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
@@ -113,7 +75,7 @@ class DmaapPublisherTaskImplTest {
     private void prepareMocksForTests(Integer httpResponseCode) {
         dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
         when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
-                .thenReturn(Mono.just(httpResponseCode.toString()));
+                .thenReturn(Flux.just(httpResponseCode.toString()));
         when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
         dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
         when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
new file mode 100644 (file)
index 0000000..528a481
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class XnfCollectorTaskImplTest {
+
+    private static final String PM_MEAS_CHANGE_IDINTIFIER = "PM_MEAS_FILES";
+    private static final String FILE_READY_CHANGE_TYPE = "FileReady";
+    private static final String FTPES_SCHEME = "ftpes://";
+    private static final String SFTP_SCHEME = "sftp://";
+    private static final String SERVER_ADDRESS = "192.168.0.101";
+    private static final int PORT_22 = 22;
+    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+    private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+    private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+    private static final String USER = "usr";
+    private static final String PWD = "pwd";
+    private static final String FTPES_LOCATION =
+            FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+    private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+    private static final String GZIP_COMPRESSION = "gzip";
+    private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+    private static final String FILE_FORMAT_VERSION = "V10";
+
+    private FtpsClient ftpsClientMock = mock(FtpsClient.class);
+
+    private SftpClient sftpClientMock = mock(SftpClient.class);
+
+    private XnfCollectorTask collectorUndetTest = new XnfCollectorTaskImpl(ftpsClientMock, sftpClientMock);
+
+    @Test
+    public void whenSingleFtpesFile_returnCorrectResponse() {
+        FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+                .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(FTPES_LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+        FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId(USER)
+                .password(PWD).port(PORT_22).build();
+        when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+                .thenReturn(Boolean.TRUE);
+
+        ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+                .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+                .verifyComplete();
+
+        verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+        verifyNoMoreInteractions(ftpsClientMock);
+    }
+
+    @Test
+    public void whenSingleSftpFile_returnCorrectResponse() {
+        FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+                .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location(SFTP_LOCATION)
+                .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+        FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
+                .password("").port(PORT_22).build();
+        when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+                .thenReturn(Boolean.TRUE);
+
+        ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder().name(PM_FILE_NAME)
+                .location(LOCAL_FILE_LOCATION).compression(GZIP_COMPRESSION)
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
+                .verifyComplete();
+
+        verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+        verifyNoMoreInteractions(ftpsClientMock);
+    }
+
+    @Test
+    public void whenWrongScheme_returnEmpty() {
+        FileData fileData = ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
+                .changeType(FILE_READY_CHANGE_TYPE).name(PM_FILE_NAME).location("http://host.com/file.zip")
+                .compression(GZIP_COMPRESSION).fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION).build();
+
+        FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).userId("")
+                .password("").port(PORT_22).build();
+        when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
+                .thenReturn(Boolean.TRUE);
+
+        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
+
+        verifyNoMoreInteractions(ftpsClientMock);
+    }
+}
index 94e7ccd..d9c146f 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.model;
 
+import com.google.gson.annotations.SerializedName;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 
-import com.google.gson.annotations.SerializedName;
-
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -30,6 +30,9 @@ import com.google.gson.annotations.SerializedName;
 @Gson.TypeAdapters
 public interface ConsumerDmaapModel {
 
+    @SerializedName("name")
+    String getName();
+
     @SerializedName("location")
     String getLocation();
 
index 103a70e..62e6b1b 100644 (file)
 package org.onap.dcaegen2.collectors.datafile.model;
 
 public class ConsumerDmaapModelForUnitTest implements ConsumerDmaapModel {
+    private final String name;
     private final String location;
     private final String compression;
     private final String fileFormatType;
     private final String fileFormatVersion;
 
     public ConsumerDmaapModelForUnitTest() {
+        this.name = "A20161224.1030-1045.bin.gz";
         this.location = "target/A20161224.1030-1045.bin.gz";
         this.compression = "gzip";
         this.fileFormatType = "org.3GPP.32.435#measCollec";
         this.fileFormatVersion = "V10";
     }
 
+    @Override
+    public String getName() {
+        return name;
+    }
+
     @Override
     public String getLocation() {
         return location;
index 062724e..f27e3b6 100644 (file)
@@ -21,11 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import org.junit.jupiter.api.Test;
 
 class CommonFunctionsTest {
-    // Given
     private ConsumerDmaapModel model = new ConsumerDmaapModelForUnitTest();
     private static final String EXPECTED_RESULT =
-        "{\"location\":\"target/A20161224.1030-1045.bin.gz\",\"compression\":\"gzip\","
-            + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\",\"fileFormatVersion\":\"V10\"}";
+            "{\"name\":\"A20161224.1030-1045.bin.gz\",\"location\":\"target/A20161224.1030-1045.bin.gz\","
+            + "\"compression\":\"gzip\",\"fileFormatType\":\"org.3GPP.32.435#measCollec\","
+            + "\"fileFormatVersion\":\"V10\"}";
 
     @Test
     void createJsonBody_shouldReturnJsonInString() {
index 5b02897..e80670d 100644 (file)
@@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
 
 public class ConsumerDmaapModelTest {
 
+    private static final String NAME = "A20161224.1030-1045.bin.gz";
     private static final String LOCATION = "target/A20161224.1030-1045.bin.gz";
     private static final String COMPRESSION = "gzip";
     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -29,13 +30,12 @@ public class ConsumerDmaapModelTest {
     @Test
     public void consumerDmaapModelBuilder_shouldBuildAnObject() {
 
-        // When
-        // Given
-        ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().location(LOCATION).compression(COMPRESSION)
-            .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
+        ConsumerDmaapModel consumerDmaapModel =
+                ImmutableConsumerDmaapModel.builder().name(NAME).location(LOCATION).compression(COMPRESSION)
+                        .fileFormatType(FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build();
 
-        // Then
         Assertions.assertNotNull(consumerDmaapModel);
+        Assertions.assertEquals(NAME, consumerDmaapModel.getName());
         Assertions.assertEquals(LOCATION, consumerDmaapModel.getLocation());
         Assertions.assertEquals(COMPRESSION, consumerDmaapModel.getCompression());
         Assertions.assertEquals(FILE_FORMAT_TYPE, consumerDmaapModel.getFileFormatType());
index 4b8ce08..36050ff 100644 (file)
@@ -21,7 +21,6 @@ import com.google.gson.JsonParser;
 
 import java.io.File;
 import java.net.URI;
-import java.util.List;
 
 import org.apache.http.HttpHeaders;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -38,6 +37,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -47,7 +47,8 @@ import reactor.core.publisher.Mono;
 public class DmaapProducerReactiveHttpClient {
 
     private static final String X_ATT_DR_META = "X-ATT-DR-META";
-    private static final String LOCATION = "location";
+    private static final String NAME_JSON_TAG = "name";
+    private static final String LOCATION_JSON_TAG = "location";
     private static final String DEFAULT_FEED_ID = "1";
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -76,54 +77,41 @@ public class DmaapProducerReactiveHttpClient {
     /**
      * Function for calling DMaaP HTTP producer - post request to DMaaP.
      *
-     * @param consumerDmaapModelMono - object which will be sent to DMaaP
+     * @param consumerDmaapModel - object which will be sent to DMaaP
      * @return status code of operation
      */
-    public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
-        consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
-        return Mono.just(HttpStatus.OK.toString());
-    }
+    public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+        logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
 
-    public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
-        this.webClient = webClient;
-        return this;
-    }
+        RequestBodyUriSpec post = webClient.post();
 
-    private void postFilesAndData(List<ConsumerDmaapModel> models) {
-        for (ConsumerDmaapModel consumerDmaapModel : models) {
-            postFileAndData(consumerDmaapModel);
-        }
-    }
+        prepareHead(consumerDmaapModel, post);
 
-    private void postFileAndData(ConsumerDmaapModel model) {
-        RequestBodyUriSpec post = webClient.post();
+        prepareBody(consumerDmaapModel, post);
 
-        boolean headPrepared = prepareHead(model, post);
+        ResponseSpec responseSpec = post.retrieve();
+        responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+        responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
+        Flux<String> response = responseSpec.bodyToFlux(String.class);
 
-        if (headPrepared) {
-            prepareBody(model, post);
+        logger.trace("Exiting getDmaapProducerResponse with {}", response);
+        return response;
+    }
 
-            ResponseSpec responseSpec = post.retrieve();
-            responseSpec.onStatus(HttpStatus::is4xxClientError,
-                    clientResponse -> handlePostErrors(model, clientResponse));
-            responseSpec.onStatus(HttpStatus::is5xxServerError,
-                    clientResponse -> handlePostErrors(model, clientResponse));
-            String bodyToMono = responseSpec.bodyToMono(String.class).block();
-            logger.debug("File info sent to DR with response: " + bodyToMono);
-        }
+    public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
+        this.webClient = webClient;
+        return this;
     }
 
-    private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
-        boolean result = true;
+    private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
         post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
 
         JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
-        String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
+        String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+        metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
         post.header(X_ATT_DR_META, metaData.toString());
 
-        post.uri(getUri(location));
-
-        return result;
+        post.uri(getUri(name));
     }
 
     private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
@@ -133,12 +121,10 @@ public class DmaapProducerReactiveHttpClient {
         post.body(BodyInserters.fromResource(httpResource));
     }
 
-    private URI getUri(String location) {
-        String fileName = location.substring(location.indexOf("/"), location.length());
-        String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" +  fileName;
-        URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
-                .port(dmaapPortNumber).path(path).build();
-        return uri;
+    private URI getUri(String fileName) {
+        String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
+        return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
+                .path(path).build();
     }
 
     private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
index c0dbf31..5f4c1a5 100644 (file)
@@ -42,7 +42,8 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodyUri
 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -52,6 +53,7 @@ class DmaapProducerReactiveHttpClientTest {
 
     private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
     private static final String LOCATION_JSON_TAG = "location";
+    private static final String NAME_JSON_TAG = "name";
     private static final String X_ATT_DR_META = "X-ATT-DR-META";
 
     private static final String HOST = "54.45.33.2";
@@ -98,11 +100,13 @@ class DmaapProducerReactiveHttpClientTest {
         List<ConsumerDmaapModel> consumerDmaapModelList = new ArrayList<ConsumerDmaapModel>();
         consumerDmaapModelList.add(consumerDmaapModel);
 
-        dmaapProducerReactiveHttpClient.getDmaapProducerResponse(Mono.just(consumerDmaapModelList));
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+                .expectNext("200").verifyComplete();
 
         verify(requestBodyUriSpecMock).header(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
         JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
         metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
+        metaData.getAsJsonObject().remove(NAME_JSON_TAG);
         verify(requestBodyUriSpecMock).header(X_ATT_DR_META, metaData.toString());
         URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTP_SCHEME).host(HOST).port(PORT)
                 .path(PUBLISH_TOPIC + "/" + DEFAULT_FEED_ID + "/" + FILE_NAME).build();
@@ -116,7 +120,7 @@ class DmaapProducerReactiveHttpClientTest {
 
         when(requestBodyUriSpecMock.retrieve()).thenReturn(responseSpecMock);
         when(responseSpecMock.onStatus(any(), any())).thenReturn(responseSpecMock);
-        Mono<String> expectedResult = Mono.just("200");
-        when(responseSpecMock.bodyToMono(String.class)).thenReturn(expectedResult);
+        Flux<String> expectedResult = Flux.just("200");
+        when(responseSpecMock.bodyToFlux(String.class)).thenReturn(expectedResult);
     }
 }