DFC - housekeeping 75/90575/3
authorelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 2 Jul 2019 13:53:56 +0000 (13:53 +0000)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 2 Jul 2019 13:53:56 +0000 (13:53 +0000)
Removed Sonar warnings.
Improved code coverage.
Improved code formatting

Change-Id: I0e8287cecb18a8976635427697dfaeb823da204b
Issue-ID: DCAEGEN2-1645
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
22 files changed:
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java [moved from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java with 87% similarity]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java

index 9f2a12c..373d4f0 100644 (file)
       <groupId>io.springfox</groupId>
       <artifactId>springfox-swagger-ui</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-actuator</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
index 8e15deb..e9d8464 100644 (file)
@@ -22,7 +22,6 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
 import com.google.gson.TypeAdapterFactory;
-
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -32,10 +31,8 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.ServiceLoader;
-
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
-
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
@@ -49,7 +46,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.stereotype.Component;
-
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -69,12 +65,12 @@ public class AppConfig {
     private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
 
     private ConsumerConfiguration dmaapConsumerConfiguration;
-    private Map<String, PublisherConfiguration> publishingConfiguration;
+    Map<String, PublisherConfiguration> publishingConfigurations;
     private FtpesConfig ftpesConfiguration;
     private CloudConfigurationProvider cloudConfigurationProvider;
     @Value("#{systemEnvironment}")
     Properties systemEnvironment;
-    private Disposable refreshConfigTask = null;
+    Disposable refreshConfigTask = null;
 
     @NotEmpty
     private String filepath;
@@ -104,6 +100,9 @@ public class AppConfig {
                 () -> logger.error("Configuration refresh terminated"));
     }
 
