DFC logging according to ONAP specification 38/79438/10
authorRehanRaza <muhammad.rehan.raza@est.tech>
Fri, 8 Mar 2019 20:16:04 +0000 (20:16 +0000)
committerRehanRaza <muhammad.rehan.raza@est.tech>
Fri, 8 Mar 2019 20:16:04 +0000 (20:16 +0000)
Change-Id: I6fe18ce3bdbc6d0b1cf5c5e65534cab694cfb898
Issue-ID: DCAEGEN2-1305
Signed-off-by: RehanRaza <muhammad.rehan.raza@est.tech>
17 files changed:
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/main/resources/logback-spring.xml
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java [new file with mode: 0644]
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java

index ace0389..59f8f3b 100644 (file)
       <artifactId>testng</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
 
     <!--REQUIRED TO GENERATE DOCUMENTATION -->
     <dependency>
index 9838afb..208e8a4 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
 import com.google.gson.JsonObject;
-
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -35,7 +33,6 @@ import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 import org.springframework.scheduling.annotation.EnableScheduling;
-
 import reactor.core.publisher.Flux;
 import reactor.core.scheduler.Schedulers;
 
@@ -64,8 +61,8 @@ public class CloudConfiguration extends AppConfig {
     }
 
 
-    protected void runTask() {
-        Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)).subscribeOn(Schedulers.parallel())
+    protected void runTask(Map<String, String> contextMap) {
+        Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment, contextMap)).subscribeOn(Schedulers.parallel())
                 .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
     }
 
index 106725c..8f41726 100644 (file)
@@ -1,7 +1,7 @@
 /*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * =================================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END==========================================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
 import org.slf4j.Logger;
@@ -38,7 +40,8 @@ class EnvironmentProcessor {
     private EnvironmentProcessor() {
     }
 
-    static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+    static Mono<EnvProperties> evaluate(Properties systemEnvironment, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.info("Loading configuration from system environment variables");
         EnvProperties envProperties;
         try {
index bc21f96..b4dc635 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
-
 import javax.annotation.PostConstruct;
-
+import org.apache.commons.lang3.StringUtils;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
-
 import io.swagger.annotations.ApiOperation;
 import reactor.core.publisher.Mono;
 
@@ -45,7 +53,11 @@ public class SchedulerConfig {
     private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
     private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
     private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
+    private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
+    private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
+    private static final Marker EXIT = MarkerFactory.getMarker("EXIT");
     private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
+    private Map<String, String> contextMap;
 
     private final TaskScheduler taskScheduler;
     private final ScheduledTasks scheduledTask;
@@ -68,6 +80,9 @@ public class SchedulerConfig {
     public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
         scheduledFutureList.forEach(x -> x.cancel(false));
         scheduledFutureList.clear();
+        MdcVariables.setMdcContextMap(contextMap);
+        logger.info(EXIT, "Stopped Datafile workflow");
+        MDC.clear();
         return Mono.defer(() -> Mono
             .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
     }
@@ -80,10 +95,20 @@ public class SchedulerConfig {
     @PostConstruct
     @ApiOperation(value = "Start task if possible")
     public synchronized boolean tryToStartTask() {
+        String requestId = MDC.get(REQUEST_ID);
+        if (StringUtils.isBlank(requestId)) {
+            MDC.put(REQUEST_ID, UUID.randomUUID().toString());
+        }
+        String invocationId = MDC.get(INVOCATION_ID);
+        if (StringUtils.isBlank(invocationId)) {
+            MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
+        }
+        contextMap = MDC.getCopyOfContextMap();
+        logger.info(ENTRY, "Start scheduling Datafile workflow");
         if (scheduledFutureList.isEmpty()) {
-            scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+            scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
                     SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
-            scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
+            scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
                     SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
             scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
                    SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
@@ -94,4 +119,4 @@ public class SchedulerConfig {
         }
 
     }
-}
+}
\ No newline at end of file
index 825308e..a21ed04 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.controllers;
 
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
 import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestHeader;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
-
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import reactor.core.publisher.Mono;
@@ -50,9 +58,24 @@ public class ScheduleController {
         this.schedulerConfig = schedulerConfig;
     }
 
+    public Mono<ResponseEntity<String>> startTasks() {
+        logger.trace("Start scheduling worker request");
+        return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
+    }
+
     @RequestMapping(value = "start", method = RequestMethod.GET)
     @ApiOperation(value = "Start scheduling worker request")
-    public Mono<ResponseEntity<String>> startTasks() {
+    public Mono<ResponseEntity<String>> startTasks(@RequestHeader HttpHeaders headers) {
+        String requestId = headers.getFirst(X_ONAP_REQUEST_ID);
+        if (StringUtils.isBlank(requestId)) {
+            requestId = UUID.randomUUID().toString();
+        }
+        String invocationId = headers.getFirst(X_INVOCATION_ID);
+        if (StringUtils.isBlank(invocationId)) {
+            invocationId = UUID.randomUUID().toString();
+        }
+        MDC.put(REQUEST_ID, requestId);
+        MDC.put(INVOCATION_ID, invocationId);
         logger.trace("Receiving start scheduling worker request");
         return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
     }
index 4c0dcce..57edc36 100644 (file)
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import java.time.Duration;
-
+import java.util.Map;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
-
 import reactor.core.publisher.Mono;
 
 /**
@@ -50,21 +50,24 @@ public class DataRouterPublisher {
      * @param firstBackoffTimeout the time to delay the first retry
      * @return the HTTP response status as a string
      */
