Loading configuration from consul/cbs 81/67681/6
authorwasala <przemyslaw.wasala@nokia.com>
Wed, 19 Sep 2018 10:55:18 +0000 (12:55 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Thu, 20 Sep 2018 10:50:32 +0000 (12:50 +0200)
*Registered task which calling cbs/consul
for configuration - fixedRate 5 minutes
*Added workflow for loading config from cloud

Change-Id: Iba36d18b4ee0dca082612fa4c92c877f71c9b1fe
Issue-ID: DCAEGEN2-784
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
20 files changed:
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java [moved from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java with 81% similarity]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java [new file with mode: 0644]
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java

diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
new file mode 100644 (file)
index 0000000..7570d70
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+public class CloudConfigParser {
+
+    private final JsonObject jsonObject;
+
+    CloudConfigParser(JsonObject jsonObject) {
+        this.jsonObject = jsonObject;
+    }
+
+    DmaapPublisherConfiguration getDmaapPublisherConfig() {
+        return new ImmutableDmaapPublisherConfiguration.Builder()
+            .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
+            .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
+            .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
+            .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
+            .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+            .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
+            .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
+            .build();
+    }
+
+    DmaapConsumerConfiguration getDmaapConsumerConfig() {
+        return new ImmutableDmaapConsumerConfiguration.Builder()
+            .timeoutMS(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt())
+            .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
+            .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
+            .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
+            .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
+            .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
+            .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
+            .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
+            .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
+            .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+            .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+            .build();
+    }
+}
\ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
new file mode 100644 (file)
index 0000000..7bf711b
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import com.google.gson.JsonObject;
+import java.util.Optional;
+import java.util.Properties;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.onap.dcaegen2.collectors.datafile.service.DatafileConfigurationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+@Configuration
+@EnableConfigurationProperties
+@EnableScheduling
+@Primary
+public class CloudConfiguration extends AppConfig {
+
+    private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
+
+    private DatafileConfigurationProvider datafileConfigurationProvider;
+    private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
+    private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
+
+    @Value("#{systemEnvironment}")
+    private Properties systemEnvironment;
+
+    @Autowired
+    public void setThreadPoolTaskScheduler(DatafileConfigurationProvider datafileConfigurationProvider) {
+        this.datafileConfigurationProvider = datafileConfigurationProvider;
+    }
+
+
+    protected void runTask() {
+        Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
+            .subscribeOn(Schedulers.parallel())
+            .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
+    }
+
+    private void parsingConfigError(Throwable throwable) {
+        logger.warn("Error in case of processing system environment, more details below: ", throwable);
+    }
+
+    private void cloudConfigError(Throwable throwable) {
+        logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+    }
+
+    private void parsingConfigSuccess(EnvProperties envProperties) {
+        logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul");
+        datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties)
+            .subscribe(this::parseCloudConfig, this::cloudConfigError);
+    }
+
+    private void parseCloudConfig(JsonObject jsonObject) {
+        logger.info("Received application configuration: {}", jsonObject);
+        CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
+        dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
+        dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
+    }
+
+    @Override
+    public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+        return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+    }
+
+    @Override
+    public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+        return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
+    }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java
new file mode 100644 (file)
index 0000000..9f6b273
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.util.Optional;
+import java.util.Properties;
+import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+class EnvironmentProcessor {
+
+    private static final int DEFAULT_CONSUL_PORT = 8500;
+    private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+
+    private EnvironmentProcessor() {
+    }
+
+    static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+        logger.info("Loading configuration from system environment variables");
+        EnvProperties envProperties;
+        try {
+            envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
+                .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
+                .appName(getService(systemEnvironment)).build();
+        } catch (EnvironmentLoaderException e) {
+            return Mono.error(e);
+        }
+        logger.info("Evaluated environment system variables {}", envProperties);
+        return Mono.just(envProperties);
+    }
+
+    private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
+        return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
+            .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+    }
+
+    private static Integer getConsultPort(Properties systemEnvironments) {
+        return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
+            .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+    }
+
+    private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
+        return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
+            .orElseThrow(
+                () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+    }
+
+    private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
+        return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+            .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+            .orElseThrow(() -> new EnvironmentLoaderException(
+                "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+    }
+
+    private static Integer getDefaultPortOfConsul() {
+        logger.warn("$CONSUL_PORT environment has not been defined");
+        logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+        return DEFAULT_CONSUL_PORT;
+    }
+}
index 1d0a192..512a217 100644 (file)
@@ -16,6 +16,8 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
@@ -40,16 +42,20 @@ import reactor.core.publisher.Mono;
 @EnableScheduling
 public class SchedulerConfig extends DatafileAppConfig {
 
-    private static final int SCHEDULING_DELAY = 2000;
+    private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 10;
+    private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
     private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<ScheduledFuture>();
 
     private final TaskScheduler taskScheduler;
     private final ScheduledTasks scheduledTask;
+    private final CloudConfiguration cloudConfiguration;
 
     @Autowired
-    public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) {
+    public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
+        CloudConfiguration cloudConfiguration) {
         this.taskScheduler = taskScheduler;
         this.scheduledTask = scheduledTask;
+        this.cloudConfiguration = cloudConfiguration;
     }
 
     /**
@@ -62,7 +68,7 @@ public class SchedulerConfig extends DatafileAppConfig {
         scheduledFutureList.forEach(x -> x.cancel(false));
         scheduledFutureList.clear();
         return Mono.defer(() -> Mono
-                .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+            .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
     }
 
     /**
@@ -74,8 +80,11 @@ public class SchedulerConfig extends DatafileAppConfig {
     @ApiOperation(value = "Start task if possible")
     public synchronized boolean tryToStartTask() {
         if (scheduledFutureList.isEmpty()) {
+            scheduledFutureList.add(taskScheduler
+                .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+                    Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
             scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
-                    SCHEDULING_DELAY));
+                Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)));
             return true;
         } else {
             return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java
new file mode 100644 (file)
index 0000000..75c2e8a
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.exceptions;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+public class EnvironmentLoaderException extends Exception {
+
+    public EnvironmentLoaderException(String message) {
+        super(message);
+    }
+}
index 0f03b1a..1e2dcc9 100644 (file)
@@ -27,7 +27,7 @@ import java.util.List;
 import org.apache.commons.io.FilenameUtils;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java
new file mode 100644 (file)
index 0000000..8654934
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+@Value.Immutable(prehash = true)
+public interface EnvProperties {
+
+    @Value.Parameter
+    String consulHost();
+
+    @Value.Parameter
+    Integer consulPort();
+
+    @Value.Parameter
+    String cbsName();
+
+    @Value.Parameter
+    String appName();
+
+}
  * ============LICENSE_END========================================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.service;
+package org.onap.dcaegen2.collectors.datafile.model;
 
+import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 
 /**
  * Contains data, from the fileReady event, about the file to collect from the xNF.
  *
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
  */
 @Value.Immutable
+@Gson.TypeAdapters
 public interface FileData {
-    public String changeIdentifier();
-    public String changeType();
-    public String location();
-    public String compression();
-    public String fileFormatType();
-    public String fileFormatVersion();
+    String changeIdentifier();
+
+    String changeType();
+
+    String location();
+
+    String compression();
+
+    String fileFormatType();
+
+    String fileFormatVersion();
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java
new file mode 100644 (file)
index 0000000..aca87e5
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
+ */
+@Service
+public class DatafileConfigurationProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(DatafileConfigurationProvider.class);
+
+    private final HttpGetClient httpGetClient;
+
+    DatafileConfigurationProvider() {
+        this(new HttpGetClient());
+    }
+
+    DatafileConfigurationProvider(HttpGetClient httpGetClient) {
+        this.httpGetClient = httpGetClient;
+    }
+
+    public Mono<JsonObject> callForDataFileCollectorConfiguration(EnvProperties envProperties) {
+        return callConsulForConfigBindingServiceEndpoint(envProperties)
+            .flatMap(this::callConfigBindingServiceForDatafileConfiguration);
+    }
+
+    private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
+        logger.info("Retrieving Config Binding Service endpoint from Consul");
+        return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
+            .flatMap(jsonArray -> this.createConfigBindingServiceUrl(jsonArray, envProperties.appName()));
+
+    }
+
+    private String getConsulUrl(EnvProperties envProperties) {
+        return getUri(envProperties.consulHost(), envProperties.consulPort(), "/v1/catalog/service",
+            envProperties.cbsName());
+    }
+
+    private Mono<JsonObject> callConfigBindingServiceForDatafileConfiguration(String configBindingServiceUri) {
+        logger.info("Retrieving Datafile configuration");
+        return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class);
+    }
+
+
+    private Mono<String> createConfigBindingServiceUrl(JsonArray jsonArray, String appName) {
+        return getConfigBindingObject(jsonArray)
+            .flatMap(jsonObject -> buildConfigBindingServiceUrl(jsonObject, appName));
+    }
+
+    private Mono<String> buildConfigBindingServiceUrl(JsonObject jsonObject, String appName) {
+        return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
+            jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
+    }
+
+    private Mono<JsonObject> getConfigBindingObject(JsonArray jsonArray) {
+        try {
+            if (jsonArray.size() > 0) {
+                return Mono.just(jsonArray.get(0).getAsJsonObject());
+            } else {
+                throw new IllegalStateException("JSON Array was empty");
+            }
+        } catch (IllegalStateException e) {
+            logger.warn("Failed to retrieve JSON Object from array", e);
+            return Mono.error(e);
+        }
+    }
+
+    private String getUri(String host, Integer port, String... paths) {
+        return new DefaultUriBuilderFactory().builder()
+            .scheme("http")
+            .host(host)
+            .port(port)
+            .path(String.join("/", paths))
+            .build().toString();
+    }
+}
index c71d143..72e7d49 100644 (file)
@@ -30,6 +30,8 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.springframework.util.StringUtils;
 
 import reactor.core.publisher.Mono;