+    /**
+     * Stops the refreshing of the configuration.
+     */
     public void stop() {
         if (refreshConfigTask != null) {
             refreshConfigTask.dispose();
@@ -115,17 +114,33 @@ public class AppConfig {
         return dmaapConsumerConfiguration;
     }
 
+    /**
+     * Checks if there is a configuration for the given feed.
+     *
+     * @param changeIdentifier the change identifier the feed is configured to belong to.
+     *
+     * @return true if a feed is configured for the given change identifier, false if not.
+     */
     public synchronized boolean isFeedConfigured(String changeIdentifier) {
-        return publishingConfiguration.containsKey(changeIdentifier);
+        return publishingConfigurations.containsKey(changeIdentifier);
     }
 
+    /**
+     * Gets the feed configuration for the given change identifier.
+     *
+     * @param changeIdentifier the change identifier the feed is configured to belong to.
+     * @return the <code>PublisherConfiguration</code> for the feed belonging to the given change identifier.
+     *
+     * @throws DatafileTaskException if no configuration has been loaded or the configuration is missing for the given
+     *         change identifier.
+     */
     public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
         throws DatafileTaskException {
 
-        if (publishingConfiguration == null) {
+        if (publishingConfigurations == null) {
             throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
         }
-        PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+        PublisherConfiguration cfg = publishingConfigurations.get(changeIdentifier);
         if (cfg == null) {
             throw new DatafileTaskException(
                 "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
@@ -184,7 +199,7 @@ public class AppConfig {
     private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
         try {
             CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
-            setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+            setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
                 parser.getFtpesConfig());
         } catch (DatafileTaskException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
@@ -212,14 +227,13 @@ public class AppConfig {
     }
 
     private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
-        Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
-        if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
-            logger.error(
-                "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
-                consumerConfiguration, publisherConfiguration, ftpesConfig);
+        Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) {
+        if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) {
+            logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}",
+                consumerConfiguration, publisherConfigurations, ftpesConfig);
         } else {
             this.dmaapConsumerConfiguration = consumerConfiguration;
-            this.publishingConfiguration = publisherConfiguration;
+            this.publishingConfigurations = publisherConfigurations;
             this.ftpesConfiguration = ftpesConfig;
         }
     }
index d25d7db..0242bef 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,13 +21,11 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 /**
@@ -51,7 +49,14 @@ public class CloudConfigParser {
         this.dmaapConfigurationRoot = dmaapConfigurationRoot;
     }
 
-    public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+    /**
+     * Get the publisher configurations.
+     *
+     * @return a map with change identifier as key and the connected publisher configuration as value.
+     *
+     * @throws DatafileTaskException if a member of the configuration is missing.
+     */
+    public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
         Iterator<JsonElement> producerCfgs =
             toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
 
@@ -81,6 +86,12 @@ public class CloudConfigParser {
         return result;
     }
 
+    /**
+     * Get the consumer configuration.
+     *
+     * @return the consumer configuration.
+     * @throws DatafileTaskException if a member of the configuration is missing.
+     */
     public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
         JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
         Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
@@ -100,6 +111,12 @@ public class CloudConfigParser {
             .build();
     }
 
+    /**
+     * Get the security configuration for communication with the xNF.
+     *
+     * @return the xNF communication security configuration.
+     * @throws DatafileTaskException if a member of the configuration is missing.
+     */
     public FtpesConfig getFtpesConfig() throws DatafileTaskException {
         return new ImmutableFtpesConfig.Builder() //
             .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
index 1fd24e9..e62a11e 100644 (file)
@@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 
 import java.net.MalformedURLException;
 import java.net.URL;
-
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -42,6 +41,13 @@ public abstract class ConsumerConfiguration {
 
     public abstract Boolean enableDmaapCertAuth();
 
+    /**
+     * Gets the configuration in the SDK version.
+     *
+     * @return a <code>DmaapConsumerConfiguration</code> representing the configuration.
+     *
+     * @throws DatafileTaskException if something is wrong with the topic URL.
+     */
     public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
         try {
             URL url = new URL(topicUrl());
@@ -91,16 +97,14 @@ public abstract class ConsumerConfiguration {
     }
 
     private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
-        String[] tokens = urlPath.split("/"); // UrlPath:
-                                              // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+        String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
         if (tokens.length != 5) {
             throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
         }
 
-        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // e.g.
-                                                                   // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
-        final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
-        final String consumerId = tokens[4]; // ex. C12
+        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+        final String consumerGroup = tokens[3]; // OpenDcae-c12
+        final String consumerId = tokens[4]; // C12
         return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
     }
 
index 3c3a762..7a84524 100644 (file)
@@ -18,7 +18,6 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 
 import java.net.MalformedURLException;
 import java.net.URL;
-
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -50,6 +49,12 @@ public interface PublisherConfiguration {
 
     String changeIdentifier();
 
+    /**
+     * Get the publisher configuration in SDK format.
+     *
+     * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration.
+     * @throws MalformedURLException if the publish URL is malformed.
+     */
     default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
         URL url = new URL(publishUrl());
         String urlPath = url.getPath();
@@ -69,5 +74,4 @@ public interface PublisherConfiguration {
             .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
             .build();
     }
-
 }
index e11cd76..bdedba4 100644 (file)
@@ -22,10 +22,8 @@ import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.SftpException;
-
 import java.nio.file.Path;
 import java.util.Optional;
-
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.slf4j.Logger;
@@ -97,7 +95,7 @@ public class SftpClient implements FileCollectClient {
                 throw new DatafileTaskException("Could not open Sftp client. " + e);
             } else {
                 throw new NonRetryableDatafileTaskException(
-                    "Could not open Sftp client, no retry attempts will be done " + e);
+                    "Could not open Sftp client, no retry attempts will be done. " + e);
             }
         }
     }
index 4c42284..878bb55 100644 (file)
@@ -24,7 +24,6 @@ import java.time.Instant;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- *
  * Various counters that can be shown via a REST API.
  *
  */
@@ -80,6 +79,7 @@ public class Counters {
         noOfFailedPublish++;
     }
 
+    @Override
     public synchronized String toString() {
         StringBuilder str = new StringBuilder();
         str.append(format("totalReceivedEvents", totalReceivedEvents));
@@ -104,4 +104,49 @@ public class Counters {
         String header = name + ":";
         return String.format("%-24s%-22s\n", header, value);
     }
+
+    public int getNoOfCollectedFiles() {
+        return noOfCollectedFiles;
+    }
+
+    public int getNoOfFailedFtpAttempts() {
+        return noOfFailedFtpAttempts;
+    }
+
+    public int getNoOfFailedFtp() {
+        return noOfFailedFtp;
+    }
+
+    public int getNoOfFailedPublishAttempts() {
+        return noOfFailedPublishAttempts;
+    }
+
+    public int getTotalPublishedFiles() {
+        return totalPublishedFiles;
+    }
+
+    public int getNoOfFailedPublish() {
+        return noOfFailedPublish;
+    }
+
+    public int getTotalReceivedEvents() {
+        return totalReceivedEvents;
+    }
+
+    /**
+     * Resets all data.
+     */
+    public void clear() {
+        numberOfTasks.set(0);
+        numberOfSubscriptions.set(0);
+        noOfCollectedFiles = 0;
+        noOfFailedFtpAttempts = 0;
+        noOfFailedFtp = 0;
+        noOfFailedPublishAttempts = 0;
+        totalPublishedFiles = 0;
+        noOfFailedPublish = 0;
+        lastPublishedTime = Instant.MIN;
+        totalReceivedEvents = 0;
+        lastEventTime = Instant.MIN;
+    }
 }
index 2ad96e8..5a8806b 100644 (file)
@@ -18,13 +18,14 @@ package org.onap.dcaegen2.collectors.datafile.service;
 
 import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
 import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
 
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient.Builder;
@@ -37,11 +38,9 @@ import reactor.core.publisher.Mono;
  */
 public class DmaapWebClient {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class);
 
     private String contentType;
-    private String dmaapUserName;
-    private String dmaapUserPassword;
 
     /**
      * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
@@ -62,33 +61,31 @@ public class DmaapWebClient {
     public WebClient build() {
         Builder webClientBuilder = WebClient.builder() //
             .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) //
-            .filter(logRequest()) //
-            .filter(logResponse());
-        if (dmaapUserName != null && !dmaapUserName.isEmpty() && dmaapUserPassword != null
-            && !dmaapUserPassword.isEmpty()) {
-            webClientBuilder.filter(basicAuthentication(dmaapUserName, dmaapUserPassword));
-        }
+            .filter(getRequestFilter()) //
+            .filter(getResponseFilter());
         return webClientBuilder.build();
     }
 
-    private ExchangeFilterFunction logResponse() {
-        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
-            MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
-            logger.trace("Response Status {}", clientResponse.statusCode());
-            MDC.remove(RESPONSE_CODE);
-            return Mono.just(clientResponse);
-        });
+    private ExchangeFilterFunction getResponseFilter() {
+        return ExchangeFilterFunction.ofResponseProcessor(this::logResponse);
     }
 
-    private ExchangeFilterFunction logRequest() {
-        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
-            MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
-            logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
-            clientRequest.headers()
-                .forEach((name, values) -> values.forEach(value -> logger.trace("{}={}", name, value)));
-            logger.trace("HTTP request headers: {}", clientRequest.headers());
-            MDC.remove(SERVICE_NAME);
-            return Mono.just(clientRequest);
-        });
+    Mono<ClientResponse> logResponse(ClientResponse clientResponse) {
+        MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
+        logger.trace("Response Status {}", clientResponse.statusCode());
+        MDC.remove(RESPONSE_CODE);
+        return Mono.just(clientResponse);
+    }
+
+    private ExchangeFilterFunction getRequestFilter() {
+        return ExchangeFilterFunction.ofRequestProcessor(this::logRequest);
+    }
+
+    Mono<ClientRequest> logRequest(ClientRequest clientRequest) {
+        MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
+        logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
+        logger.trace("HTTP request headers: {}", clientRequest.headers());
+        MDC.remove(SERVICE_NAME);
+        return Mono.just(clientRequest);
     }
 }
index 91b7404..a14bfd8 100644 (file)
@@ -27,7 +27,7 @@ import java.util.Map;
  * the key was last used.
  */
 public class PublishedFileCache {
-    private final Map<Path, Instant> publishedFiles = new HashMap<Path, Instant>();
+    private final Map<Path, Instant> publishedFiles = new HashMap<>();
 
     /**
      * Adds a file to the cache.
index bfd3f3e..bdec719 100644 (file)
@@ -22,14 +22,11 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
-
 import java.io.File;
-import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
-
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ContentType;
@@ -103,7 +100,6 @@ public class DataRouterPublisher {
             HttpResponse response =
                 dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
             logger.trace("{}", response);
-            counters.incTotalPublishedFiles();
             return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
         } catch (Exception e) {
             counters.incNoOfFailedPublishAttempts();
@@ -127,26 +123,27 @@ public class DataRouterPublisher {
         MappedDiagnosticContext.appendTraceInfo(put);
     }
 
-    private void prepareBody(FilePublishInformation publishInfo, HttpPut put) throws IOException {
+    private void prepareBody(FilePublishInformation publishInfo, HttpPut put) {
         File file = createInputFile(publishInfo.getInternalLocation());
         FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY);
         put.setEntity(entity);
     }
 
-    private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response,
-        FilePublishInformation publishInfo) {
+    private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
         MDC.setContextMap(publishInfo.getContext());
         if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+            counters.incTotalPublishedFiles();
             logger.trace("Publishing file {} to DR successful!", publishInfo.getName());
             return Mono.just(publishInfo);
         } else {
+            counters.incNoOfFailedPublishAttempts();
             logger.warn("Publishing file {} to DR unsuccessful. Response code: {}", publishInfo.getName(), response);
             return Mono.error(new Exception(
                 "Publishing file " + publishInfo.getName() + " to DR unsuccessful. Response code: " + response));
         }
     }
 
-    File createInputFile(Path filePath) throws IOException {
+    File createInputFile(Path filePath) {
         FileSystemResource realResource = new FileSystemResource(filePath);
         return realResource.getFile();
     }
index 20bf599..1ce64e4 100644 (file)
@@ -21,7 +21,6 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
-
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -104,15 +103,15 @@ public class FileCollector {
             currentClient.collectFile(remoteFile, localFile);
             counters.incNoOfCollectedFiles();
             return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
+        } catch (NonRetryableDatafileTaskException nre) {
+            logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), nre);
+            counters.incNoOfFailedFtpAttempts();
+            return Mono.just(Optional.empty()); // Give up
         } catch (DatafileTaskException e) {
             logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
                 e.toString());
             counters.incNoOfFailedFtpAttempts();
-            if (e instanceof NonRetryableDatafileTaskException) {
-                return Mono.just(Optional.empty()); // Give up
-            } else {
-                return Mono.error(e);
-            }
+            return Mono.error(e);
         } catch (Exception throwable) {
             logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
                 throwable.toString(), throwable);
index bc73ddb..26353e3 100644 (file)
@@ -22,7 +22,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -36,7 +35,6 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -222,9 +220,8 @@ public class ScheduledTasks {
         Path localFilePath = fileData.fileData.getLocalFilePath();
         if (publishedFilesCache.put(localFilePath) == null) {
             try {
-                boolean result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+                return !createPublishedChecker().isFilePublished(fileData.fileData.name(),
                     fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
-                return result;
             } catch (DatafileTaskException e) {
                 logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
                 return true; // Publish it then
index b3710fe..d9ca787 100644 (file)
@@ -19,17 +19,20 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 import com.google.gson.JsonElement;
@@ -37,16 +40,15 @@ import com.google.gson.JsonIOException;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -56,9 +58,11 @@ import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -73,7 +77,7 @@ class AppConfigTest {
 
     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
 
-    private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+    private static final DmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
         new ImmutableDmaapConsumerConfiguration.Builder() //
             .timeoutMs(-1) //
             .dmaapHostName("message-router.onap.svc.cluster.local") //
@@ -116,7 +120,7 @@ class AppConfigTest {
             .passWord("password") //
             .build();
 
-    private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+    private static final FtpesConfig CORRECT_FTPES_CONFIGURATION = //
         new ImmutableFtpesConfig.Builder() //
             .keyCert("/config/dfc.jks") //
             .keyPassword("secret") //
@@ -124,7 +128,7 @@ class AppConfigTest {
             .trustedCaPassword("secret") //
             .build();
 
-    private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+    private static final DmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
         new ImmutableDmaapPublisherConfiguration.Builder() //
             .dmaapTopicName("/publish/1") //
             .dmaapUserPassword("password") //
@@ -331,6 +335,37 @@ class AppConfigTest {
         Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
     }
 
+    @Test
+    public void whenStopSuccess() {
+        Disposable disposableMock = mock(Disposable.class);
+        appConfigUnderTest.refreshConfigTask = disposableMock;
+
+        appConfigUnderTest.stop();
+
+        verify(disposableMock).dispose();
+        verifyNoMoreInteractions(disposableMock);
+        assertNull(appConfigUnderTest.refreshConfigTask);
+    }
+
+    @Test
+    public void whenNoPublisherConfigurationThrowException() throws DatafileTaskException {
+        appConfigUnderTest.publishingConfigurations = new HashMap<>();
+
+        DatafileTaskException exception = assertThrows(DatafileTaskException.class,
+            () -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
+        assertEquals("Cannot find getPublishingConfiguration for changeIdentifier: " + CHANGE_IDENTIFIER,
+            exception.getMessage());
+    }
+
+    @Test
+    public void whenFeedIsConfiguredReturnTrue() {
+        HashMap<String, PublisherConfiguration> publishingConfigs = new HashMap<>();
+        publishingConfigs.put(CHANGE_IDENTIFIER, null);
+        appConfigUnderTest.publishingConfigurations = publishingConfigs;
+
+        assertTrue(appConfigUnderTest.isFeedConfigured(CHANGE_IDENTIFIER));
+    }
+
     private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
         JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
         return rootObject;
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java
new file mode 100644 (file)
index 0000000..bdeb1c1
--- /dev/null
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+
+public class ConsumerConfigurationTest {
+    @Test
+    public void toDmaapSuccess() throws DatafileTaskException {
+        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
+            .topicUrl(
+                "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+            .trustStorePath("") //
+            .trustStorePasswordPath("") //
+            .keyStorePath("") //
+            .keyStorePasswordPath("") //
+            .enableDmaapCertAuth(Boolean.FALSE) //
+            .build();
+
+        DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
+        assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
+        assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
+        assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
+        assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
+        assertEquals("C12", dmaapConsumerConfiguration.consumerId());
+    }
+
+    @Test
+    public void toDmaapNoUserInfoSuccess() throws DatafileTaskException {
+        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
+            .topicUrl(
+                "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+            .trustStorePath("") //
+            .trustStorePasswordPath("") //
+            .keyStorePath("") //
+            .keyStorePasswordPath("") //
+            .enableDmaapCertAuth(Boolean.FALSE) //
+            .build();
+
+        DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap();
+        assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol());
+        assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName());
+        assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber());
+        assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup());
+        assertEquals("C12", dmaapConsumerConfiguration.consumerId());
+    }
+
+    @Test
+    public void toDmaapWhenInvalidUrlThrowException() throws DatafileTaskException {
+        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
+            .topicUrl("//admin:admin@message-router.onap.svc.cluster.local:2222//events/").trustStorePath("") //
+            .trustStorePasswordPath("") //
+            .keyStorePath("") //
+            .keyStorePasswordPath("") //
+            .enableDmaapCertAuth(Boolean.FALSE) //
+            .build();
+
+        DatafileTaskException exception =
+            assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
+        assertEquals("Could not parse the URL", exception.getMessage());
+    }
+
+    @Test
+    public void toDmaapWhenInvalidPathThrowException() throws DatafileTaskException {
+        ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() //
+            .topicUrl("http://admin:admin@message-router.onap.svc.cluster.local:2222//events/") //
+            .trustStorePath("") //
+            .trustStorePasswordPath("") //
+            .keyStorePath("") //
+            .keyStorePasswordPath("") //
+            .enableDmaapCertAuth(Boolean.FALSE) //
+            .build();
+
+        DatafileTaskException exception =
+            assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap());
+        assertEquals("The path has incorrect syntax: //events/", exception.getMessage());
+    }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
new file mode 100644 (file)
index 0000000..b630bd0
--- /dev/null
@@ -0,0 +1,124 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.controllers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Mono;
+
+@ExtendWith(MockitoExtension.class)
+public class ScheduleControllerTest {
+    @Mock
+    private SchedulerConfig schedulerConfigMock;
+
+    private ScheduleController scheduleControllerUnderTest;
+
+    @BeforeEach
+    public void setup() {
+        scheduleControllerUnderTest = new ScheduleController(schedulerConfigMock);
+    }
+
+    @Test
+    public void startTasksSuccess() {
+        when(schedulerConfigMock.tryToStartTask()).thenReturn(Boolean.TRUE);
+
+        HttpHeaders httpHeaders = new HttpHeaders();
+
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
+        Mono<ResponseEntity<String>> response = scheduleControllerUnderTest.startTasks(httpHeaders);
+
+        validateLogging(logAppender, "Start request");
+
+        String body = response.block().getBody();
+        assertTrue(body.startsWith("Datafile Service has been started!"));
+
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
+    }
+
+    @Test
+    public void startTasksFail() {
+        when(schedulerConfigMock.tryToStartTask()).thenReturn(Boolean.FALSE);
+
+        HttpHeaders httpHeaders = new HttpHeaders();
+        // The following headers are set to create branch coverage in MappedDiagnosticContext:initializeTraceContext().
+        httpHeaders.set(MdcVariables.X_ONAP_REQUEST_ID, "Onap request ID");
+        httpHeaders.set(MdcVariables.X_INVOCATION_ID, "Invocation ID");
+
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
+        Mono<ResponseEntity<String>> response = scheduleControllerUnderTest.startTasks(httpHeaders);
+
+        validateLogging(logAppender, "Start request");
+
+        String body = response.block().getBody();
+        assertTrue(body.startsWith("Datafile Service is still running!"));
+
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
+    }
+
+    @Test
+    public void stopTaskSuccess() {
+        when(schedulerConfigMock.getResponseFromCancellationOfTasks()).thenReturn(
+            Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+
+        HttpHeaders httpHeaders = new HttpHeaders();
+
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduleController.class);
+        Mono<ResponseEntity<String>> actualResponse = scheduleControllerUnderTest.stopTask(httpHeaders);
+
+        validateLogging(logAppender, "Stop request");
+
+        String body = actualResponse.block().getBody();
+        assertTrue(body.startsWith("Datafile Service has already been stopped!"));
+
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
+        assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID)));
+    }
+
+    private void validateLogging(ListAppender<ILoggingEvent> logAppender, String infoMessage) {
+        assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
+        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
+        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
+        assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] " + infoMessage));
+        assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
+        logAppender.stop();
+    }
+}
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.controller;
+package org.onap.dcaegen2.collectors.datafile.controllers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.controllers.StatusController;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
@@ -42,15 +43,22 @@ import org.springframework.http.HttpHeaders;
 import org.springframework.http.ResponseEntity;
 import reactor.core.publisher.Mono;
 
+@ExtendWith(MockitoExtension.class)
 public class StatusControllerTest {
+    @Mock
+    ScheduledTasks scheduledTasksMock;
+
+    StatusController controllerUnderTest;
+
+    @BeforeEach
+    public void setup() {
+        controllerUnderTest = new StatusController(scheduledTasksMock);
+    }
+
     @Test
     public void heartbeat_success() {
-        ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
-
         HttpHeaders httpHeaders = new HttpHeaders();
 
-        StatusController controllerUnderTest = new StatusController(scheduledTasksMock);
-
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(StatusController.class);
         Mono<ResponseEntity<String>> result = controllerUnderTest.heartbeat(httpHeaders);
 
@@ -65,14 +73,11 @@ public class StatusControllerTest {
 
     @Test
     public void status() {
-        ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
         Counters counters = new Counters();
         doReturn(counters).when(scheduledTasksMock).getCounters();
 
         HttpHeaders httpHeaders = new HttpHeaders();
 
-        StatusController controllerUnderTest = new StatusController(scheduledTasksMock);
-
         Mono<ResponseEntity<String>> result = controllerUnderTest.status(httpHeaders);
 
         String body = result.block().getBody();
index 74d9ecd..5330a7f 100644 (file)
@@ -17,6 +17,8 @@
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -31,16 +33,15 @@ import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.SftpException;
-
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Optional;
-
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 
 @ExtendWith(MockitoExtension.class)
 public class SftpClientTest {
@@ -109,7 +110,7 @@ public class SftpClientTest {
     }
 
     @Test
-    public void open_throwsException()
+    public void open_throwsExceptionWithRetry()
         throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
         FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
             .serverAddress(HOST) //
@@ -123,7 +124,30 @@ public class SftpClientTest {
         doReturn(jschMock).when(sftpClientSpy).createJsch();
         when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed"));
 
-        assertThatThrownBy(() -> sftpClientSpy.open()).hasMessageStartingWith("Could not open Sftp client.");
+        DatafileTaskException exception = assertThrows(DatafileTaskException.class, () -> sftpClientSpy.open());
+        assertEquals("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed", exception.getMessage());
+    }
+
+    @Test
+    public void openAuthFail_throwsExceptionWithoutRetry()
+        throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
+        FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+            .serverAddress(HOST) //
+            .userId(USERNAME) //
+            .password(PASSWORD) //
+            .port(SFTP_PORT) //
+            .build();
+
+        SftpClient sftpClientSpy = spy(new SftpClient(expectedFileServerData));
+
+        doReturn(jschMock).when(sftpClientSpy).createJsch();
+        when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Auth fail"));
+
+        NonRetryableDatafileTaskException exception =
+            assertThrows(NonRetryableDatafileTaskException.class, () -> sftpClientSpy.open());
+        assertEquals(
+            "Could not open Sftp client, no retry attempts will be done. com.jcraft.jsch.JSchException: Auth fail",
+            exception.getMessage());
     }
 
     @SuppressWarnings("resource")
@@ -146,7 +170,29 @@ public class SftpClientTest {
     }
 
     @Test
-    public void collectFile_throwsExceptionWithoutRetry()
+    public void collectFile_throwsExceptionWithRetry()
+        throws IOException, JSchException, SftpException, DatafileTaskException {
+        FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
+            .serverAddress(HOST) //
+            .userId(USERNAME) //
+            .password(PASSWORD) //
+            .port(SFTP_PORT) //
+            .build();
+
+        try (SftpClient sftpClient = new SftpClient(expectedFileServerData)) {
+            sftpClient.sftpChannel = channelMock;
+            doThrow(new SftpException(ChannelSftp.SSH_FX_BAD_MESSAGE, "Failed")).when(channelMock).get(anyString(),
+                anyString());
+
+            assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile")))
+                .isInstanceOf(DatafileTaskException.class).hasMessageStartingWith("Unable to get file from xNF. ")
+                .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME
+                    + ", password=####, port=" + SFTP_PORT);
+        }
+    }
+
+    @Test
+    public void collectFileFileMissing_throwsExceptionWithoutRetry()
         throws IOException, JSchException, SftpException, DatafileTaskException {
         FileServerData expectedFileServerData = ImmutableFileServerData.builder() //
             .serverAddress(HOST) //
@@ -161,7 +207,7 @@ public class SftpClientTest {
                 anyString());
 
             assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile")))
-                .isInstanceOf(DatafileTaskException.class)
+                .isInstanceOf(NonRetryableDatafileTaskException.class)
                 .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done")
                 .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME
                     + ", password=####, port=" + SFTP_PORT);
index 9aaca4b..1e54d29 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.junit.jupiter.api.BeforeEach;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import java.net.URI;
+import java.net.URISyntaxException;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
 
+@ExtendWith(MockitoExtension.class)
 class DmaapWebClientTest {
 
     @Mock
-    private DmaapConsumerConfiguration dmaapConsumerConfiguration;
+    private DmaapConsumerConfiguration dmaapConsumerConfigurationMock;
 
-    @BeforeEach
-    void setUp() {
-        dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
-    }
+    @Mock
+    private ClientResponse clientResponseMock;
+
+    @Mock
+    private ClientRequest clientRequesteMock;
 
     @Test
     void buildsDMaaPReactiveWebClientProperly() {
-        when(dmaapConsumerConfiguration.dmaapContentType()).thenReturn("*/*");
-        WebClient dmaapWebClient = new DmaapWebClient() //
-            .fromConfiguration(dmaapConsumerConfiguration) //
+        when(dmaapConsumerConfigurationMock.dmaapContentType()).thenReturn("*/*");
+        WebClient dmaapWebClientUndetTest = new DmaapWebClient() //
+            .fromConfiguration(dmaapConsumerConfigurationMock) //
             .build();
 
-        verify(dmaapConsumerConfiguration, times(1)).dmaapContentType();
-        assertNotNull(dmaapWebClient);
+        verify(dmaapConsumerConfigurationMock, times(1)).dmaapContentType();
+        assertNotNull(dmaapWebClientUndetTest);
+    }
+
+    @Test
+    public void logResponseSuccess() {
+        DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
+
+        when(clientResponseMock.statusCode()).thenReturn(HttpStatus.OK);
+
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
+        Mono<ClientResponse> logResponse = dmaapWebClientUndetTest.logResponse(clientResponseMock);
+
+        assertEquals(clientResponseMock, logResponse.block());
+
+        assertEquals(logAppender.list.get(0).getLevel(), Level.TRACE);
+        assertEquals(logAppender.list.get(0).getFormattedMessage(), "Response Status 200 OK");
+
+        logAppender.stop();
+    }
+
+    @Test
+    public void logRequestSuccess() throws URISyntaxException {
+        when(clientRequesteMock.url()).thenReturn(new URI("http://test"));
+        when(clientRequesteMock.method()).thenReturn(HttpMethod.GET);
+        HttpHeaders httpHeaders = new HttpHeaders();
+        httpHeaders.add("header", "value");
+        when(clientRequesteMock.headers()).thenReturn(httpHeaders);
+
+        DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient();
+
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true);
+        Mono<ClientRequest> logRequest = dmaapWebClientUndetTest.logRequest(clientRequesteMock);
+
+        assertEquals(clientRequesteMock, logRequest.block());
+
+        assertEquals(logAppender.list.get(0).getLevel(), Level.TRACE);
+        assertEquals("Request: GET http://test", logAppender.list.get(0).getFormattedMessage());
+        assertEquals(logAppender.list.get(1).getLevel(), Level.TRACE);
+        assertEquals("HTTP request headers: [header:\"value\"]", logAppender.list.get(1).getFormattedMessage());
+
+        logAppender.stop();
     }
 }