-    public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+    public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
+            Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Method called with arg {}", model);
         DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
 
         //@formatter:off
         return Mono.just(model)
                 .cache()
-                .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
-                .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+                .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
+                .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
                 .retryBackoff(numRetries, firstBackoff);
         //@formatter:on
     }
 
-    private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
-
+    private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
+            Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         if (HttpUtils.isSuccessfulResponseCode(response.value())) {
             logger.trace("Publish to DR successful!");
             return Mono.just(model);
index 0b647bf..a002031 100644 (file)
@@ -18,7 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import java.nio.file.Path;
 import java.time.Duration;
-
+import java.util.Map;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -29,9 +29,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import reactor.core.publisher.Mono;
 
 /**
@@ -52,14 +52,15 @@ public class FileCollector {
     }
 
     public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
-            Duration firstBackoffTimeout) {
+            Duration firstBackoffTimeout, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Entering execute with {}", fileData);
         resolveKeyStore();
 
         //@formatter:off
         return Mono.just(fileData)
             .cache()
-            .flatMap(fd -> collectFile(fileData, metaData))
+            .flatMap(fd -> collectFile(fileData, metaData, contextMap))
             .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
         //@formatter:on
     }
@@ -76,7 +77,9 @@ public class FileCollector {
         ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
     }
 
-    private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+    private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
+            Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.trace("starting to collectFile");
 
         final String remoteFile = fileData.remoteFilePath();
index 783c699..89ebde8 100644 (file)
@@ -23,8 +23,8 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
@@ -32,12 +32,13 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -86,22 +87,24 @@ public class ScheduledTasks {
     /**
      * Main function for scheduling for the file collection Workflow.
      */
-    public void scheduleMainDatafileEventTask() {
+    public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Execution of tasks was registered");
         applicationConfiguration.initFileStreamReader();
-        createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+        createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
+                () -> onComplete(contextMap));
     }
 
-    Flux<ConsumerDmaapModel> createMainTask() {
+    Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) {
         return fetchMoreFileReadyMessages() //
                 .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
                 .runOn(scheduler) //
                 .flatMap(this::createFileCollectionTask) //
                 .filter(this::shouldBePublished) //
                 .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
-                .flatMap(this::collectFileFromXnf) //
-                .flatMap(this::publishToDataRouter) //
-                .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+                .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
+                .flatMap(model -> publishToDataRouter(model, contextMap)) //
+                .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
                 .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
                 .sequential();
     }
@@ -113,15 +116,18 @@ public class ScheduledTasks {
         alreadyPublishedFiles.purge(now);
     }
 
-    private void onComplete() {
+    private void onComplete(Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.info("Datafile tasks have been completed");
     }
 
-    private void onSuccess(ConsumerDmaapModel model) {
+    private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.info("Datafile consumed tasks {}", model.getInternalLocation());
     }
 
-    private void onError(Throwable throwable) {
+    private void onError(Throwable throwable, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
     }
 
@@ -147,38 +153,45 @@ public class ScheduledTasks {
         return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
     }
 
-    private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+    private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
+            Map<String, String> contextMap) {
         final long maxNUmberOfRetries = 3;
         final Duration initialRetryTimeout = Duration.ofSeconds(5);
 
+        MdcVariables.setMdcContextMap(contextMap);
         return fileCollect.collectorTask
-                .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
-                .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
+                .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
+                        contextMap)
+                .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
     }
 
