Improved logging 07/85207/1
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 12 Apr 2019 11:57:47 +0000 (11:57 +0000)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 12 Apr 2019 11:57:47 +0000 (11:57 +0000)
File name is used in Requext_ID to make it
more easy to trouble shoot.

Change-Id: Ied98766f1a177a9bda35ec5892a60d06619ff3a7
Issue-ID: DCAEGEN2-1305
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
12 files changed:
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MappedDiagnosticContext.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java

index 27df49f..c1b4c0d 100644 (file)
@@ -24,6 +24,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
+
 import java.lang.reflect.Type;
 import java.nio.file.Path;
 
@@ -43,7 +44,7 @@ public class CommonFunctions {
      *
      * @param filePublishInformation info to serialize.
      *
-     * @return a string with the serialized model.
+     * @return a string with the serialized info.
      */
     public static String createJsonBody(FilePublishInformation filePublishInformation) {
         return gson.toJson(filePublishInformation);
index 4530242..066402b 100644 (file)
 package org.onap.dcaegen2.collectors.datafile.model;
 
 import com.google.gson.annotations.SerializedName;
+
 import java.nio.file.Path;
+import java.util.Map;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -68,4 +71,7 @@ public interface FilePublishInformation extends DmaapModel {
 
     @SerializedName("fileFormatVersion")
     String getFileFormatVersion();
+
+    @SerializedName("context")
+    Map<String, String> getContext();
 }
index 2643eea..86ea5c3 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.model.logging;
 
 import java.util.Map;
 import java.util.UUID;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
@@ -84,4 +85,17 @@ public final class MappedDiagnosticContext {
         MDC.put(MdcVariables.REQUEST_ID, UUID.randomUUID().toString());
         return MDC.getCopyOfContextMap();
     }
+
+    /**
+     * Updates the request ID in the current context.
+     * @param newRequestId the new value of the request ID
+     * @return a copy the updated context
+     */
+    public static Map<String, String> setRequestId(String newRequestId) {
+        Map<String, String> context = MDC.getCopyOfContextMap();
+        context.put(MdcVariables.REQUEST_ID, newRequestId);
+        MDC.setContextMap(context);
+        return context;
+    }
+
 }
index ad03170..05f04b3 100644 (file)
@@ -22,12 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
-import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -45,6 +46,7 @@ import org.slf4j.MDC;
 import org.springframework.core.io.FileSystemResource;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
+
 import reactor.core.publisher.Mono;
 
 /**
@@ -72,56 +74,55 @@ public class DataRouterPublisher {
     /**
      * Publish one file.
      *
-     * @param model information about the file to publish
+     * @param publishInfo information about the file to publish
      * @param numRetries the maximal number of retries if the publishing fails
      * @param firstBackoff the time to delay the first retry
-     * @param contextMap tracing context variables
      * @return the (same) filePublishInformation
      */
-    public Mono<FilePublishInformation> publishFile(FilePublishInformation model, long numRetries,
-            Duration firstBackoff, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
-        logger.trace("publishFile called with arg {}", model);
+    public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
+            Duration firstBackoff) {
+        MDC.setContextMap(publishInfo.getContext());
+        logger.trace("publishFile called with arg {}", publishInfo);
         dmaapProducerReactiveHttpClient = resolveClient();
 
-        return Mono.just(model) //
+        return Mono.just(publishInfo) //
                 .cache() //
-                .flatMap(m -> publishFile(m, contextMap)) //
-                .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
+                .flatMap(this::publishFile) //
+                .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) //
                 .retryBackoff(numRetries, firstBackoff);
     }
 
-    private Mono<HttpStatus> publishFile(FilePublishInformation filePublishInformation,
-            Map<String, String> contextMap) {
-        logger.trace("Entering publishFile with {}", filePublishInformation);
+    private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo
+            ) {
+        logger.trace("Entering publishFile with {}", publishInfo);
         try {
             HttpPut put = new HttpPut();
-            prepareHead(filePublishInformation, put);
-            prepareBody(filePublishInformation, put);
+            prepareHead(publishInfo, put);
+            prepareBody(publishInfo, put);
             dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
 
             HttpResponse response =
-                    dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+                    dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
             logger.trace("{}", response);
             return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
         } catch (Exception e) {
-            logger.warn("Unable to send file to DataRouter. Data: {}", filePublishInformation.getInternalLocation(), e);
+            logger.warn("Unable to send file to DataRouter. Data: {}", publishInfo.getInternalLocation(), e);
             return Mono.error(e);
         }
     }
 