index a585bf9..ddc279c 100644 (file)
@@ -16,8 +16,8 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import java.io.File;
 import java.net.URI;
 import java.nio.file.Path;
@@ -39,13 +38,13 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -89,16 +88,17 @@ class DataRouterPublisherTest {
     private static final String PUBLISH_TOPIC = "publish";
     private static final String FEED_ID = "1";
 
+    // "https://54.45.333.2:1234/publish/1";
+    private static final String PUBLISH_URL =
+        HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
+
     private static FilePublishInformation filePublishInformation;
     private static DmaapProducerHttpClient httpClientMock;
     private static AppConfig appConfig;
     private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
     private static Map<String, String> context = new HashMap<>();
     private static DataRouterPublisher publisherTaskUnderTestSpy;
-
-    // "https://54.45.333.2:1234/publish/1";
-    private static final String PUBLISH_URL =
-        HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
+    private static final Counters counters = new Counters();
 
     @BeforeAll
     public static void setUp() {
@@ -121,7 +121,12 @@ class DataRouterPublisherTest {
             .changeIdentifier(CHANGE_IDENTIFIER) //
             .build(); //
         appConfig = mock(AppConfig.class);
-        publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, new Counters()));
+        publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, counters));
+    }
+
+    @BeforeEach
+    void setUpTest() {
+        counters.clear();
     }
 
     @Test
