Cbs Client integration 23/89223/14
authorYongchaoWu <yongchao.wu@est.tech>
Fri, 12 Jul 2019 09:01:01 +0000 (09:01 +0000)
committerYongchaoWu <yongchao.wu@est.tech>
Fri, 12 Jul 2019 09:01:01 +0000 (09:01 +0000)
Cbs Client is integrated to read configurations from consul

Issue-ID: DCAEGEN2-1595
Change-Id: Idb0ebd34eba077f9c1cb584abab4d8722b56f6c5
Signed-off-by: YongchaoWu <yongchao.wu@est.tech>
28 files changed:
datafile-app-server/config/datafile_endpoints.json
datafile-app-server/dpo/blueprints/k8s-datafile.yaml
datafile-app-server/pom.xml
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/LoggingUtils.java
datafile-app-server/src/test/resources/datafile_endpoints_test.json
datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json
pom.xml

index cd1b502..e1e857d 100644 (file)
@@ -1,33 +1,36 @@
 {
-   "//description":"This file is only used for testing purposes",
-   "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
-   "dmaap.ftpesConfig.keyPassword":"secret",
-   "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
-   "dmaap.ftpesConfig.trustedCaPassword":"secret",
-   "dmaap.security.trustStorePath":"change it",
-   "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
-   "dmaap.security.keyStorePath":"keyStorePath",
-   "dmaap.security.keyStorePasswordPath":"change it",
-   "dmaap.security.enableDmaapCertAuth":"false",
-   "dmaap.dmaapProducerConfiguration" : {
-         "changeIdentifier":"PM_MEAS_FILES",
-         "feedName":"feed00"
-    },  
-    "streams_subscribes":{
-      "dmaap_subscriber":{
-         "dmmap_info":{
-            "topic_url":"http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
-         },
-         "type":"message_router"
+  "config": {
+    "//description": "This file is only used for testing purposes",
+    "dmaap.ftpesConfig.keyCert": "/config/dfc.jks",
+    "dmaap.ftpesConfig.keyPassword": "secret",
+    "dmaap.ftpesConfig.trustedCa": "config/ftp.jks",
+    "dmaap.ftpesConfig.trustedCaPassword": "secret",
+    "dmaap.security.trustStorePath": "change it",
+    "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
+    "dmaap.security.keyStorePath": "keyStorePath",
+    "dmaap.security.keyStorePasswordPath": "change it",
+    "dmaap.security.enableDmaapCertAuth": "false",
+    "streams_publishes": {
+      "PM_MEAS_FILES": {
+        "type": "data_router",
+        "dmaap_info": {
+          "username": "CYE9fl40",
+          "location": "loc00",
+          "log_url": "https://dmaap-dr-prov/feedlog/4",
+          "publisher_id": "4.307dw",
+          "password": "izBJD8nLjawq0HMG",
+          "publish_url": "https://dmaap-dr-prov/publish/4"
+        }
       }
-   },
-    "feed00":{
-      "username":"user",
-      "log_url":"https://localhost:3907/feedlog/1",
-      "publish_url":"https://localhost:3907/publish/1",
-      "location":"loc00",
-      "password":"dradmin",
-      "publisher_id":"972.360gm"
-   }
+    },
+    "streams_subscribes": {
+      "dmaap_subscriber": {
+        "dmaap_info": {
+          "topic_url": "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+        },
+        "type": "message_router"
+      }
+    }
+  }
 }
 
index cae9530..2de0a24 100644 (file)
@@ -56,10 +56,6 @@ inputs:
   feedDescription:
     type: string
     default: "Feed for Bulk PM files"
-  changeIdentifier:
-    type: string
-    description: type of different files
-    default: "PM_MEAS_FILES"
 
 node_templates:
   bulk_pm_feed:
@@ -95,14 +91,15 @@ node_templates:
         dmaap.security.keyStorePath: "/opt/app/datafile/etc/cert/key.p12"
         dmaap.security.keyStorePasswordPath: "/opt/app/datafile/etc/cert/key.pass"
         dmaap.security.enableDmaapCertAuth: { get_input: secureEnableCert }
-        dmaap.dmaapProducerConfiguration:
-          changeIdentifier: {get_input: changeIdentifier}
-          feedName: {get_input: feedName}
+        streams_publishes:
+          PM_MEAS_FILES:
+            dmaap_info: <<bulk_pm_feed>>
+            type: data_router
         streams_subscribes:
           dmaap_subscriber:
             type:
               "message_router"
-            dmmap_info:
+            dmaap_info:
               topic_url:
                 { concat: [{ get_input: dmaap_mr_protocol },"://",{ get_input: dmaap_mr_host },
                            ":",{ get_input: dmaap_mr_port },"/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"]}
index 2f7d2af..50557b3 100644 (file)
@@ -34,6 +34,8 @@
   <properties>
     <docker.image.name>onap/${project.groupId}.${project.artifactId}</docker.image.name>
     <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
+    <spring.version>5.1.4.RELEASE</spring.version>
+    <spring-boot.version>2.1.2.RELEASE</spring-boot.version>
   </properties>
 
   <dependencies>
       <groupId>io.springfox</groupId>
       <artifactId>springfox-swagger2</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.validation</groupId>
+      <artifactId>validation-api</artifactId>
+      <version>2.0.1.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot</artifactId>
+      <version>${spring-boot.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-web</artifactId>
+      <version>${spring.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-webmvc</artifactId>
+      <version>${spring.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.spotify</groupId>
+      <artifactId>docker-client</artifactId>
+      <version>8.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat.embed</groupId>
+      <artifactId>tomcat-embed-core</artifactId>
+      <version>9.0.14</version>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-webflux</artifactId>
+      <version>${spring.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-autoconfigure</artifactId>
+      <version>${spring-boot.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.springfox</groupId>
       <artifactId>springfox-swagger-ui</artifactId>
index e9d8464..6e9f770 100644 (file)
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
 import com.google.gson.TypeAdapterFactory;
+
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -31,21 +32,26 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.ServiceLoader;
+
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
+
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.stereotype.Component;
+
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -65,22 +71,15 @@ public class AppConfig {
     private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
 
     private ConsumerConfiguration dmaapConsumerConfiguration;
-    Map<String, PublisherConfiguration> publishingConfigurations;
+    private Map<String, PublisherConfiguration> publishingConfigurations;
     private FtpesConfig ftpesConfiguration;
-    private CloudConfigurationProvider cloudConfigurationProvider;
     @Value("#{systemEnvironment}")
     Properties systemEnvironment;
-    Disposable refreshConfigTask = null;
+    private Disposable refreshConfigTask = null;
 
     @NotEmpty
     private String filepath;
 
-    @Autowired
-    public synchronized void setCloudConfigurationProvider(
-        CloudConfigurationProvider reactiveCloudConfigurationProvider) {
-        this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
-    }
-
     public synchronized void setFilepath(String filepath) {
         this.filepath = filepath;
     }
@@ -93,13 +92,25 @@ public class AppConfig {
         Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
         loadConfigurationFromFile();
 
-        refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
-            .flatMap(count -> createRefreshConfigurationTask(count, context))
+        refreshConfigTask = createRefreshTask(context) //
             .subscribe(e -> logger.info("Refreshed configuration data"),
                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
                 () -> logger.error("Configuration refresh terminated"));
     }
 
+    Flux<AppConfig> createRefreshTask(Map<String, String> context) {
+        return getEnvironment(systemEnvironment, context).flatMap(this::createCbsClient)
+            .flatMapMany(this::periodicConfigurationUpdates).map(this::parseCloudConfig)
+            .onErrorResume(this::onErrorResume);
+    }
+
+    private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+        final Duration initialDelay = Duration.ZERO;
+        final Duration refreshPeriod = Duration.ofMinutes(1);
+        final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+        return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+    }
+
     /**
      * Stops the refreshing of the configuration.
      */
@@ -152,55 +163,31 @@ public class AppConfig {
         return ftpesConfiguration;
     }
 
-    Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
-        return Flux.just(counter) //
-            .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
-            .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
-            .flatMap(this::fetchConfiguration);
-    }
-
-    Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
-        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
-            .onErrorResume(AppConfig::onErrorResume);
-    }
-
-    private static <R> Mono<R> onErrorResume(Throwable trowable) {
+    private <R> Mono<R> onErrorResume(Throwable trowable) {
         logger.error("Could not refresh application configuration {}", trowable.toString());
         return Mono.empty();
     }
 
-    private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
-        Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
-            .onErrorResume(AppConfig::onErrorResume);
-
-        // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
-        // other ones does not work
-        EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
-            .consulHost(env.consulHost()) //
-            .consulPort(env.consulPort()) //
-            .cbsName(env.cbsName()) //
-            .appName(env.appName() + ":dmaap") //
-            .build(); //
-        Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
-            .onErrorResume(t -> Mono.just(new JsonObject()));
+    Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
+        return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
+    }
 
-        return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
-            .onErrorResume(AppConfig::onErrorResume);
+    Mono<CbsClient> createCbsClient(EnvProperties env) {
+        return CbsClientFactory.createCbsClient(env);
     }
 
     /**
      * Parse configuration.
      *
-     * @param serviceConfigRootObject the DFC service's configuration
-     * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken from the
-     *        serviceConfigRootObject
+     * @param jsonObject the DFC service's configuration
      * @return this which is updated if successful
      */
-    private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+    private AppConfig parseCloudConfig(JsonObject jsonObject) {
         try {
-            CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+            CloudConfigParser parser = new CloudConfigParser(jsonObject);
             setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
                 parser.getFtpesConfig());
+
         } catch (DatafileTaskException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
         }
@@ -220,20 +207,21 @@ public class AppConfig {
             if (rootObject == null) {
                 throw new JsonSyntaxException("Root is not a json object");
             }
-            parseCloudConfig(rootObject, rootObject);
+            parseCloudConfig(rootObject);
         } catch (JsonSyntaxException | IOException e) {
             logger.warn("Local configuration file not loaded: {}", filepath, e);
         }
     }
 
     private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
-        Map<String, PublisherConfiguration> publisherConfigurations, FtpesConfig ftpesConfig) {
-        if (consumerConfiguration == null || publisherConfigurations == null || ftpesConfig == null) {
-            logger.error("Problem with consumerConfiguration: {}, publisherConfigurations: {}, ftpesConfig: {}",
-                consumerConfiguration, publisherConfigurations, ftpesConfig);
+        Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+        if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+            logger.error(
+                "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+                consumerConfiguration, publisherConfiguration, ftpesConfig);
         } else {
             this.dmaapConsumerConfiguration = consumerConfiguration;
-            this.publishingConfigurations = publisherConfigurations;
+            this.publishingConfigurations = publisherConfiguration;
             this.ftpesConfiguration = ftpesConfig;
         }
     }
index 0242bef..d9a9b76 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 /**
@@ -40,13 +41,13 @@ public class CloudConfigParser {
     private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
     private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
     private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
+    private static final String CONFIG = "config";
+
+    private final JsonObject jsonObject;
 
-    private final JsonObject serviceConfigurationRoot;
-    private final JsonObject dmaapConfigurationRoot;
+    public CloudConfigParser(JsonObject jsonObject) {
+        this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
 
-    public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
-        this.serviceConfigurationRoot = serviceConfigurationRoot;
-        this.dmaapConfigurationRoot = dmaapConfigurationRoot;
     }
 
     /**
@@ -57,33 +58,34 @@ public class CloudConfigParser {
      * @throws DatafileTaskException if a member of the configuration is missing.
      */
     public Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
-        Iterator<JsonElement> producerCfgs =
-            toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+        JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
+        Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
 
         Map<String, PublisherConfiguration> result = new HashMap<>();
 
-        while (producerCfgs.hasNext()) {
-            JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
-            String feedName = getAsString(producerCfg, "feedName");
-            JsonObject feedConfig = getFeedConfig(feedName);
+        while (changeIdentifierList.hasNext()) {
+
+            String changeIdentifier = changeIdentifierList.next();
+            JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
+            JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
 
             PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
                 .publishUrl(getAsString(feedConfig, "publish_url")) //
                 .passWord(getAsString(feedConfig, "password")) //
                 .userName(getAsString(feedConfig, "username")) //
-                .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
-                .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
-                .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
-                .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
-                .enableDmaapCertAuth(
-                    get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
-                .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+                .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+                .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+                .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
+                .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+                .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+                .changeIdentifier(changeIdentifier) //
                 .logUrl(getAsString(feedConfig, "log_url")) //
                 .build();
 
             result.put(cfg.changeIdentifier(), cfg);
         }
         return result;
+
     }
 
     /**
@@ -93,21 +95,21 @@ public class CloudConfigParser {
      * @throws DatafileTaskException if a member of the configuration is missing.
      */
     public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
-        JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+        JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject();
         Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
         if (topics.size() != 1) {
-            throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+            throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics);
         }
         JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
-        JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+        JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
         String topicUrl = getAsString(dmaapInfo, "topic_url");
 
         return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
-            .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
-            .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
-            .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
-            .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
-            .enableDmaapCertAuth(get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+            .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))
+            .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+            .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))
+            .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+            .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
             .build();
     }
 