-    private void prepareHead(FilePublishInformation model, HttpPut put) {
+    private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
         put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(publishInfo));
         metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
         put.addHeader(X_DMAAP_DR_META, metaData.toString());
-        put.setURI(getPublishUri(model.getName()));
+        put.setURI(getPublishUri(publishInfo.getName()));
         MappedDiagnosticContext.appendTraceInfo(put);
     }
 
-    private void prepareBody(FilePublishInformation model, HttpPut put) throws IOException {
-        Path fileLocation = model.getInternalLocation();
+    private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
+        Path fileLocation = publishInfo.getInternalLocation();
         try (InputStream fileInputStream = createInputStream(fileLocation)) {
             put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
         }
@@ -134,12 +135,11 @@ public class DataRouterPublisher {
                 .pathSegment(fileName).build();
     }
 
-    private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation model,
-            Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
+    private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+        MDC.setContextMap(publishInfo.getContext());
         if (HttpUtils.isSuccessfulResponseCode(response.value())) {
             logger.trace("Publish to DR successful!");
-            return Mono.just(model);
+            return Mono.just(publishInfo);
         } else {
             logger.warn("Publish to DR unsuccessful, response code: {}", response);
             return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
index cb93df1..6f3f6b7 100644 (file)
@@ -20,6 +20,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Map;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+
 import reactor.core.publisher.Mono;
 
 /**
@@ -75,8 +77,8 @@ public class FileCollector {
                 .retryBackoff(numRetries, firstBackoff);
     }
 
-    private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
+    private Mono<FilePublishInformation> collectFile(FileData fileData, Map<String, String> context) {
+        MDC.setContextMap(context);
         logger.trace("starting to collectFile {}", fileData.name());
 
         final String remoteFile = fileData.remoteFilePath();
@@ -86,7 +88,7 @@ public class FileCollector {
             currentClient.open();
             localFile.getParent().toFile().mkdir(); // Create parent directories
             currentClient.collectFile(remoteFile, localFile);
-            return Mono.just(getFilePublishInformation(fileData, localFile));
+            return Mono.just(getFilePublishInformation(fileData, localFile, context));
         } catch (Exception throwable) {
             logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
                     throwable.toString());
@@ -105,7 +107,7 @@ public class FileCollector {
         }
     }
 
-    private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile) {
+    private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,Map<String, String> context) {
         String location = fileData.location();
         MessageMetaData metaData = fileData.messageMetaData();
         return ImmutableFilePublishInformation.builder() //
@@ -121,6 +123,7 @@ public class FileCollector {
                 .compression(fileData.compression()) //
                 .fileFormatType(fileData.fileFormatType()) //
                 .fileFormatVersion(fileData.fileFormatVersion()) //
+                .context(context) //
                 .build();
     }
 
index 364fa04..5cc894c 100644 (file)
@@ -40,6 +40,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
+
 /**
  * This implements the main flow of the data file collector. Fetch file ready events from the
  * message router, fetch new files from the PNF publish these in the data router.
@@ -89,7 +90,7 @@ public class ScheduledTasks {
             logger.trace("Execution of tasks was registered");
             applicationConfiguration.loadConfigurationFromFile();
             createMainTask(context) //
-                    .subscribe(model -> onSuccess(model, context), //
+                    .subscribe(this::onSuccess, //
                             throwable -> {
                                 onError(throwable, context);
                                 currentNumberOfSubscriptions.decrementAndGet();
@@ -103,18 +104,30 @@ public class ScheduledTasks {
         }
     }
 
-    Flux<FilePublishInformation> createMainTask(Map<String, String> contextMap) {
+    Flux<FilePublishInformation> createMainTask(Map<String, String> context) {
         return fetchMoreFileReadyMessages() //
-                .parallel(NUMBER_OF_WORKER_THREADS / 2) // Each message in parallel
+                .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
                 .runOn(scheduler) //
                 .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
                 .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
-                .filter(fileData -> shouldBePublished(fileData, contextMap)) //
-                .flatMap(fileData -> fetchFile(fileData, contextMap), false, 1, 1) //
-                .flatMap(model -> publishToDataRouter(model, contextMap), false, 1, 1) //
-                .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
-                .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+                .flatMap(fileData -> createMdcContext(fileData, context)) //
+                .filter(this::shouldBePublished) //
+                .flatMap(this::fetchFile, false, 1, 1) //
+                .flatMap(this::publishToDataRouter,false, 1, 1) //
+                .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
+                .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
                 .sequential();
+
+    }
+
+    private class FileDataWithContext {
+        FileDataWithContext(FileData fileData, Map<String, String> context) {
+            this.fileData = fileData;
+            this.context = context;
+        }
+
+        final FileData fileData;
+        final Map<String, String> context;
     }
 
     /**
@@ -136,6 +149,10 @@ public class ScheduledTasks {
         return publishedFilesCache.size();
     }
 
+    public int getCurrentNumberOfSubscriptions() {
+        return currentNumberOfSubscriptions.get();
+    }
+
     protected DMaaPMessageConsumer createConsumerTask() {
         return new DMaaPMessageConsumer(this.applicationConfiguration);
     }
@@ -153,21 +170,28 @@ public class ScheduledTasks {
         logger.trace("Datafile tasks have been completed");
     }
 
-    private synchronized void onSuccess(FilePublishInformation model, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
-        logger.info("Datafile file published {}", model.getInternalLocation());
+    private synchronized void onSuccess(FilePublishInformation publishInfo) {
+        MDC.setContextMap(publishInfo.getContext());
+        logger.info("Datafile file published {}", publishInfo.getInternalLocation());
     }
 
-    private void onError(Throwable throwable, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
+    private void onError(Throwable throwable, Map<String, String> context) {
+        MDC.setContextMap(context);
         logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
     }
 
-    private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) {
+    private Mono<FileDataWithContext> createMdcContext(FileData fileData, Map<String, String> context) {
+        MDC.setContextMap(context);
+        context = MappedDiagnosticContext.setRequestId(fileData.name());
+        FileDataWithContext pair = new FileDataWithContext(fileData, context);
+        return Mono.just(pair);
+    }
+
+    private boolean shouldBePublished(FileDataWithContext fileData) {
         boolean result = false;
-        Path localFilePath = fileData.getLocalFilePath();
+        Path localFilePath = fileData.fileData.getLocalFilePath();
         if (publishedFilesCache.put(localFilePath) == null) {
-            result = !createPublishedChecker().isFilePublished(fileData.name(), contextMap);
+            result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
         }
         if (!result) {
             currentNumberOfTasks.decrementAndGet();
@@ -176,38 +200,36 @@ public class ScheduledTasks {
         return result;
     }
 
-    private Mono<FilePublishInformation> fetchFile(FileData fileData, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
-        return createFileCollector()
-                .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap)
-                .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap));
+    private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
+        MDC.setContextMap(fileData.context);
+        return createFileCollector().collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES,
+                FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, fileData.context)
+                .onErrorResume(exception -> handleFetchFileFailure(fileData));
     }
 
-    private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
-        Path localFilePath = fileData.getLocalFilePath();
-        logger.error("File fetching failed, fileData {}", fileData);
-        deleteFile(localFilePath, contextMap);
+    private Mono<FilePublishInformation> handleFetchFileFailure(FileDataWithContext fileData) {
+        MDC.setContextMap(fileData.context);
+        Path localFilePath = fileData.fileData.getLocalFilePath();
+        logger.error("File fetching failed, fileData {}", fileData.fileData);
+        deleteFile(localFilePath, fileData.context);
         publishedFilesCache.remove(localFilePath);
         currentNumberOfTasks.decrementAndGet();
         return Mono.empty();
     }
 
-    private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation model,
-            Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
+    private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation publishInfo) {
+        MDC.setContextMap(publishInfo.getContext());
 
         return createDataRouterPublisher()
-                .publishFile(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap)
-                .onErrorResume(exception -> handlePublishFailure(model, contextMap));
+                .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT)
+                .onErrorResume(exception -> handlePublishFailure(publishInfo));
     }
 
-    private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation model,
-            Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
-        logger.error("File publishing failed: {}", model);
-        Path internalFileName = model.getInternalLocation();
-        deleteFile(internalFileName, contextMap);
+    private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) {
+        MDC.setContextMap(publishInfo.getContext());
+        logger.error("File publishing failed: {}", publishInfo);
+        Path internalFileName = publishInfo.getInternalLocation();
+        deleteFile(internalFileName, publishInfo.getContext());
         publishedFilesCache.remove(internalFileName);
         currentNumberOfTasks.decrementAndGet();
         return Mono.empty();
@@ -221,21 +243,21 @@ public class ScheduledTasks {
                 "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
                 getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
 
-        Map<String, String> contextMap = MDC.getCopyOfContextMap();
+        Map<String, String> context = MDC.getCopyOfContextMap();
         return createConsumerTask() //
                 .getMessageRouterResponse() //
-                .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
+                .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
     }
 
-    private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
+    private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
+        MDC.setContextMap(context);
         logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(),
                 this.applicationConfiguration.getDmaapConsumerConfiguration());
         return Flux.empty();
     }
 
-    private void deleteFile(Path localFile, Map<String, String> contextMap) {
-        MDC.setContextMap(contextMap);
+    private void deleteFile(Path localFile, Map<String, String> context) {
+        MDC.setContextMap(context);
         logger.trace("Deleting file: {}", localFile);
         try {
             Files.delete(localFile);
index 5fdf30f..90237c9 100644 (file)
@@ -23,6 +23,8 @@ package org.onap.dcaegen2.collectors.datafile.model;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.nio.file.Paths;
+import java.util.HashMap;
+
 import org.junit.jupiter.api.Test;
 
 public class CommonFunctionsTest {
@@ -42,6 +44,7 @@ public class CommonFunctionsTest {
             .compression("") //
             .fileFormatType("") //
             .fileFormatVersion("") //
+            .context(new HashMap<String,String>())
             .build();
         String actualBody = CommonFunctions.createJsonBody(filePublishInformation);
 
index 950b9a6..83c92ef 100644 (file)
@@ -18,6 +18,8 @@ package org.onap.dcaegen2.collectors.datafile.model;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -50,6 +52,7 @@ public class FilePublishInformationTest {
             .compression(COMPRESSION) //
             .fileFormatType(FILE_FORMAT_TYPE) //
             .fileFormatVersion(FILE_FORMAT_VERSION) //
+            .context(new HashMap<String,String>()) //
             .build();
 
         Assertions.assertNotNull(filePublishInformation);
index 06fa0b4..5e73725 100644 (file)
@@ -30,7 +30,9 @@ import static org.mockito.Mockito.when;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -47,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -170,6 +173,7 @@ public class DMaaPMessageConsumerTest {
                 .compression(GZIP_COMPRESSION) //
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
                 .fileFormatVersion(FILE_FORMAT_VERSION) //
+                .context(new HashMap<String,String>()) //
                 .build();
         listOfFilePublishInformation.add(filePublishInformation);
 
index 5746e0f..847d962 100644 (file)
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
@@ -54,6 +55,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
 import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 import org.springframework.web.util.UriBuilder;
+
 import reactor.test.StepVerifier;
 
 /**
@@ -89,7 +91,7 @@ class DataRouterPublisherTest {
     private static DmaapProducerHttpClient httpClientMock;
     private static AppConfig appConfig;
     private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
-    private final Map<String, String> contextMap = new HashMap<>();
+    private static Map<String, String> context = new HashMap<>();
     private static DataRouterPublisher publisherTaskUnderTestSpy;
 
     @BeforeAll
@@ -111,6 +113,7 @@ class DataRouterPublisherTest {
                 .compression("gzip") //
                 .fileFormatType(FILE_FORMAT_TYPE) //
                 .fileFormatVersion(FILE_FORMAT_VERSION) //
+                .context(context) //
                 .build(); //
         appConfig = mock(AppConfig.class);
         publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
@@ -119,9 +122,8 @@ class DataRouterPublisherTest {
     @Test
     public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
         prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
-        StepVerifier
-                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
-                        contextMap))
+        StepVerifier //
+                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
                 .expectNext(filePublishInformation) //
                 .verifyComplete();
 
@@ -146,7 +148,7 @@ class DataRouterPublisherTest {
 
         Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
         Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
-        assertTrue(10 == metaHash.size());
+        assertEquals(11, metaHash.size());
         assertEquals(PRODUCT_NAME, metaHash.get("productName"));
         assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
         assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
@@ -163,9 +165,8 @@ class DataRouterPublisherTest {
     void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
         prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
 
-        StepVerifier
-                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0),
-                        contextMap))
+        StepVerifier //
+                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0)))
                 .expectNext(filePublishInformation) //
                 .verifyComplete();
     }
@@ -175,9 +176,8 @@ class DataRouterPublisherTest {
         prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
                 Integer.valueOf(HttpStatus.OK.value()));
 
-        StepVerifier
-                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
-                        contextMap))
+        StepVerifier //
+                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
                 .expectNext(filePublishInformation) //
                 .verifyComplete();
 
@@ -192,9 +192,8 @@ class DataRouterPublisherTest {
         prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
                 Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
 
-        StepVerifier
-                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0),
-                        contextMap))
+        StepVerifier //
+                .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
                 .expectErrorMessage("Retries exhausted: 1/1") //
                 .verify();
 
index 085f573..1a9d669 100644 (file)
@@ -31,6 +31,7 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -45,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+
 import reactor.test.StepVerifier;
 
 public class FileCollectorTest {
@@ -130,6 +132,7 @@ public class FileCollectorTest {
                 .compression(GZIP_COMPRESSION) //
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
                 .fileFormatVersion(FILE_FORMAT_VERSION) //
+                .context(new HashMap<String,String>())
                 .build();
     }
 
index e07ed02..1a7db42 100644 (file)
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -52,6 +53,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -162,7 +164,7 @@ public class ScheduledTasksTest {
                 .compression("") //
                 .fileFormatType("") //
                 .fileFormatVersion("") //
-                .build();
+                .context(new HashMap<String, String>()).build();
     }
 
     @Test
@@ -190,9 +192,11 @@ public class ScheduledTasksTest {
 
         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
 
-        StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+        StepVerifier //
+                .create(testedObject.createMainTask(contextMap)) //
+                .expectSubscription() //
                 .expectNextCount(noOfFiles) //
                 .expectComplete() //
                 .verify(); //
@@ -200,7 +204,7 @@ public class ScheduledTasksTest {
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).getMessageRouterResponse();
         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
-        verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
@@ -221,10 +225,12 @@ public class ScheduledTasksTest {
                 .when(fileCollectorMock) //
                 .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
 
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
 
-        StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+        StepVerifier //
+                .create(testedObject.createMainTask(contextMap)) //
+                .expectSubscription() //
                 .expectNextCount(3) //
                 .expectComplete() //
                 .verify(); //
@@ -232,7 +238,7 @@ public class ScheduledTasksTest {
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).getMessageRouterResponse();
         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
-        verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
@@ -253,9 +259,11 @@ public class ScheduledTasksTest {
         // One publish will fail, the rest will succeed
         doReturn(collectedFile, error, collectedFile, collectedFile) //
                 .when(dataRouterMock) //
-                .publishFile(notNull(), anyLong(), notNull(), any());
+                .publishFile(notNull(), anyLong(), notNull());
 
-        StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+        StepVerifier //
+                .create(testedObject.createMainTask(contextMap)) //
+                .expectSubscription() //
                 .expectNextCount(3) // 3 completed files
                 .expectComplete() //
                 .verify(); //
@@ -263,7 +271,7 @@ public class ScheduledTasksTest {
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).getMessageRouterResponse();
         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
-        verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
@@ -282,9 +290,10 @@ public class ScheduledTasksTest {
 
         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull(), any());
+        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
 
-        StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
+        StepVerifier //
+                .create(testedObject.createMainTask(contextMap)).expectSubscription() //
                 .expectNextCount(1) // 99 is skipped
                 .expectComplete() //
                 .verify(); //
@@ -292,7 +301,7 @@ public class ScheduledTasksTest {
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).getMessageRouterResponse();
         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
-        verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
         verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);