@@ -169,6 +174,9 @@ class DataRouterPublisherTest {
         // router.
         // This should be 10 unless the API is updated (which is the fields checked above)
         assertEquals(10, metaHash.size());
+
+        assertEquals("totalPublishedFiles should have been 1", 1, counters.getTotalPublishedFiles());
+        assertEquals("noOfFailedPublishAttempts should have been 0", 0, counters.getNoOfFailedPublishAttempts());
     }
 
     @Test
@@ -182,6 +190,9 @@ class DataRouterPublisherTest {
 
         assertTrue("Warning missing in log",
             logAppender.list.toString().contains("[WARN] Publishing file " + PM_FILE_NAME + " to DR unsuccessful."));
+
+        assertEquals("totalPublishedFiles should have been 1", 1, counters.getTotalPublishedFiles());
+        assertEquals("noOfFailedPublishAttempts should have been 1", 1, counters.getNoOfFailedPublishAttempts());
     }
 
     @Test
@@ -197,6 +208,9 @@ class DataRouterPublisherTest {
         verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
         verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
         verifyNoMoreInteractions(httpClientMock);
+
+        assertEquals("totalPublishedFiles should have been 1", 1, counters.getTotalPublishedFiles());
+        assertEquals("noOfFailedPublishAttempts should have been 1", 1, counters.getNoOfFailedPublishAttempts());
     }
 
     @Test