@@ -119,10 +121,10 @@ public class CloudConfigParser {
      */
     public FtpesConfig getFtpesConfig() throws DatafileTaskException {
         return new ImmutableFtpesConfig.Builder() //
-            .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
-            .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
-            .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
-            .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
+            .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert"))
+            .keyPassword(getAsString(jsonObject, "dmaap.ftpesConfig.keyPassword"))
+            .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa"))
+            .trustedCaPassword(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPassword")) //
             .build();
     }
 
@@ -138,20 +140,8 @@ public class CloudConfigParser {
         return get(obj, memberName).getAsString();
     }
 
-    private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
-        JsonElement elem = dmaapConfigurationRoot.get(feedName);
-        if (elem == null) {
-            elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under serviceConfigurationRoot
-        }
-        return elem.getAsJsonObject();
+    private static JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
+        return get(obj, memberName).getAsJsonObject();
     }
 
-    private static JsonArray toArray(JsonElement obj) {
-        if (obj.isJsonArray()) {
-            return obj.getAsJsonArray();
-        }
-        JsonArray arr = new JsonArray();
-        arr.add(obj);
-        return arr;
-    }
 }
index e62a11e..4db7963 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -62,6 +63,7 @@ public abstract class ConsumerConfiguration {
             DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
 
             return new ImmutableDmaapConsumerConfiguration.Builder() //
+                .endpointUrl(topicUrl()) //
                 .dmaapContentType("application/json") //
                 .dmaapPortNumber(url.getPort()) //
                 .dmaapHostName(url.getHost()) //
index f3c915b..ad5f648 100644 (file)
@@ -21,8 +21,8 @@ import java.util.Optional;
 import java.util.Properties;
 
 import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
index 7a84524..d7451bd 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
@@ -60,6 +61,7 @@ public interface PublisherConfiguration {
         String urlPath = url.getPath();
 
         return new ImmutableDmaapPublisherConfiguration.Builder() //
+            .endpointUrl(publishUrl()) //
             .dmaapContentType("application/octet-stream") //
             .dmaapPortNumber(url.getPort()) //
             .dmaapHostName(url.getHost()) //
index bdedba4..da8361f 100644 (file)
@@ -22,8 +22,10 @@ import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.SftpException;
+
 import java.nio.file.Path;
 import java.util.Optional;
+
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.slf4j.Logger;
index abed645..eed0f0b 100644 (file)
@@ -95,8 +95,9 @@ public class JsonMessageParser {
      * @param rawMessage the Json message to parse.
      * @return a <code>Flux</code> containing messages.
      */
-    public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
-        return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
+
+    public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) {
+        return rawMessage.flatMapMany(this::createMessageData);
     }
 
     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -126,10 +127,6 @@ public class JsonMessageParser {
             : getMessagesFromJsonArray(jsonElement);
     }
 
-    private static Mono<JsonElement> getJsonParserMessage(String message) {
-        return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
-    }
-
     private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
         return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
             : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
index 081c7f3..9c33484 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
 import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.web.reactive.function.client.WebClient;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -41,19 +44,20 @@ import reactor.core.publisher.Mono;
  */
 public class DMaaPMessageConsumer {
     private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class);
-
+    private final AppConfig datafileAppConfig;
     private final JsonMessageParser jsonMessageParser;
-    private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+    private final ConsumerReactiveHttpClientFactory httpClientFactory;
 
-    public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
-        this.jsonMessageParser = new JsonMessageParser();
-        this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
+    public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+        this(datafileAppConfig, new JsonMessageParser(),
+            new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
     }
 
-    protected DMaaPMessageConsumer(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
-        JsonMessageParser messageParser) {
-        this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
-        this.jsonMessageParser = messageParser;
+    protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser,
+        ConsumerReactiveHttpClientFactory httpClientFactory) {
+        this.datafileAppConfig = datafileAppConfig;
+        this.jsonMessageParser = jsonMessageParser;
+        this.httpClientFactory = httpClientFactory;
     }
 
     /**
@@ -63,19 +67,23 @@ public class DMaaPMessageConsumer {
      */
     public Flux<FileReadyMessage> getMessageRouterResponse() {
         logger.trace("getMessageRouterResponse called");
-        return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
+        try {
+            DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient = createHttpClient();
+            return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())));
+        } catch (DatafileTaskException e) {
+            logger.warn("Unable to get response from message router", e);
+            return Flux.empty();
+        }
     }
 