@@ -42,7 +44,7 @@ import reactor.core.publisher.Mono;
  */
 public class DmaapConsumerJsonParser {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+    private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
 
     private static final String EVENT = "event";
     private static final String NOTIFICATION_FIELDS = "notificationFields";
@@ -59,8 +61,7 @@ public class DmaapConsumerJsonParser {
     private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
 
     /**
-     * Extract info from string and create @see
-     * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}.
+     * Extract info from string and create @see {@link FileData}.
      *
      * @param monoMessage - results from DMaaP
      * @return reactive Mono with an array of FileData
@@ -71,17 +72,17 @@ public class DmaapConsumerJsonParser {
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
         return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
-                : Mono.fromSupplier(() -> new JsonParser().parse(message));
+            : Mono.fromSupplier(() -> new JsonParser().parse(message));
     }
 
     private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) {
         return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
-                : getFileDataFromJsonArray(jsonElement);
+            : getFileDataFromJsonArray(jsonElement);
     }
 
     private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) {
         return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
-                .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+            .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -90,8 +91,8 @@ public class DmaapConsumerJsonParser {
 
     private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) {
         return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP)
-                ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
-                : transform(monoJsonP));
+            ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header"))
+            : transform(monoJsonP));
     }
 
     private Mono<List<FileData>> transform(JsonObject jsonObject) {
@@ -103,28 +104,28 @@ public class DmaapConsumerJsonParser {
             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
 
             if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)
-                    && arrayOfNamedHashMap != null) {
+                && arrayOfNamedHashMap != null) {
                 Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap);
                 return res;
             }
 
             if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) {
                 return Mono.error(
-                        new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
+                    new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject));
             } else if (arrayOfNamedHashMap != null) {
                 return Mono.error(
-                        new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
+                    new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject));
             }
             return Mono.error(
-                    new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
+                new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject));
         }
         return Mono.error(
-                new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
+            new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject));
 
     }
 
     private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType,
-            JsonArray arrayOfAdditionalFields) {
+        JsonArray arrayOfAdditionalFields) {
         List<FileData> res = new ArrayList<>();
         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
             if (arrayOfAdditionalFields.get(i) != null) {
@@ -134,7 +135,7 @@ public class DmaapConsumerJsonParser {
                 if (fileData != null) {
                     res.add(fileData);
                 } else {
-                    LOGGER.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
+                    logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo);
                 }
             }
         }
@@ -152,10 +153,10 @@ public class DmaapConsumerJsonParser {
         String compression = getValueFromJson(data, COMPRESSION);
 
         if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType)
-                && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
+            && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) {
             fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType)
-                    .location(location).compression(compression).fileFormatType(fileFormatType)
-                    .fileFormatVersion(fileFormatVersion).build();
+                .location(location).compression(compression).fileFormatType(fileFormatType)
+                .fileFormatVersion(fileFormatVersion).build();
         }
         return fileData;
     }
@@ -165,20 +166,20 @@ public class DmaapConsumerJsonParser {
     }
 
     private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType,
-            String notificationFieldsVersion) {
+        String notificationFieldsVersion) {
         return ((changeIdentifier != null && !changeIdentifier.isEmpty())
-                && (changeType != null && !changeType.isEmpty())
-                && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty()));
+            && (changeType != null && !changeType.isEmpty())
+            && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty()));
     }
 
     private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) {
         return ((fileFormatVersion != null && !fileFormatVersion.isEmpty())
-                && (fileFormatType != null && !fileFormatType.isEmpty()));
+            && (fileFormatType != null && !fileFormatType.isEmpty()));
     }
 
     private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) {
         return (name != null && !name.isEmpty()) && (location != null && !location.isEmpty())
-                && (compression != null && !compression.isEmpty());
+            && (compression != null && !compression.isEmpty());
     }
 
     private boolean containsHeader(JsonObject jsonObject) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java
new file mode 100644 (file)
index 0000000..796dbbd
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+
+public class HttpGetClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+
+    private final WebClient webClient;
+    private final Gson gson;
+
+    HttpGetClient() {
+        this(WebClient.builder().filter(logRequest()).filter(logResponse()).build());
+    }
+
+    HttpGetClient(WebClient webClient) {
+        this.webClient = webClient;
+        this.gson = new Gson();
+    }
+
+    <T> Mono<T> callHttpGet(String url, Class<T> genericClassDeclaration) {
+        return webClient
+            .get()
+            .uri(url)
+            .retrieve()
+            .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response)))
+            .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response)))
+            .bodyToMono(String.class)
+            .flatMap(body -> getJsonFromRequest(body, genericClassDeclaration));
+    }
+
+    private RuntimeException getException(ClientResponse response) {
+        return new RuntimeException(String.format("Request for cloud config failed: HTTP %d",
+            response.statusCode().value()));
+    }
+
+    private <T> Mono<T> getJsonFromRequest(String body, Class<T> genericClassDeclaration) {
+        try {
+            return Mono.just(parseJson(body, genericClassDeclaration));
+        } catch (JsonSyntaxException | IllegalStateException e) {
+            return Mono.error(e);
+        }
+    }
+
+    private <T> T parseJson(String body, Class<T> genericClassDeclaration) {
+        return gson.fromJson(body, genericClassDeclaration);
+    }
+
+    private static ExchangeFilterFunction logResponse() {
+        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+            logger.info("Response status {}", clientResponse.statusCode());
+            return Mono.just(clientResponse);
+        });
+    }
+
+    private static ExchangeFilterFunction logRequest() {
+        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+            clientRequest.headers()
+                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+            return Mono.just(clientRequest);
+        });
+    }
+}
index 30bf536..0c76fc1 100644 (file)
@@ -25,7 +25,7 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.springframework.web.reactive.function.client.WebClient;
 
index fdd1bb4..839e03c 100644 (file)
@@ -24,7 +24,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.Config;
 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,8 +55,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     }
 
     protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
-            DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
-            DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
+        DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+        DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) {
         this.datafileAppConfig = datafileAppConfig;
         this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
@@ -79,7 +79,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
         dmaaPConsumerReactiveHttpClient = resolveClient();
         logger.trace("Method called with arg {}", object);
         Mono<List<FileData>> consumerResult =
-                consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
+            consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse()));
         return consumerResult.flatMap(this::getFilesFromSender);
     }
 
@@ -95,8 +95,6 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     @Override
     protected DmaapConsumerReactiveHttpClient resolveClient() {
-        return dmaaPConsumerReactiveHttpClient == null
-                ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient())
-                : dmaaPConsumerReactiveHttpClient;
+        return new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient());
     }
 }
index 8c4d707..5779051 100644 (file)
@@ -49,15 +49,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
     }
 
     @Override
-    public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
-            throws DatafileTaskException {
+    public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) {
         logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels);
         return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels);
     }
 
     @Override
     public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels)
-            throws DatafileTaskException {
+        throws DatafileTaskException {
         if (consumerDmaapModels == null) {
             throw new DmaapNotFoundException("Invoked null object to DMaaP task");
         }
@@ -73,8 +72,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
 
     @Override
     DmaapProducerReactiveHttpClient resolveClient() {
-        return dmaapProducerReactiveHttpClient == null
-                ? new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient())
-                : dmaapProducerReactiveHttpClient;
+        return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient());
     }
+
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
new file mode 100644 (file)
index 0000000..a4f098b
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration;
+
+class CloudConfigParserTest {
+
+    private static final String correctJson =
+        "{\"dmaap.dmaapProducerConfiguration.dmaapTopicName\": \"/events/unauthenticated.VES_NOTIFICATION_OUTPUT\", "
+            + "\"dmaap.dmaapConsumerConfiguration.timeoutMS\": -1,"
+            + " \"dmaap.dmaapConsumerConfiguration.dmaapHostName\": \"message-router.onap.svc.cluster.local\","
+            + "\"dmaap.dmaapConsumerConfiguration.dmaapUserName\": \"admin\", "
+            + "\"dmaap.dmaapProducerConfiguration.dmaapPortNumber\": 3904, "
+            + "\"dmaap.dmaapConsumerConfiguration.dmaapUserPassword\": \"admin\", "
+            + "\"dmaap.dmaapProducerConfiguration.dmaapProtocol\": \"http\", "
+            + "\"dmaap.dmaapProducerConfiguration.dmaapContentType\": \"application/json\", "
+            + "\"dmaap.dmaapConsumerConfiguration.dmaapTopicName\": \"/events/unauthenticated.VES_NOTIFICATION_OUTPUT\", "
+            + "\"dmaap.dmaapConsumerConfiguration.dmaapPortNumber\": 3904, "
+            + "\"dmaap.dmaapConsumerConfiguration.dmaapContentType\": \"application/json\", "
+            + "\"dmaap.dmaapConsumerConfiguration.messageLimit\": -1, "
+            + "\"dmaap.dmaapConsumerConfiguration.dmaapProtocol\": \"http\", "
+            + "\"dmaap.dmaapConsumerConfiguration.consumerId\": \"c12\","
+            + "\"dmaap.dmaapProducerConfiguration.dmaapHostName\": \"message-router.onap.svc.cluster.local\", "
+            + "\"dmaap.dmaapConsumerConfiguration.consumerGroup\": \"OpenDCAE-c12\", "
+            + "\"dmaap.dmaapProducerConfiguration.dmaapUserName\": \"admin\", "
+            + "\"dmaap.dmaapProducerConfiguration.dmaapUserPassword\": \"admin\"}";
+
+    private static final ImmutableDmaapConsumerConfiguration correctDmaapConsumerConfig =
+        new ImmutableDmaapConsumerConfiguration.Builder()
+            .timeoutMS(-1)
+            .dmaapHostName("message-router.onap.svc.cluster.local")
+            .dmaapUserName("admin")
+            .dmaapUserPassword("admin")
+            .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
+            .dmaapPortNumber(3904)
+            .dmaapContentType("application/json")
+            .messageLimit(-1)
+            .dmaapProtocol("http")
+            .consumerId("c12")
+            .consumerGroup("OpenDCAE-c12")
+            .build();
+
+    private static final ImmutableDmaapPublisherConfiguration correctDmaapPublisherConfig =
+        new ImmutableDmaapPublisherConfiguration.Builder()
+            .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
+            .dmaapUserPassword("admin")
+            .dmaapPortNumber(3904)
+            .dmaapProtocol("http")
+            .dmaapContentType("application/json")
+            .dmaapHostName("message-router.onap.svc.cluster.local")
+            .dmaapUserName("admin")
+            .build();
+
+    private CloudConfigParser cloudConfigParser = new CloudConfigParser(
+        new Gson().fromJson(correctJson, JsonObject.class));
+
+
+    @Test
+    public void shouldCreateDmaapConsumerConfigurationCorrectly() {
+        // when
+        DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig();
+
+        // then
+        assertThat(dmaapConsumerConfig).isNotNull();
+        assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(correctDmaapConsumerConfig);
+    }
+
+
+    @Test
+    public void shouldCreateDmaapPublisherConfigurationCorrectly() {
+        // when
+        DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig();
+
+        // then
+        assertThat(dmaapPublisherConfig).isNotNull();
+        assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(correctDmaapPublisherConfig);
+    }
+}
\ No newline at end of file
index 5b9d0aa..2f61ac9 100644 (file)
@@ -28,14 +28,13 @@ import java.util.List;
 
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 
 import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- *
  */
 public class FileCollectorTest {
 
@@ -64,15 +63,15 @@ public class FileCollectorTest {
     public void whenSingleFtpesFile_returnCorrectResponse() {
         List<FileData> listOfFileData = new ArrayList<FileData>();
         listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
+            .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION)
+            .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
 
         FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
-                .userId("").password("").build();
+            .userId("").password("").build();
         when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
 
         Mono<List<ConsumerDmaapModel>> consumerModelsMono =
-                fileCollectorUndetTest.getFilesFromSender(listOfFileData);
+            fileCollectorUndetTest.getFilesFromSender(listOfFileData);
 
         List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
         assertEquals(1, consumerModels.size());
@@ -82,7 +81,7 @@ public class FileCollectorTest {
         assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
         assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
         FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
-                .userId("").password("").port(PORT_22).build();
+            .userId("").password("").port(PORT_22).build();
         verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verifyNoMoreInteractions(ftpsClientMock);
     }
@@ -91,15 +90,15 @@ public class FileCollectorTest {
     public void whenSingleSftpFile_returnCorrectResponse() {
         List<FileData> listOfFileData = new ArrayList<FileData>();
         listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
+            .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION)
+            .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build());
 
         FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22)
-                .userId("").password("").build();
+            .userId("").password("").build();
         when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true);
 
         Mono<List<ConsumerDmaapModel>> consumerModelsMono =
-                fileCollectorUndetTest.getFilesFromSender(listOfFileData);
+            fileCollectorUndetTest.getFilesFromSender(listOfFileData);
 
         List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block();
         assertEquals(1, consumerModels.size());
@@ -109,7 +108,7 @@ public class FileCollectorTest {
         assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion());
         assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation());
         FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS)