-    private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+    private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         Path localFileName = fileData.getLocalFileName();
         logger.error("File fetching failed: {}", localFileName);
-        deleteFile(localFileName);
+        deleteFile(localFileName, contextMap);
         alreadyPublishedFiles.remove(localFileName);
         currentNumberOfTasks.decrementAndGet();
         return Mono.empty();
     }
 
-    private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+    private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) {
         final long maxNumberOfRetries = 3;
         final Duration initialRetryTimeout = Duration.ofSeconds(5);
 
         DataRouterPublisher publisherTask = createDataRouterPublisher();
 
-        return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
-                .onErrorResume(exception -> handlePublishFailure(model, exception));
+        MdcVariables.setMdcContextMap(contextMap);
+        return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+                .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
     }
 
-    private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+    private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception,
+            Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
         Path internalFileName = Paths.get(model.getInternalLocation());
-        deleteFile(internalFileName);
+        deleteFile(internalFileName, contextMap);
         alreadyPublishedFiles.remove(internalFileName);
         currentNumberOfTasks.decrementAndGet();
         return Mono.empty();
@@ -193,17 +206,20 @@ public class ScheduledTasks {
             return Flux.empty();
         }
 
+        Map<String, String> contextMap = MDC.getCopyOfContextMap();
         return createConsumerTask() //
                 .execute() //
-                .onErrorResume(this::handleConsumeMessageFailure);
+                .onErrorResume(exception -> handleConsumeMessageFailure(exception, contextMap));
     }
 
-    private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
-        logger.error("Polling for file ready message filed, exception: {}", exception);
+    private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
+        logger.error("Polling for file ready message failed, exception: {}", exception);
         return Flux.empty();
     }
 
-    private void deleteFile(Path localFile) {
+    private void deleteFile(Path localFile, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Deleting file: {}", localFile);
         try {
             Files.delete(localFile);
index af4ab18..1b9818d 100644 (file)
@@ -1,19 +1,39 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
   <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}spring.log}"/>
+  <property name="outputFilename" value="application"/>
+  <property name="logPath" value="/var/log/ONAP"/>
+  <property name="maxFileSize" value="50MB"/>
+  <property name="maxHistory" value="30"/>
+  <property name="totalSizeCap" value="10GB"/>
+  <property name="defaultPattern" value="%nopexception%logger
+    |%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
+    |%level
+    |%replace(%replace(%message){'\t','\\\\t'}){'\n','\\\\n'}
+    |%replace(%replace(%mdc){'\t','\\\\t'}){'\n','\\\\n'}
+    |%replace(%replace(%rootException){'\t','\\\\t'}){'\n','\\\\n'}
+    |%replace(%replace(%marker){'\t','\\\\t'}){'\n','\\\\n'}
+    |%thread
+    |%n"/>
 
   <springProfile name="dev">
-    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
+    <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender">
+       <encoder>
+         <pattern>${defaultPattern}</pattern>
+       </encoder>
+    </appender>
+
     <appender name="ROLLING-FILE"
       class="ch.qos.logback.core.rolling.RollingFileAppender">
       <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
+        <pattern>${defaultPattern}</pattern>
       </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+      <file>${logPath}/${outputFilename}.log</file>
+      <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+        <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+        <MaxFileSize>${maxFileSize}</MaxFileSize>
+        <MaxHistory>${maxHistory}</MaxHistory>
+        <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
       </rollingPolicy>
     </appender>
     <root level="ERROR">
     <appender name="ROLLING-FILE"
       class="ch.qos.logback.core.rolling.RollingFileAppender">
       <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
+        <pattern>${defaultPattern}</pattern>
       </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
-        <timeBasedFileNamingAndTriggeringPolicy
-          class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-          <maxFileSize>10MB</maxFileSize>
-        </timeBasedFileNamingAndTriggeringPolicy>
+      <file>${logPath}/${outputFilename}.log</file>
+      <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+        <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
+        <MaxFileSize>${maxFileSize}</MaxFileSize>
+        <MaxHistory>${maxHistory}</MaxHistory>
+        <TotalSizeCap>${totalSizeCap}</TotalSizeCap>
       </rollingPolicy>
     </appender>
 
index efb762a..f41ecf2 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.integration;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.verify;
-
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoExtension;
@@ -57,6 +56,6 @@ class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
     }
 
     private void verifyDmaapConsumerTask() {
-        verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask();
+        verify(scheduledTask, atLeast(1)).scheduleMainDatafileEventTask(any());
     }
 }