-    private Flux<FileReadyMessage> consume(Mono<String> message) {
+    private Flux<FileReadyMessage> consume(Mono<JsonElement> message) {
         logger.trace("consume called with arg {}", message);
         return jsonMessageParser.getMessagesFromJson(message);
     }
 
-    private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
-        throws DatafileTaskException {
-        DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
-        WebClient client = new DmaapWebClient().fromConfiguration(config).build();
-        return new DMaaPConsumerReactiveHttpClient(config, client);
+    public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException {
+
+        return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap());
     }
 
 }
index bdec719..cfaf175 100644 (file)
@@ -22,11 +22,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+
 import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Path;
 import java.time.Duration;
+
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ContentType;
index 1ce64e4..bccbb5f 100644 (file)
@@ -21,6 +21,7 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
index 26353e3..de45da3 100644 (file)
@@ -22,6 +22,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -156,7 +158,7 @@ public class ScheduledTasks {
         return this.counters;
     }
 
-    protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
+    protected DMaaPMessageConsumer createConsumerTask() {
         return new DMaaPMessageConsumer(this.applicationConfiguration);
     }
 
index d9ca787..f661dd0 100644 (file)
@@ -19,20 +19,17 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import com.google.common.base.Charsets;
 import com.google.common.io.Resources;
 import com.google.gson.JsonElement;