-                .userId("").password("").port(PORT_22).build();
+            .userId("").password("").port(PORT_22).build();
         verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verifyNoMoreInteractions(ftpsClientMock);
     }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java
new file mode 100644 (file)
index 0000000..efd8983
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.model.EnvProperties;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class DatafileConfigurationProviderTest {
+    private static final Gson gson = new Gson();
+    private static final String configBindingService = "[{\"ID\":\"9c8dd674-34ce-7049-d318-e98d93a64303\",\"Node\""
+        + ":\"dcae-bootstrap\",\"Address\":\"10.42.52.82\",\"Datacenter\":\"dc1\",\"TaggedAddresses\":"
+        + "{\"lan\":\"10.42.52.82\",\"wan\":\"10.42.52.82\"},\"NodeMeta\":{\"consul-network-segment\":\"\"},"
+        + "\"ServiceID\":\"dcae-cbs1\",\"ServiceName\":\"config-binding-service\",\"ServiceTags\":[],"
+        + "\"ServiceAddress\":\"config-binding-service\",\"ServicePort\":10000,\"ServiceEnableTagOverride\":false,"
+        + "\"CreateIndex\":14352,\"ModifyIndex\":14352},{\"ID\":\"35c6f540-a29c-1a92-23b0-1305bd8c81f5\",\"Node\":"
+        + "\"dev-consul-server-1\",\"Address\":\"10.42.165.51\",\"Datacenter\":\"dc1\",\"TaggedAddresses\":"
+        + "{\"lan\":\"10.42.165.51\",\"wan\":\"10.42.165.51\"},\"NodeMeta\":{\"consul-network-segment\":\"\"},"
+        + "\"ServiceID\":\"dcae-cbs1\",\"ServiceName\":\"config-binding-service\",\"ServiceTags\":[],"
+        + "\"ServiceAddress\":\"config-binding-service\",\"ServicePort\":10000,\"ServiceEnableTagOverride\":false,"
+        + "\"CreateIndex\":803,\"ModifyIndex\":803}]";
+    private static final JsonArray configBindingServiceJson = gson.fromJson(configBindingService, JsonArray.class);
+    private static final JsonArray emptyConfigBindingServiceJson = gson.fromJson("[]", JsonArray.class);
+    private static final String datafileCollectorMockConfiguration = "{\"test\":1}";
+    private static final JsonObject datafileCollectorMockConfigurationJson = gson.fromJson(datafileCollectorMockConfiguration, JsonObject.class);
+
+    private EnvProperties envProperties = ImmutableEnvProperties.builder()
+        .appName("dcae-datafile-collector")
+        .cbsName("config-binding-service")
+        .consulHost("consul")
+        .consulPort(8500)
+        .build();
+
+    @Test
+    void shouldReturnDatafileCollectorConfiguration() {
+        // given
+        HttpGetClient webClient = mock(HttpGetClient.class);
+        when(
+            webClient.callHttpGet("http://consul:8500/v1/catalog/service/config-binding-service", JsonArray.class))
+            .thenReturn(Mono.just(configBindingServiceJson));
+        when(webClient.callHttpGet("http://config-binding-service:10000/service_component/dcae-datafile-collector", JsonObject.class))
+            .thenReturn(Mono.just(datafileCollectorMockConfigurationJson));
+
+        DatafileConfigurationProvider provider = new DatafileConfigurationProvider(webClient);
+
+        //when/then
+        StepVerifier.create(provider.callForDataFileCollectorConfiguration(envProperties)).expectSubscription()
+            .expectNext(datafileCollectorMockConfigurationJson).verifyComplete();
+    }
+
+    @Test
+    void shouldReturnMonoErrorWhenConsuleDoesntHaveConfigBindingServiceEntry() {
+        // given
+        HttpGetClient webClient = mock(HttpGetClient.class);
+        when(
+            webClient.callHttpGet("http://consul:8500/v1/catalog/service/config-binding-service", JsonArray.class))
+            .thenReturn(Mono.just(emptyConfigBindingServiceJson));
+
+        DatafileConfigurationProvider provider = new DatafileConfigurationProvider(webClient);
+
+        //when/then
+        StepVerifier.create(provider.callForDataFileCollectorConfiguration(envProperties)).expectSubscription()
+            .expectError(IllegalStateException.class).verify();
+    }
+}
\ No newline at end of file
index 4aad5f4..8c36a51 100644 (file)
@@ -26,6 +26,8 @@ import java.util.Optional;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 
@@ -44,6 +46,7 @@ class DmaapConsumerJsonParserTest {
     @Test
     void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz")
+
                 .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip")
                 .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build();
         JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES")
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java
new file mode 100644 (file)
index 0000000..d95e341
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import org.junit.jupiter.api.Test;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class HttpGetClientTest {
+
+    private static final String SOMEURL = "http://someurl";
+    private static final String DATA = "{}";
+    private Gson gson = new Gson();
+    private WebClient webClient = mock(WebClient.class);
+    private WebClient.RequestHeadersUriSpec requestBodyUriSpec = mock(WebClient.RequestBodyUriSpec.class);
+    private WebClient.ResponseSpec responseSpec = mock(WebClient.ResponseSpec.class);
+
+    @Test
+    void shouldReturnJsonObjectOnGetCall() {
+        //given
+        mockWebClientDependantObject();
+        HttpGetClient httpGetClient = new HttpGetClient(webClient);
+        when(responseSpec.bodyToMono(String.class)).thenReturn(Mono.just(DATA));
+
+        //when/then
+        StepVerifier.create(httpGetClient.callHttpGet(SOMEURL, JsonObject.class)).expectSubscription()
+            .expectNext(gson.fromJson(DATA, JsonObject.class)).verifyComplete();
+    }
+
+    @Test
+    void shouldReturnMonoErrorOnGetCall() {
+        //given
+        mockWebClientDependantObject();
+        HttpGetClient httpGetClient = new HttpGetClient(webClient);
+        when(responseSpec.bodyToMono(String.class)).thenReturn(Mono.just("some wrong data"));
+
+        //when/then
+        StepVerifier.create(httpGetClient.callHttpGet(SOMEURL, JsonObject.class)).expectSubscription()
+            .expectError(JsonSyntaxException.class).verify();
+    }
+
+
+    private void mockWebClientDependantObject() {
+        doReturn(requestBodyUriSpec).when(webClient).get();
+        when(requestBodyUriSpec.uri(SOMEURL)).thenReturn(requestBodyUriSpec);
+        doReturn(responseSpec).when(requestBodyUriSpec).retrieve();
+        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+    }
+}
\ No newline at end of file
index c21c598..e681845 100644 (file)
@@ -38,9 +38,9 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept
 import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
 import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.collectors.datafile.service.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;