index 24b82fe..d612d17 100644 (file)
@@ -17,6 +17,7 @@
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -24,9 +25,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
-
 import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -36,7 +37,6 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 import org.springframework.http.HttpStatus;
-
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -99,10 +99,11 @@ class DataRouterPublisherTest {
     public void whenPassedObjectFits_ReturnsCorrectStatus() {
         prepareMocksForTests(Mono.just(HttpStatus.OK));
 
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
                 .expectNext(consumerDmaapModel).verifyComplete();
 
-        verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
+        verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
@@ -110,10 +111,11 @@ class DataRouterPublisherTest {
     public void whenPassedObjectFits_firstFailsThenSucceeds() {
         prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
 
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
                 .expectNext(consumerDmaapModel).verifyComplete();
 
-        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
@@ -121,17 +123,18 @@ class DataRouterPublisherTest {
     public void whenPassedObjectFits_firstFailsThenFails() {
         prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
 
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
                 .expectErrorMessage("Retries exhausted: 1/1").verify();
 
-        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
     @SafeVarargs
     final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
         dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
-        when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+        when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
                 nextHttpResponses);
 
         dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
index 0662216..8a572be 100644 (file)
@@ -26,11 +26,9 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
-
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -45,7 +43,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -161,7 +158,7 @@ public class ScheduledTasksTest {
         doReturn(consumerMock).when(testedObject).createConsumerTask();
         doReturn(Flux.empty()).when(consumerMock).execute();
 
-        testedObject.scheduleMainDatafileEventTask();
+        testedObject.scheduleMainDatafileEventTask(any());
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).execute();
@@ -178,18 +175,18 @@ public class ScheduledTasksTest {
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
-        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
 
-        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+        StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
                 .expectNextCount(noOfFiles) //
                 .expectComplete() //
                 .verify(); //
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).execute();
-        verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
-        verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+        verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
@@ -206,20 +203,20 @@ public class ScheduledTasksTest {
         // First file collect will fail, 3 will succeed
         doReturn(error, collectedFile, collectedFile, collectedFile) //
                 .when(fileCollectorMock) //
-                .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+                .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
 
-        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
 
-        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+        StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
                 .expectNextCount(3) //
                 .expectComplete() //
                 .verify(); //
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).execute();
-        verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
-        verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+        verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
@@ -232,23 +229,23 @@ public class ScheduledTasksTest {
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
-        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
 
         Mono<Object> error = Mono.error(new Exception("problem"));
         // One publish will fail, the rest will succeed
         doReturn(collectedFile, error, collectedFile, collectedFile) //
                 .when(dataRouterMock) //
-                .execute(notNull(), anyLong(), notNull());
+                .execute(notNull(), anyLong(), notNull(), any());
 
-        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+        StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
                 .expectNextCount(3) // 3 completed files
                 .expectComplete() //
                 .verify(); //
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).execute();
-        verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
-        verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+        verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
@@ -264,18 +261,18 @@ public class ScheduledTasksTest {
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
-        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+        doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+        doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
 
-        StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+        StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
                 .expectNextCount(1) // 99 is skipped
                 .expectComplete() //
                 .verify(); //
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         verify(consumerMock, times(1)).execute();
-        verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
-        verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+        verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+        verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
         verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
         verifyNoMoreInteractions(consumerMock);
index 804b46e..b5d3c15 100644 (file)
@@ -22,10 +22,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
-
 import java.nio.file.Path;
 import java.time.Duration;
-
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,7 +40,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-
 import reactor.test.StepVerifier;
 
 /**
@@ -155,7 +154,8 @@ public class XnfCollectorTaskImplTest {
 
         ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
 
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
         verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -196,7 +196,8 @@ public class XnfCollectorTaskImplTest {
                 .build();
         // @formatter:on
 
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
         verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -211,7 +212,8 @@ public class XnfCollectorTaskImplTest {
         doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
                 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectErrorMessage("Retries exhausted: 3/3").verify();
 
         verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -227,7 +229,8 @@ public class XnfCollectorTaskImplTest {
         ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
 
         FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
         verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/logging/MdcVariables.java
new file mode 100644 (file)
index 0000000..9d88206
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model.logging;
+
+import java.util.Map;
+import org.slf4j.MDC;
+
+public final class MdcVariables {
+
+    public static final String X_ONAP_REQUEST_ID = "X-ONAP-RequestID";
+    public static final String X_INVOCATION_ID = "X-InvocationID";
+    public static final String REQUEST_ID = "RequestID";
+    public static final String INVOCATION_ID = "InvocationID";
+    public static final String INSTANCE_ID = "InstanceID";
+    public static final String RESPONSE_CODE = "ResponseCode";
+    public static final String SERVICE_NAME = "ServiceName";
+
+    private MdcVariables() {
+    }
+
+    public static void setMdcContextMap(Map<String, String> mdcContextMap) {
+        if (mdcContextMap != null) {
+            MDC.setContextMap(mdcContextMap);
+        }
+    }
+}
index e99b811..21266fb 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.RESPONSE_CODE;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.SERVICE_NAME;
 import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 import org.springframework.http.HttpHeaders;
 import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient.Builder;
-
 import reactor.core.publisher.Mono;
 
 /**
@@ -68,17 +69,21 @@ public class DmaapReactiveWebClient {
 
     private ExchangeFilterFunction logResponse() {
         return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+            MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
             logger.trace("Response Status {}", clientResponse.statusCode());
+            MDC.remove(RESPONSE_CODE);
             return Mono.just(clientResponse);
         });
     }
 
     private ExchangeFilterFunction logRequest() {
         return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+            MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
             logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
             clientRequest.headers()
                     .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
             logger.trace("HTTP request headers: {}", clientRequest.headers());
+            MDC.remove(SERVICE_NAME);
             return Mono.just(clientRequest);
         });
     }
index 4869e4c..f80fcd0 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.service.producer;
 
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -28,10 +30,10 @@ import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Future;
-
 import javax.net.ssl.SSLContext;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
@@ -45,14 +47,17 @@ import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
-
 import reactor.core.publisher.Mono;
 
 /**
@@ -66,6 +71,8 @@ public class DmaapProducerReactiveHttpClient {
     private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
     private static final String URI_SEPARATOR = "/";
     private static final String DEFAULT_FEED_ID = "1";
+    private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+    private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -101,11 +108,11 @@ public class DmaapProducerReactiveHttpClient {
      * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
      * @return status code of operation
      */
-    public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+    public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
+            Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
         try {
-            logger.trace("Starting to publish to DR {}",  consumerDmaapModel.getInternalLocation());
-
             webClient = getWebClient();
             webClient.start();
 
@@ -114,9 +121,10 @@ public class DmaapProducerReactiveHttpClient {
             prepareBody(consumerDmaapModel, put);
             addUserCredentialsToHead(put);
 
+            logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
             Future<HttpResponse> future = webClient.execute(put, null);
             HttpResponse response = future.get();
-            logger.trace("{}", response);
+            logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
             webClient.close();
             return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
         } catch (Exception e) {
@@ -141,6 +149,11 @@ public class DmaapProducerReactiveHttpClient {
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
         put.addHeader(X_DMAAP_DR_META, metaData.toString());
         put.setURI(getUri(name));
+
+        String requestID = MDC.get(REQUEST_ID);
+        put.addHeader(X_ONAP_REQUEST_ID, requestID);
+        String invocationID = UUID.randomUUID().toString();
+        put.addHeader(X_INVOCATION_ID, invocationID);
     }
 
     private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
index a0d3673..2bbe8e1 100644 (file)
@@ -20,19 +20,18 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
@@ -51,7 +50,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
-
 import reactor.test.StepVerifier;
 
 /**
@@ -142,7 +140,8 @@ class DmaapProducerReactiveHttpClientTest {
         httpPut.addHeader("Authorization", "Basic " + base64Creds);
 
         fileStream.reset();
-        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+        Map<String, String> contextMap = new HashMap<>();
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
         .expectNext(HttpStatus.OK).verifyComplete();
 
         verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
@@ -153,7 +152,7 @@ class DmaapProducerReactiveHttpClientTest {
     @Test
     void getHttpResponse_Fail() throws Exception {
         mockWebClientDependantObject(false);
-        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, any()))
         .expectError()
         .verify();
     }