@@ -40,29 +37,28 @@ import com.google.gson.JsonIOException;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonSyntaxException;
+
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-import reactor.core.Disposable;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -73,12 +69,14 @@ import reactor.test.StepVerifier;
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 4/9/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-class AppConfigTest {
+public class AppConfigTest {
 
-    private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+    public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
 
-    private static final DmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+    public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
         new ImmutableDmaapConsumerConfiguration.Builder() //
+            .endpointUrl(
+                "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
             .timeoutMs(-1) //
             .dmaapHostName("message-router.onap.svc.cluster.local") //
             .dmaapUserName("admin") //
@@ -97,7 +95,7 @@ class AppConfigTest {
             .enableDmaapCertAuth(true) //
             .build();
 
-    private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
+    public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
         .topicUrl(
             "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
         .trustStorePath("trustStorePath") //
@@ -120,7 +118,7 @@ class AppConfigTest {
             .passWord("password") //
             .build();
 
-    private static final FtpesConfig CORRECT_FTPES_CONFIGURATION = //
+    private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
         new ImmutableFtpesConfig.Builder() //
             .keyCert("/config/dfc.jks") //
             .keyPassword("secret") //
@@ -128,9 +126,9 @@ class AppConfigTest {
             .trustedCaPassword("secret") //
             .build();
 
-    private static final DmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+    private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
         new ImmutableDmaapPublisherConfiguration.Builder() //
-            .dmaapTopicName("/publish/1") //
+            .endpointUrl("https://message-router.onap.svc.cluster.local:3907/publish/1").dmaapTopicName("/publish/1") //
             .dmaapUserPassword("password") //
             .dmaapPortNumber(3907) //
             .dmaapProtocol("https") //
@@ -154,14 +152,14 @@ class AppConfigTest {
     }
 
     private AppConfig appConfigUnderTest;
-    private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class);
     private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+    CbsClient cbsClient = mock(CbsClient.class);
 
     @BeforeEach
-    public void setUp() {
+    void setUp() {
         appConfigUnderTest = spy(AppConfig.class);
-        appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider);
         appConfigUnderTest.systemEnvironment = new Properties();
+
     }
 
     @Test
@@ -212,19 +210,14 @@ class AppConfigTest {
         // Given
         appConfigUnderTest.setFilepath("/temp.json");
 
-        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
-
         // When
         appConfigUnderTest.loadConfigurationFromFile();
 
         // Then
-        assertTrue("Error message missing in log.",
-            logAppender.list.toString().contains("[WARN] Local configuration file not loaded: /temp.json"));
-        logAppender.stop();
-
         Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
         assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
             .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
+
         Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
     }
 
@@ -264,32 +257,31 @@ class AppConfigTest {
     @Test
     public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
-
-        Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+        Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
 
         StepVerifier //
             .create(task) //
             .expectSubscription() //
-            .expectNextCount(0) //
-            .verifyComplete();
+            .verifyComplete(); //
 
         assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
     }
 
     @Test
     public void whenPeriodicConfigRefreshNoConsul() {
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+        EnvProperties props = properties();
+        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
 
-        doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
-        Mono<JsonObject> err = Mono.error(new IOException());
-        doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
+        Flux<JsonObject> err = Flux.error(new IOException());
+        doReturn(err).when(cbsClient).updates(any(), any(), any());
 
-        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
-        Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+        Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
 
         StepVerifier //
             .create(task) //
             .expectSubscription() //
-            .expectNextCount(0) //
             .verifyComplete();
 
         assertTrue(
@@ -298,13 +290,14 @@ class AppConfigTest {
 
     @Test
     public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
-        doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
-
-        Mono<JsonObject> json = Mono.just(getJsonRootObject());
+        EnvProperties props = properties();
+        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
 
-        doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+        Flux<JsonObject> json = Flux.just(getJsonRootObject());
+        doReturn(json).when(cbsClient).updates(any(), any(), any());
 
-        Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+        Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
 
         StepVerifier //
             .create(task) //
@@ -317,14 +310,18 @@ class AppConfigTest {
 
     @Test
     public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
-        doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+        EnvProperties props = properties();
+        doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any());
 
-        Mono<JsonObject> json = Mono.just(getJsonRootObject());
-        Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the dmaap plugin
+        doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props);
 
-        doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+        Flux<JsonObject> json = Flux.just(getJsonRootObject());
+        Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the
+        // dmaap plugin
 
-        Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+        doReturn(json, err).when(cbsClient).updates(any(), any(), any());
+
+        Flux<AppConfig> task = appConfigUnderTest.createRefreshTask(context);
 
         StepVerifier //
             .create(task) //
@@ -335,37 +332,6 @@ class AppConfigTest {
         Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
     }
 
-    @Test
-    public void whenStopSuccess() {
-        Disposable disposableMock = mock(Disposable.class);
-        appConfigUnderTest.refreshConfigTask = disposableMock;
-
-        appConfigUnderTest.stop();
-
-        verify(disposableMock).dispose();
-        verifyNoMoreInteractions(disposableMock);
-        assertNull(appConfigUnderTest.refreshConfigTask);
-    }
-
-    @Test
-    public void whenNoPublisherConfigurationThrowException() throws DatafileTaskException {
-        appConfigUnderTest.publishingConfigurations = new HashMap<>();
-
-        DatafileTaskException exception = assertThrows(DatafileTaskException.class,
-            () -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
-        assertEquals("Cannot find getPublishingConfiguration for changeIdentifier: " + CHANGE_IDENTIFIER,
-            exception.getMessage());
-    }
-
-    @Test
-    public void whenFeedIsConfiguredReturnTrue() {
-        HashMap<String, PublisherConfiguration> publishingConfigs = new HashMap<>();
-        publishingConfigs.put(CHANGE_IDENTIFIER, null);
-        appConfigUnderTest.publishingConfigurations = publishingConfigs;
-
-        assertTrue(appConfigUnderTest.isFeedConfigured(CHANGE_IDENTIFIER));
-    }
-
     private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
         JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
         return rootObject;
index b630bd0..558eaf0 100644 (file)
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
index 9b8197f..55c796a 100644 (file)
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
index 5330a7f..1c58650 100644 (file)
@@ -33,9 +33,11 @@ import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.SftpException;
+
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Optional;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
index 1e54d29..499b260 100644 (file)
@@ -25,8 +25,10 @@ import static org.mockito.Mockito.when;
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import java.net.URI;
 import java.net.URISyntaxException;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
index 2e3245a..cd18bfa 100644 (file)
@@ -70,6 +70,7 @@ class JsonMessageParserTest {
     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
     private static final String FILE_FORMAT_VERSION = "V10";
     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+    private static final String INCORRECT_CHANGE_IDENTIFIER = "INCORRECT_PM_MEAS_FILES";
     private static final String CHANGE_TYPE = "FileReady";
     private static final String INCORRECT_CHANGE_TYPE = "IncorrectFileReady";
     private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@@ -116,15 +117,14 @@ class JsonMessageParserTest {
             .files(files) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNext(expectedMessage).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
@@ -173,10 +173,11 @@ class JsonMessageParserTest {
         String messageString = "[" + parsedString + "," + parsedString + "]";
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement1 = new JsonParser().parse(messageString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1)))
             .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
     }
 
@@ -196,7 +197,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -204,8 +204,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).verifyComplete();
 
         assertTrue(logAppender.list.toString()
             .contains("[ERROR] VES event parsing. File information wrong. " + "Missing location."));
@@ -229,7 +229,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -237,8 +236,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
             logAppender.list.toString()
@@ -293,9 +292,10 @@ class JsonMessageParserTest {
         String parsedString = message.getParsed();
         String messageString = "[{\"event\":{}}," + parsedString + "]";
         JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
+        JsonElement jsonElement = new JsonParser().parse(messageString);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNext(expectedMessage).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
@@ -314,7 +314,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -322,8 +321,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectComplete().verify();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectComplete().verify();
 
         assertTrue("Error missing in log",
             logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
@@ -346,7 +345,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -354,8 +352,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
             logAppender.list.toString()
@@ -373,7 +371,6 @@ class JsonMessageParserTest {
             .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -381,8 +378,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
             logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
@@ -405,7 +402,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -413,8 +409,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
             logAppender.list.toString()
@@ -439,7 +435,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -447,8 +442,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).verifyComplete();
 
         assertTrue("Error missing in log",
             logAppender.list.toString()
@@ -506,15 +501,14 @@ class JsonMessageParserTest {
             .files(files) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
             .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNext(expectedMessage).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
@@ -523,7 +517,6 @@ class JsonMessageParserTest {
             .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
             .build();
 
-        String incorrectMessageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -531,8 +524,8 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString)))
-            .expectSubscription().expectComplete().verify();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectComplete().verify();
 
         assertTrue("Error missing in log",
             logAppender.list.toString()
@@ -550,7 +543,7 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
         assertTrue("Error missing in log",
@@ -573,7 +566,6 @@ class JsonMessageParserTest {
             .addAdditionalField(additionalField) //
             .build();
 
-        String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
@@ -581,12 +573,38 @@ class JsonMessageParserTest {
             .getJsonObjectFromAnArray(jsonElement);
 
         ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class);
-        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
-            .expectSubscription().expectNextCount(0).expectComplete().verify();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectNextCount(0).expectComplete().verify();
 
         assertTrue("Error missing in log",
             logAppender.list.toString()
                 .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING + " Change type is wrong: "
                     + INCORRECT_CHANGE_TYPE + " Expected: FileReady Message: " + message.getParsed()));
     }
+
+    @Test
+    void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+            .name(PM_FILE_NAME) //
+            .location(LOCATION) //
+            .compression(GZIP_COMPRESSION) //
+            .fileFormatVersion(FILE_FORMAT_VERSION) //
+            .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+            .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+            .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+            .changeType(CHANGE_TYPE) //
+            .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+            .addAdditionalField(additionalField) //
+            .build();
+
+        String parsedString = message.getParsed();
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+            .getJsonObjectFromAnArray(jsonElement);
+
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription()
+            .expectComplete().verify();
+    }
 }
index 1bea290..a4319d3 100644 (file)
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
+import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -48,6 +57,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
 
 import reactor.core.publisher.Flux;
@@ -76,25 +86,36 @@ public class DMaaPMessageConsumerTest {
     private static final String GZIP_COMPRESSION = "gzip";
     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
     private static final String FILE_FORMAT_VERSION = "V10";
-    private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
+    private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>();
     private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
 
     private DMaaPConsumerReactiveHttpClient httpClientMock;
 
     private DMaaPMessageConsumer messageConsumer;
     private static String ftpesMessageString;
+    private static JsonElement ftpesMessageJson;
     private static FileData ftpesFileData;
     private static FileReadyMessage expectedFtpesMessage;
 
     private static String sftpMessageString;
+    private static JsonElement sftpMessageJson;
     private static FileData sftpFileData;
     private static FileReadyMessage expectedSftpMessage;
 
+    private static AppConfig appConfig;
+    private static ConsumerConfiguration dmaapConsumerConfiguration;
+
     /**
      * Sets up data for the test.
      */
     @BeforeAll
     public static void setUp() {
+
+        appConfig = mock(AppConfig.class);
+        dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG;
+
+        JsonParser jsonParser = new JsonParser();
+
         AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
             .location(FTPES_LOCATION) //
             .compression(GZIP_COMPRESSION) //
@@ -111,6 +132,8 @@ public class DMaaPMessageConsumerTest {
             .build();
 
         ftpesMessageString = ftpesJsonMessage.toString();
+        ftpesMessageJson = jsonParser.parse(ftpesMessageString);
+
         MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
             .productName(PRODUCT_NAME) //
             .vendorName(VENDOR_NAME) //
@@ -151,6 +174,7 @@ public class DMaaPMessageConsumerTest {
             .addAdditionalField(sftpAdditionalField) //
             .build();
         sftpMessageString = sftpJsonMessage.toString();
+        sftpMessageJson = jsonParser.parse(sftpMessageString);
         sftpFileData = ImmutableFileData.builder() //
             .name(PM_FILE_NAME) //
             .location(SFTP_LOCATION) //
@@ -188,54 +212,62 @@ public class DMaaPMessageConsumerTest {
 
     @Test
     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
-        prepareMocksForDmaapConsumer("", null);
+        prepareMocksForDmaapConsumer(Optional.empty(), null);
 
         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
             .expectSubscription() //
             .expectError(DatafileTaskException.class) //
             .verify();
 
-        verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+        verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
     }
 
     @Test
     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
-        prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
+        prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage);
 
         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
             .expectNext(expectedFtpesMessage) //
             .verifyComplete();
 
-        verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+        verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
         verifyNoMoreInteractions(httpClientMock);
     }
 
     @Test
     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
-        prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
+        prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage);
 
         StepVerifier.create(messageConsumer.getMessageRouterResponse()) //
             .expectNext(expectedSftpMessage) //
             .verifyComplete();
 
-        verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+        verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty());
         verifyNoMoreInteractions(httpClientMock);
     }
 
-    private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
-        Mono<String> messageAsMono = Mono.just(message);
+    private void prepareMocksForDmaapConsumer(Optional<JsonElement> message,
+        FileReadyMessage fileReadyMessageAfterConsume) {
+        Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty();
         JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
         httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
-        when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+        when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono);
+        when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
+        ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
+        try {
+            doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap());
+        } catch (DatafileTaskException e) {
+            e.printStackTrace();
+        }
 
-        if (!message.isEmpty()) {
-            when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
-                .thenReturn(Flux.just(fileReadyMessageAfterConsume));
+        if (message.isPresent()) {
+            when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume));
         } else {
-            when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+            when(jsonMessageParserMock.getMessagesFromJson(any()))
                 .thenReturn(Flux.error(new DatafileTaskException("problemas")));
         }
 
-        messageConsumer = spy(new DMaaPMessageConsumer(httpClientMock, jsonMessageParserMock));
+        messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory));
     }
+
 }
index ddc279c..fb36917 100644 (file)
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import java.io.File;
 import java.net.URI;
 import java.nio.file.Path;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.StatusLine;
index 93f2007..1ab97d4 100644 (file)
@@ -32,6 +32,7 @@ import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
index 5a8d962..a0096b7 100644 (file)
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
@@ -44,6 +45,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -65,6 +67,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
 import org.slf4j.MDC;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
index 68f3582..cfcb7bf 100644 (file)
@@ -24,6 +24,7 @@ import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
+
 import org.slf4j.LoggerFactory;
 
 public class LoggingUtils {
index 4d4d00a..0157c7d 100644 (file)
@@ -1,31 +1,35 @@
 {
-   "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
-   "dmaap.ftpesConfig.keyPassword":"secret",
-   "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
-   "dmaap.ftpesConfig.trustedCaPassword":"secret",
-   "dmaap.security.trustStorePath":"trustStorePath",
-   "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
-   "dmaap.security.keyStorePath":"keyStorePath",
-   "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
-   "dmaap.security.enableDmaapCertAuth":"true",
-   "dmaap.dmaapProducerConfiguration": {
-         "changeIdentifier":"PM_MEAS_FILES",
-         "feedName":"feed00"
-   },
-   "streams_subscribes":{
-      "dmaap_subscriber":{
-         "dmmap_info":{
-            "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
-         },
-         "type":"message_router"
+  "config": {
+    "//description": "This file is only used for testing purposes",
+    "dmaap.ftpesConfig.keyCert": "/config/dfc.jks",
+    "dmaap.ftpesConfig.keyPassword": "secret",
+    "dmaap.ftpesConfig.trustedCa": "config/ftp.jks",
+    "dmaap.ftpesConfig.trustedCaPassword": "secret",
+    "dmaap.security.trustStorePath": "trustStorePath",
+    "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
+    "dmaap.security.keyStorePath": "keyStorePath",
+    "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+    "dmaap.security.enableDmaapCertAuth": "true",
+    "streams_publishes": {
+      "PM_MEAS_FILES": {
+        "type": "data_router",
+        "dmaap_info": {
+          "username": "user",
+          "log_url": "https://dmaap.example.com/feedlog/972",
+          "publish_url": "https://message-router.onap.svc.cluster.local:3907/publish/1",
+          "location": "loc00",
+          "password": "password",
+          "publisher_id": "972.360gm"
+        }
       }
-   },
-   "feed00":{
-      "username":"user",
-      "log_url":"https://dmaap.example.com/feedlog/972",
-      "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
-      "location":"loc00",
-      "password":"password",
-      "publisher_id":"972.360gm"
-   }
-}
+    },
+    "streams_subscribes": {
+      "dmaap_subscriber": {
+        "dmaap_info": {
+          "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+        },
+        "type": "message_router"
+      }
+    }
+  }
+}
\ No newline at end of file
index a7e2497..61b324c 100644 (file)
@@ -1,49 +1,57 @@
 {
-   "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
-   "dmaap.ftpesConfig.keyPassword":"secret",
-   "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
-   "dmaap.ftpesConfig.trustedCaPassword":"secret",
-   "dmaap.security.trustStorePath":"trustStorePath",
-   "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
-   "dmaap.security.keyStorePath":"keyStorePath",
-   "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
-   "dmaap.security.enableDmaapCertAuth":"true",
-   "dmaap.dmaapProducerConfiguration":[
-      {
-         "changeIdentifier":"PM_MEAS_FILES",
-         "feedName":"feed00"
+  "config": {
+    "//description": "This file is only used for testing purposes",
+    "dmaap.ftpesConfig.keyCert": "/config/dfc.jks",
+    "dmaap.ftpesConfig.keyPassword": "secret",
+    "dmaap.ftpesConfig.trustedCa": "config/ftp.jks",
+    "dmaap.ftpesConfig.trustedCaPassword": "secret",
+    "dmaap.security.trustStorePath": "trustStorePath",
+    "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath",
+    "dmaap.security.keyStorePath": "keyStorePath",
+    "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath",
+    "dmaap.security.enableDmaapCertAuth": "true",
+    "streams_publishes": {
+      "PM_MEAS_FILES": {
+        "type": "data_router",
+        "dmaap_info": {
+          "username": "CYE9fl40",
+          "location": "loc00",
+          "log_url": "https://dmaap-dr-prov/feedlog/4",
+          "publisher_id": "4.307dw",
+          "password": "izBJD8nLjawq0HMG",
+          "publish_url": "https://dmaap-dr-prov/publish/4"
+        }
       },
-      {
-         "changeIdentifier":"XX_FILES",
-         "feedName":"feed01"
+      "XX_FILES": {
+        "type": "data_router",
+        "dmaap_info": {
+          "username": "user",
+          "log_url": "feed01::log_url",
+          "publish_url": "feed01::publish_url",
+          "location": "loc00",
+          "password": "",
+          "publisher_id": ""
+        }
       },
-      {
-         "changeIdentifier":"YY_FILES",
-         "feedName":"feed01"
+      "YY_FILES": {
+        "type": "data_router",
+        "dmaap_info": {
+          "username": "user",
+          "log_url": "feed01::log_url",
+          "publish_url": "feed01::publish_url",
+          "location": "loc00",
+          "password": "",
+          "publisher_id": ""
+        }
       }
-   ],
-   "streams_subscribes":{
-      "dmaap_subscriber":{
-         "dmmap_info":{
-            "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
-         },
-         "type":"message_router"
+    },
+    "streams_subscribes": {
+      "dmaap_subscriber": {
+        "dmaap_info": {
+          "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+        },
+        "type": "message_router"
       }
-   },
-   "feed00":{
-      "username":"user",
-      "log_url":"https://dmaap.example.com/feedlog/972",
-      "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
-      "location":"loc00",
-      "password":"password",
-      "publisher_id":"972.360gm"
-   },
-   "feed01":{
-      "username":"user",
-      "log_url":"feed01::log_url",
-      "publish_url":"feed01::publish_url",
-      "location":"loc00",
-      "password":"",
-      "publisher_id":""
-   }
-}
+    }
+  }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index bab9a67..34376f9 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
 
   <properties>
     <java.version>8</java.version>
-    <sdk.version>1.1.1-SNAPSHOT</sdk.version>
+    <sdk.version>1.1.6</sdk.version>
     <apache.httpcomponents.version>4.1.4</apache.httpcomponents.version>
     <apache.commons.version>3.6</apache.commons.version>
     <immutable.version>2.7.1</immutable.version>