@@ -215,6 +229,9 @@ class DataRouterPublisherTest {
         verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
         verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
         verifyNoMoreInteractions(httpClientMock);
+
+        assertEquals("totalPublishedFiles should have been 0", 0, counters.getTotalPublishedFiles());
+        assertEquals("noOfFailedPublishAttempts should have been 2", 2, counters.getNoOfFailedPublishAttempts());
     }
 
     @SafeVarargs
index 1a3b205..93f2007 100644 (file)
@@ -16,6 +16,7 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -31,8 +32,8 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
@@ -140,11 +141,8 @@ public class FileCollectorTest {
             .build();
     }
 
-    /**
-     * Sets up the configuration.
-     */
     @BeforeAll
-    public static void setUpConfiguration() {
+    static void setUpConfiguration() {
         when(appConfigMock.getFtpesConfiguration()).thenReturn(ftpesConfigMock);
         when(ftpesConfigMock.keyCert()).thenReturn(FTP_KEY_PATH);
         when(ftpesConfigMock.keyPassword()).thenReturn(FTP_KEY_PASSWORD);
@@ -152,6 +150,11 @@ public class FileCollectorTest {
         when(ftpesConfigMock.trustedCaPassword()).thenReturn(TRUSTED_CA_PASSWORD);
     }
 
+    @BeforeEach
+    void setUpTest() {
+        counters.clear();
+    }
+
     @Test
     public void whenFtpesFile_returnCorrectResponse() throws Exception {
         FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
@@ -169,8 +172,10 @@ public class FileCollectorTest {
         verify(ftpsClientMock, times(1)).open();
         verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verify(ftpsClientMock, times(1)).close();
-
         verifyNoMoreInteractions(ftpsClientMock);
+
+        assertEquals("collectedFiles should have been 1", counters.getNoOfCollectedFiles(), 1);
+        assertEquals("failedFtpAttempts should have been 0", counters.getNoOfFailedFtpAttempts(), 0);
     }
 
     @Test
@@ -198,6 +203,8 @@ public class FileCollectorTest {
         verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verify(sftpClientMock, times(2)).close();
         verifyNoMoreInteractions(sftpClientMock);
+
+        assertEquals("collectedFiles should have been 2", counters.getNoOfCollectedFiles(), 2);
     }
 
     @Test
@@ -214,11 +221,14 @@ public class FileCollectorTest {
             .verify();
 
         verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+        assertEquals("collectedFiles should have been 0", counters.getNoOfCollectedFiles(), 0);
+        assertEquals("failedFtpAttempts should have been 4", counters.getNoOfFailedFtpAttempts(), 4);
     }
 
     @Test
     public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception {
-        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, new Counters()));
+        FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
         doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any());
 
         FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS);
@@ -230,6 +240,9 @@ public class FileCollectorTest {
             .verify();
 
         verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+        assertEquals("collectedFiles should have been 0", counters.getNoOfCollectedFiles(), 0);
+        assertEquals("failedFtpAttempts should have been 1", counters.getNoOfFailedFtpAttempts(), 1);
     }
 
     @Test
@@ -249,5 +262,8 @@ public class FileCollectorTest {
             .verifyComplete();
 
         verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+
+        assertEquals("collectedFiles should have been 1", counters.getNoOfCollectedFiles(), 1);
+        assertEquals("failedFtpAttempts should have been 1", counters.getNoOfFailedFtpAttempts(), 1);
     }
 }
index edbf588..5a8d962 100644 (file)
@@ -37,7 +37,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
@@ -45,7 +44,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,7 +65,6 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
 import org.slf4j.MDC;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -247,13 +244,16 @@ public class ScheduledTasksTest {
 
         testedObject.executeDatafileMainTask();
 
-        await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
+        await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
+            testedObject.getCurrentNumberOfSubscriptions()));
 
         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
 
         verify(appConfig).getDmaapConsumerConfiguration();
         verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
         verifyNoMoreInteractions(appConfig);
+
+        assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
     }
 
     @Test
@@ -290,7 +290,8 @@ public class ScheduledTasksTest {
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
         testedObject.executeDatafileMainTask();
 
-        await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
+        await().untilAsserted(() -> assertEquals("currentNumberOfSubscriptions should have been 0", 0,
+            testedObject.getCurrentNumberOfSubscriptions()));
 
         assertTrue("Error missing in log", logAppender.list.toString().contains(
             "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"));
@@ -339,12 +340,18 @@ public class ScheduledTasksTest {
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
         assertEquals(0, testedObject.getThreadPoolQueueSize());
+
         verify(consumerMock, times(1)).getMessageRouterResponse();
+        verifyNoMoreInteractions(consumerMock);
+
         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
+        verifyNoMoreInteractions(fileCollectorMock);
+
         verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
-        verifyNoMoreInteractions(fileCollectorMock);
-        verifyNoMoreInteractions(consumerMock);
+
+        assertEquals("totalReceivedEvents should have been 200", 200,
+            testedObject.getCounters().getTotalReceivedEvents());
     }
 
     @Test
@@ -375,12 +382,18 @@ public class ScheduledTasksTest {
             .verify(); //
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
+
         verify(consumerMock, times(1)).getMessageRouterResponse();
+        verifyNoMoreInteractions(consumerMock);
+
         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
+        verifyNoMoreInteractions(fileCollectorMock);
+
         verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
-        verifyNoMoreInteractions(fileCollectorMock);
-        verifyNoMoreInteractions(consumerMock);
+
+        assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
+        assertEquals("failedFtp should have been 1", 1, testedObject.getCounters().getNoOfFailedFtp());
     }
 
     @Test
@@ -412,12 +425,18 @@ public class ScheduledTasksTest {
         assertTrue("Error missing in log", logAppender.list.toString().contains("[ERROR] File publishing failed: "));
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
+
         verify(consumerMock, times(1)).getMessageRouterResponse();
+        verifyNoMoreInteractions(consumerMock);
+
         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
+        verifyNoMoreInteractions(fileCollectorMock);
+
         verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
-        verifyNoMoreInteractions(fileCollectorMock);
-        verifyNoMoreInteractions(consumerMock);
+
+        assertEquals("totalReceivedEvents should have been 2", 2, testedObject.getCounters().getTotalReceivedEvents());
+        assertEquals("noOfFailedPublish should have been 1", 1, testedObject.getCounters().getNoOfFailedPublish());
     }
 
     @Test
@@ -444,13 +463,19 @@ public class ScheduledTasksTest {
             .verify(); //
 
         assertEquals(0, testedObject.getCurrentNumberOfTasks());
+
         verify(consumerMock, times(1)).getMessageRouterResponse();
+        verifyNoMoreInteractions(consumerMock);
+
         verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
-        verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
-        verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
-        verifyNoMoreInteractions(dataRouterMock);
         verifyNoMoreInteractions(fileCollectorMock);
-        verifyNoMoreInteractions(consumerMock);
+
+        verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
         verifyNoMoreInteractions(dataRouterMock);
+
+        verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
+        verifyNoMoreInteractions(publishedCheckerMock);
+
+        assertEquals("totalReceivedEvents should have been 1", 1, testedObject.getCounters().getTotalReceivedEvents());
     }
 }
index 604bba3..68f3582 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.utils;
 
+import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-
 import org.slf4j.LoggerFactory;
 
 public class LoggingUtils {
@@ -32,7 +32,20 @@ public class LoggingUtils {
      * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
      */
     public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass) {
+        return getLogListAppender(logClass, false);
+    }
+
+    /**
+     * Returns a ListAppender that contains all logging events. Call this method at the very beginning of the test
+     *
+     * @param logClass class whose appender is wanted.
+     * @param allLevels true if all log levels should be activated.
+     */
+    public static ListAppender<ILoggingEvent> getLogListAppender(Class<?> logClass, boolean allLevels) {
         Logger logger = (Logger) LoggerFactory.getLogger(logClass);
+        if (allLevels) {
+            logger.setLevel(Level.ALL);
+        }
         ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
         listAppender.start();
         logger.addAppender(listAppender);