Extension of the DFC to be able to handle any file types types, which are published on different DR feeds.
This association between file type and DR feed is defined by configuration.
The file type is defined by the changeIdentifier in the fileReady VES message reported from the PNF.
The creation of DR feeds and configuration will be done by the DMAAP plugin, but that is
not tested yet.
Change-Id: I13b36acd926a6941ee733e6b37922049fb54a5d9
Issue-ID: DCAEGEN2-1532
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "",
- "dmaapUserPassword": "",
- "dmaapContentType": "application/json",
- "consumerId": "C12",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapTopicName": "publish",
- "dmaapProtocol": "https",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- },
- "security": {
- "trustStorePath" : "change it",
- "trustStorePasswordPath" : "change it",
- "keyStorePath" : "change it",
- "keyStorePasswordPath" : "change it",
- "enableDmaapCertAuth" : "false"
- }
- }
+ "//description":"This file is only used for testing purposes",
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"change it",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"change it",
+ "dmaap.security.enableDmaapCertAuth":"false",
+ "dmaap.dmaapProducerConfiguration" : {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://localhost:3907/feedlog/1",
+ "publish_url":"https://localhost:3907/publish/1",
+ "location":"loc00",
+ "password":"dradmin",
+ "publisher_id":"972.360gm"
+ }
}
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ </dependency>
<!--TESTS DEPENDENCIES -->
<dependency>
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
+
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
import java.util.ServiceLoader;
+
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
/**
* Holds all configuration for the DFC.
*
*/
@Component
+@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class AppConfig {
-
- private static final String CONFIG = "configs";
- private static final String DMAAP = "dmaap";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String FTP = "ftp";
- private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
- private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- private DmaapConsumerConfiguration dmaapConsumerConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private ConsumerConfiguration dmaapConsumerConfiguration;
+ private Map<String, PublisherConfiguration> publishingConfiguration;
private FtpesConfig ftpesConfiguration;
+ private CloudConfigurationProvider cloudConfigurationProvider;
+ @Value("#{systemEnvironment}")
+ Properties systemEnvironment;
+ private Disposable refreshConfigTask = null;
@NotEmpty
private String filepath;
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ @Autowired
+ public synchronized void setCloudConfigurationProvider(
+ CloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ this.cloudConfigurationProvider = reactiveCloudConfigurationProvider;
+ }
+
+ public synchronized void setFilepath(String filepath) {
+ this.filepath = filepath;
+ }
+
+ /**
+ * Reads the cloud configuration.
+ */
+ public void initialize() {
+ stop();
+ Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
+ loadConfigurationFromFile();
+
+ refreshConfigTask = Flux.interval(Duration.ZERO, Duration.ofMinutes(5))
+ .flatMap(count -> createRefreshConfigurationTask(count, context))
+ .subscribe(e -> logger.info("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshConfigTask != null) {
+ refreshConfigTask.dispose();
+ refreshConfigTask = null;
+ }
+ }
+
+ public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
+ public synchronized boolean isFeedConfigured(String changeIdentifier) {
+ return publishingConfiguration.containsKey(changeIdentifier);
+ }
+
+ public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
+ throws DatafileTaskException {
+
+ if (publishingConfiguration == null) {
+ throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
+ }
+ PublisherConfiguration cfg = publishingConfiguration.get(changeIdentifier);
+ if (cfg == null) {
+ throw new DatafileTaskException(
+ "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
+ }
+ return cfg;
}
public synchronized FtpesConfig getFtpesConfiguration() {
return ftpesConfiguration;
}
+ Flux<AppConfig> createRefreshConfigurationTask(Long counter, Map<String, String> context) {
+ return Flux.just(counter) //
+ .doOnNext(cnt -> logger.debug("Refresh config {}", cnt)) //
+ .flatMap(cnt -> readEnvironmentVariables(systemEnvironment, context)) //
+ .flatMap(this::fetchConfiguration);
+ }
+
+ Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> context) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context)
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ private static <R> Mono<R> onErrorResume(Throwable trowable) {
+ logger.error("Could not refresh application configuration {}", trowable.toString());
+ return Mono.empty();
+ }
+
+ private Mono<AppConfig> fetchConfiguration(EnvProperties env) {
+ Mono<JsonObject> serviceCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(env) //
+ .onErrorResume(AppConfig::onErrorResume);
+
+ // Note, have to use this callForServiceConfigurationReactive with EnvProperties, since the
+ // other ones does not work
+ EnvProperties dmaapEnv = ImmutableEnvProperties.builder() //
+ .consulHost(env.consulHost()) //
+ .consulPort(env.consulPort()) //
+ .cbsName(env.cbsName()) //
+ .appName(env.appName() + ":dmaap") //
+ .build(); //
+ Mono<JsonObject> dmaapCfg = cloudConfigurationProvider.callForServiceConfigurationReactive(dmaapEnv)
+ .onErrorResume(t -> Mono.just(new JsonObject()));
+
+ return serviceCfg.zipWith(dmaapCfg, this::parseCloudConfig) //
+ .onErrorResume(AppConfig::onErrorResume);
+ }
+
+ /**
+ * parse configuration
+ *
+ * @param serviceConfigRootObject
+ * @param dmaapConfigRootObject if there is no dmaapConfigRootObject, the dmaap feeds are taken
+ * from the serviceConfigRootObject
+ * @return this which is updated if successful
+ */
+ private AppConfig parseCloudConfig(JsonObject serviceConfigRootObject, JsonObject dmaapConfigRootObject) {
+ try {
+ CloudConfigParser parser = new CloudConfigParser(serviceConfigRootObject, dmaapConfigRootObject);
+ setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfig(),
+ parser.getFtpesConfig());
+ } catch (DatafileTaskException e) {
+ logger.error("Could not parse configuration {}", e.toString(), e);
+ }
+ return this;
+ }
+
/**
* Reads the configuration from file.
*/
- public void loadConfigurationFromFile() {
+ void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject;
+
try (InputStream inputStream = createInputStream(filepath)) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- jsonObject = rootElement.getAsJsonObject();
- FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
- FtpesConfig.class);
- DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
-
- DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
- concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
-
- setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
+ JsonParser parser = new JsonParser();
+ JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
+ if (rootObject == null) {
+ throw new JsonSyntaxException("Root is not a json object");
}
+ parseCloudConfig(rootObject, rootObject);
} catch (JsonSyntaxException | IOException e) {
- logger.error("Problem with loading configuration, file: {}", filepath, e);
+ logger.warn("Local configuration file not loaded: {}", filepath, e);
}
}
- synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
- DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
- this.dmaapConsumerConfiguration = consumerConfiguration;
- this.dmaapPublisherConfiguration = publisherConfiguration;
- this.ftpesConfiguration = ftpesConfig;
+ private synchronized void setConfiguration(ConsumerConfiguration consumerConfiguration,
+ Map<String, PublisherConfiguration> publisherConfiguration, FtpesConfig ftpesConfig) {
+ if (consumerConfiguration == null || publisherConfiguration == null || ftpesConfig == null) {
+ logger.error(
+ "Problem with configuration consumerConfiguration: {}, publisherConfiguration: {}, ftpesConfig: {}",
+ consumerConfiguration, publisherConfiguration, ftpesConfig);
+ } else {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.publishingConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
}
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- return gsonBuilder.create().fromJson(jsonObject, type);
- }
-
InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- synchronized String getFilepath() {
- return this.filepath;
- }
-
- public synchronized void setFilepath(String filepath) {
- this.filepath = filepath;
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
-
}
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class CloudConfigParser {
-
private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
- private final JsonObject jsonObject;
+ private final JsonObject serviceConfigurationRoot;
+ private final JsonObject dmaapConfigurationRoot;
- CloudConfigParser(JsonObject jsonObject) {
- this.jsonObject = jsonObject;
+ public CloudConfigParser(JsonObject serviceConfigurationRoot, JsonObject dmaapConfigurationRoot) {
+ this.serviceConfigurationRoot = serviceConfigurationRoot;
+ this.dmaapConfigurationRoot = dmaapConfigurationRoot;
}
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
- .build();
+ public Map<String, PublisherConfiguration> getDmaapPublisherConfig() throws DatafileTaskException {
+ Iterator<JsonElement> producerCfgs =
+ toArray(serviceConfigurationRoot.get("dmaap.dmaapProducerConfiguration")).iterator();
+
+ Map<String, PublisherConfiguration> result = new HashMap<>();
+
+ while (producerCfgs.hasNext()) {
+ JsonObject producerCfg = producerCfgs.next().getAsJsonObject();
+ String feedName = getAsString(producerCfg, "feedName");
+ JsonObject feedConfig = getFeedConfig(feedName);
+
+ PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(getAsString(feedConfig, "publish_url")) //
+ .passWord(getAsString(feedConfig, "password")) //
+ .userName(getAsString(feedConfig, "username")) //
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH)) //
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH)) //
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ .changeIdentifier(getAsString(producerCfg, "changeIdentifier")) //
+ .logUrl(getAsString(feedConfig, "log_url")) //
+ .build();
+
+ result.put(cfg.changeIdentifier(), cfg);
+ }
+ return result;
}
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(DMAAP_SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(DMAAP_SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
+ public ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException {
+ JsonObject consumerCfg = serviceConfigurationRoot.get("streams_subscribes").getAsJsonObject();
+ Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+ if (topics.size() != 1) {
+ throw new DatafileTaskException("Invalid configuration, number oftopic must be one, config: " + topics);
+ }
+ JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmmap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl)
+ .trustStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PATH))
+ .trustStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))
+ .keyStorePath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PATH))
+ .keyStorePasswordPath(getAsString(serviceConfigurationRoot, DMAAP_SECURITY_KEY_STORE_PASS_PATH))
+ .enableDmaapCertAuth(
+ get(serviceConfigurationRoot, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
.build();
}
- FtpesConfig getFtpesConfig() {
+ public FtpesConfig getFtpesConfig() throws DatafileTaskException {
return new ImmutableFtpesConfig.Builder() //
- .keyCert(jsonObject.get("dmaap.ftpesConfig.keyCert").getAsString())
- .keyPassword(jsonObject.get("dmaap.ftpesConfig.keyPassword").getAsString())
- .trustedCa(jsonObject.get("dmaap.ftpesConfig.trustedCa").getAsString())
- .trustedCaPassword(jsonObject.get("dmaap.ftpesConfig.trustedCaPassword").getAsString()) //
+ .keyCert(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyCert"))
+ .keyPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.keyPassword"))
+ .trustedCa(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCa"))
+ .trustedCaPassword(getAsString(serviceConfigurationRoot, "dmaap.ftpesConfig.trustedCaPassword")) //
.build();
}
+
+ private static JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
+ JsonElement elem = obj.get(memberName);
+ if (elem == null) {
+ throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
+ }
+ return elem;
+ }
+
+ private static String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
+ return get(obj, memberName).getAsString();
+ }
+
+ private JsonObject getFeedConfig(String feedName) throws DatafileTaskException {
+ JsonElement elem = dmaapConfigurationRoot.get(feedName);
+ if (elem == null) {
+ elem = get(serviceConfigurationRoot, feedName); // Fallback, try to find it under
+ // serviceConfigurationRoot
+ }
+ return elem.getAsJsonObject();
+ }
+
+ private static JsonArray toArray(JsonElement obj) {
+ if (obj.isJsonArray()) {
+ return obj.getAsJsonArray();
+ }
+ JsonArray arr = new JsonArray();
+ arr.add(obj);
+ return arr;
+ }
}
+++ /dev/null
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import com.google.gson.JsonObject;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.ReactiveCloudConfigurationProvider;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Primary;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
-
-/**
- * Gets the DFC configuration from the ConfigBindingService/Consul and parses it to the configurations needed in DFC.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Configuration
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
-@EnableConfigurationProperties
-@EnableScheduling
-@Primary
-public class CloudConfiguration extends AppConfig {
- private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class);
- private ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider;
- private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
- private FtpesConfig ftpesCloudConfiguration;
-
- @Value("#{systemEnvironment}")
- private Properties systemEnvironment;
-
- @Autowired
- public synchronized void setThreadPoolTaskScheduler(
- ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
- this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
- }
-
- /**
- * Reads the cloud configuration.
- */
- public void runTask() {
- Map<String,String> context = MappedDiagnosticContext.initializeTraceContext();
- EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context) //
- .subscribeOn(Schedulers.parallel()) //
- .flatMap(reactiveCloudConfigurationProvider::callForServiceConfigurationReactive) //
- .flatMap(this::parseCloudConfig) //
- .subscribe(null, this::onError, this::onComplete);
- }
-
- private void onComplete() {
- logger.trace("Configuration updated");
- }
-
- private void onError(Throwable throwable) {
- logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
- }
-
- private synchronized Mono<CloudConfiguration> parseCloudConfig(JsonObject jsonObject) {
- logger.info("Received application configuration: {}", jsonObject);
- CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
- dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
- dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
- ftpesCloudConfiguration = cloudConfigParser.getFtpesConfig();
- return Mono.just(this);
- }
-
- @Override
- public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherCloudConfiguration != null ? dmaapPublisherCloudConfiguration
- : super.getDmaapPublisherConfiguration();
- }
-
- @Override
- public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerCloudConfiguration != null ? dmaapConsumerCloudConfiguration
- : super.getDmaapConsumerConfiguration();
- }
-
- @Override
- public synchronized FtpesConfig getFtpesConfiguration() {
- return ftpesCloudConfiguration != null ? ftpesCloudConfiguration : super.getFtpesConfiguration();
- }
-}
--- /dev/null
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public abstract class ConsumerConfiguration {
+ @Value.Redacted
+ public abstract String topicUrl();
+
+ public abstract String trustStorePath();
+
+ public abstract String trustStorePasswordPath();
+
+ public abstract String keyStorePath();
+
+ public abstract String keyStorePasswordPath();
+
+ public abstract Boolean enableDmaapCertAuth();
+
+ public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException {
+ try {
+ URL url = new URL(topicUrl());
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+ return new ImmutableDmaapConsumerConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(path.dmaapTopicName) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(userName) //
+ .dmaapUserPassword(passwd) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .consumerId(path.consumerId) //
+ .consumerGroup(path.consumerGroup) //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .build();
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Could not parse the URL", e);
+ }
+ }
+
+ private class DmaapConsumerUrlPath {
+ final String dmaapTopicName;
+ final String consumerGroup;
+ final String consumerId;
+
+ DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ this.dmaapTopicName = dmaapTopicName;
+ this.consumerGroup = consumerGroup;
+ this.consumerId = consumerId;
+ }
+ }
+
+ private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException {
+ String[] tokens = urlPath.split("/"); // UrlPath: /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12
+ if (tokens.length != 5) {
+ throw new DatafileTaskException("The path has incorrect syntax: " + urlPath);
+ }
+
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // ex. // /events/unauthenticated.VES_NOTIFICATION_OUTPUT
+ final String consumerGroup = tokens[3]; // ex. OpenDcae-c12
+ final String consumerId = tokens[4]; // ex. C12
+ return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ }
+
+}
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import reactor.core.publisher.Mono;
/**
} catch (EnvironmentLoaderException e) {
return Mono.error(e);
}
- logger.info("Evaluated environment system variables {}", envProperties);
+ logger.trace("Evaluated environment system variables {}", envProperties);
return Mono.just(envProperties);
}
package org.onap.dcaegen2.collectors.datafile.configuration;
import java.io.Serializable;
+
import org.immutables.gson.Gson;
import org.immutables.value.Value;
import org.springframework.stereotype.Component;
@Component
@Value.Immutable
-@Value.Style(builder = "new")
+@Value.Style(builder = "new", redactedMask = "####")
@Gson.TypeAdapters
public abstract class FtpesConfig implements Serializable {
public abstract String keyCert();
@Value.Parameter
+ @Value.Redacted
public abstract String keyPassword();
@Value.Parameter
public abstract String trustedCa();
@Value.Parameter
+ @Value.Redacted
public abstract String trustedCaPassword();
}
--- /dev/null
+/*-
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.configuration;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+@Gson.TypeAdapters
+public interface PublisherConfiguration {
+
+ String publishUrl();
+
+ String logUrl();
+
+ String userName();
+
+ @Value.Redacted
+ String passWord();
+
+ String trustStorePath();
+
+ String trustStorePasswordPath();
+
+ String keyStorePath();
+
+ String keyStorePasswordPath();
+
+ Boolean enableDmaapCertAuth();
+
+ String changeIdentifier();
+
+ default DmaapPublisherConfiguration toDmaap() throws MalformedURLException {
+ URL url = new URL(publishUrl());
+ String urlPath = url.getPath();
+
+ return new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapPortNumber(url.getPort()) //
+ .dmaapHostName(url.getHost()) //
+ .dmaapTopicName(urlPath) //
+ .dmaapProtocol(url.getProtocol()) //
+ .dmaapUserName(this.userName()) //
+ .dmaapUserPassword(this.passWord()) //
+ .trustStorePath(this.trustStorePath()) //
+ .trustStorePasswordPath(this.trustStorePasswordPath()) //
+ .keyStorePath(this.keyStorePath()) //
+ .keyStorePasswordPath(this.keyStorePasswordPath()) //
+ .enableDmaapCertAuth(this.enableDmaapCertAuth()) //
+ .build();
+ }
+
+}
package org.onap.dcaegen2.collectors.datafile.configuration;
-import io.swagger.annotations.ApiOperation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.slf4j.Logger;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
+import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
public class SchedulerConfig {
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
- private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class);
private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final CloudConfiguration cloudConfiguration;
+ private final AppConfig configuration;
/**
* Constructor.
*
* @param taskScheduler The scheduler used to schedule the tasks.
* @param scheduledTasks The scheduler that will actually handle the tasks.
- * @param cloudConfiguration The DFC configuration.
+ * @param configuration The DFC configuration.
*/
@Autowired
public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
- CloudConfiguration cloudConfiguration) {
+ AppConfig configuration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTasks;
- this.cloudConfiguration = cloudConfiguration;
+ this.configuration = configuration;
}
/**
public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
scheduledFutureList.forEach(x -> x.cancel(false));
scheduledFutureList.clear();
+ configuration.stop();
MDC.setContextMap(contextMap);
logger.info("Stopped Datafile workflow");
MDC.clear();
public synchronized boolean tryToStartTask() {
contextMap = MappedDiagnosticContext.initializeTraceContext();
logger.info("Start scheduling Datafile workflow");
+ configuration.initialize();
+
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask,
- Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(
- taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::executeDatafileMainTask,
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
scheduledFutureList
.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
.build();
}
- private ApiInfo apiInfo() {
+ private static ApiInfo apiInfo() {
return new ApiInfoBuilder() //
.title(API_TITLE) //
.description(DESCRIPTION) //
return tomcat;
}
- private Connector getHttpConnector() {
+ private static Connector getHttpConnector() {
Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
connector.setScheme("http");
connector.setPort(8100);
public Mono<ResponseEntity<String>> startTasks() {
return Mono.fromSupplier(schedulerConfig::tryToStartTask) //
- .map(this::createStartTaskResponse);
+ .map(ScheduleController::createStartTaskResponse);
}
/**
}
@ApiOperation(value = "Sends success or error response on starting task execution")
- private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
+ private static ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
if (wasScheduled) {
return new ResponseEntity<>("Datafile Service has been started!", HttpStatus.CREATED);
} else {
package org.onap.dcaegen2.collectors.datafile.ftp;
import java.util.Optional;
+
import org.immutables.value.Value;
/**
*
*/
@Value.Immutable
+@Value.Style(redactedMask = "####")
public interface FileServerData {
public String serverAddress();
public String userId();
+ @Value.Redacted
public String password();
public Optional<Integer> port();
logger.trace("collectFile fetched: {}", localFileName);
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
logger.warn("Local file {} already created", localFileName);
}
OutputStream output = new FileOutputStream(localFile);
- logger.debug("File {} opened xNF", localFileName);
+ logger.trace("File {} opened xNF", localFileName);
return output;
}
try {
sftpChannel.get(remoteFile, localFile.toString());
- logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+ logger.trace("File {} Download Successfull from xNF", localFile.getFileName());
} catch (SftpException e) {
boolean retry = e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE && e.id != ChannelSftp.SSH_FX_PERMISSION_DENIED && e.id != ChannelSftp.SSH_FX_OP_UNSUPPORTED;
throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData, e, retry);
}
}
- private int getPort(Optional<Integer> port) {
+ private static int getPort(Optional<Integer> port) {
return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
}
- private Session setUpSession(FileServerData fileServerData) throws JSchException {
+ private static Session setUpSession(FileServerData fileServerData) throws JSchException {
JSch jsch = new JSch();
Session newSession =
return newSession;
}
- private ChannelSftp getChannel(Session session) throws JSchException {
+ private static ChannelSftp getChannel(Session session) throws JSchException {
Channel channel = session.openChannel("sftp");
channel.connect();
return (ChannelSftp) channel;
*/
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public abstract class FileData {
public static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
*
* @return the URL to use to fetch the file from the PNF
*/
+ @Value.Redacted
public abstract String location();
/**
* @return An <code>Optional</code> containing a String array with the user name and password if given, or an empty
* <code>Optional</code> if not given.
*/
- private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+ private static Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
if (userInfoString != null) {
String[] userAndPassword = userInfoString.split(":");
if (userAndPassword.length == 2) {
@Value.Immutable
@Gson.TypeAdapters
+@Value.Style(redactedMask = "####")
public interface FilePublishInformation {
@SerializedName("productName")
String getTimeZoneOffset();
@SerializedName("location")
+ @Value.Redacted
String getLocation();
@SerializedName("compression")
String getName();
Map<String, String> getContext();
+
+ String getChangeIdentifier();
}
*/
public abstract class JsonSerializer {
+ private JsonSerializer() {}
- private static Gson gson =
- new GsonBuilder() //
- .serializeNulls() //
- .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
- .create(); //
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .addSerializationExclusionStrategy(new FilePublishInformationExclusionStrategy()) //
+ .create(); //
/**
* Serializes a <code>filePublishInformation</code>.
private final Set<String> inclusions =
Sets.newHashSet("productName", "vendorName", "lastEpochMicrosec", "sourceName", "startEpochMicrosec",
"timeZoneOffset", "location", "compression", "fileFormatType", "fileFormatVersion");
+
@Override
- public boolean shouldSkipField(FieldAttributes f) {
- return !inclusions.contains(f.getName());
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return !inclusions.contains(fieldAttributes.getName());
}
@Override
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;
+
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
private static final String FILE_FORMAT_VERSION = "fileFormatVersion";
private static final String FILE_READY_CHANGE_TYPE = "FileReady";
- private static final String FILE_READY_CHANGE_IDENTIFIER = "PM_MEAS_FILES";
/**
* The data types available in the event name.
* @return a <code>Flux</code> containing messages.
*/
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
- return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
+ return rawMessage.flatMapMany(JsonMessageParser::getJsonParserMessage).flatMap(this::createMessageData);
}
Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
: getMessagesFromJsonArray(jsonElement);
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- logger.trace("original message from message router: {}", message);
+ private static Mono<JsonElement> getJsonParserMessage(String message) {
return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
}
- private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+ private static Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
: logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Mono<FileReadyMessage> transformMessages(JsonObject message) {
+ private static Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
}
- private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
+ private static Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
List<String> missingValues = new ArrayList<>();
JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
.changeIdentifier(changeIdentifier) //
.changeType(changeType) //
.build();
- if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
+ if (missingValues.isEmpty() && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
String errorMessage = "VES event parsing.";
if (!missingValues.isEmpty()) {
errorMessage += " Missing data: " + missingValues;
}
- if (!isChangeIdentifierCorrect(changeIdentifier) || !isChangeTypeCorrect(changeType)) {
- errorMessage += " Change identifier or change type is wrong.";
+ if (!isChangeTypeCorrect(changeType)) {
+ errorMessage += " Change type is wrong: " + changeType + " expected: " + FILE_READY_CHANGE_TYPE;
}
errorMessage += " Message: {}";
logger.error(errorMessage, message);
}
}
- private boolean isChangeTypeCorrect(String changeType) {
+ private static boolean isChangeTypeCorrect(String changeType) {
return FILE_READY_CHANGE_TYPE.equals(changeType);
}
- private boolean isChangeIdentifierCorrect(String changeIdentifier) {
- return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
- }
-
- private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) {
+ private static List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields,
+ MessageMetaData messageMetaData) {
List<FileData> res = new ArrayList<>();
for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
return res;
}
- private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
+ private static Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) {
logger.trace("starting to getFileDataFromJson!");
List<String> missingValues = new ArrayList<>();
}
/**
- * Gets data from the event name. Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description},
- * example: Noti_RnNode-Ericsson_FileReady
+ * Gets data from the event name. Defined as:
+ * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
* @param eventName The event name to get the data from.
* @param missingValues List of missing values. The dataType will be added if missing.
* @return String of data from event name
*/
- private String getDataFromEventName(EventNameDataType dataType, String eventName, List<String> missingValues) {
+ private static String getDataFromEventName(EventNameDataType dataType, String eventName,
+ List<String> missingValues) {
String[] eventArray = eventName.split("_|-");
if (eventArray.length >= 4) {
return eventArray[dataType.index];
return "";
}
- private String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
+ private static String getValueFromJson(JsonObject jsonObject, String jsonKey, List<String> missingValues) {
if (jsonObject.has(jsonKey)) {
return jsonObject.get(jsonKey).getAsString();
} else {
}
}
- private boolean containsNotificationFields(JsonObject jsonObject) {
+ private static boolean containsNotificationFields(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
}
- private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
+ private static Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
logger.error(errorMessage);
return Flux.empty();
}
return publishedFiles.size();
}
- private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ private static boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
final int timeToKeepInfoInSeconds = 60 * 60 * 24;
return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
}
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
/**
* Client used to send requests to DataRouter.
request.addHeader("Authorization", "Basic " + base64Creds);
}
- /**
- * Gets a <code>UriBuilder</code> containing the base URI needed talk to DataRouter. Specific parts can then be
- * added to the URI by the user.
- *
- * @return a <code>UriBuilder</code> containing the base URI needed talk to DataRouter.
- */
- public UriBuilder getBaseUri() {
- return new DefaultUriBuilderFactory().builder() //
- .scheme(configuration.dmaapProtocol()) //
- .host(configuration.dmaapHostName()) //
- .port(configuration.dmaapPortNumber());
- }
-
private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, Duration requestTimeout,
- Map<String, String> contextMap)
- throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+ Map<String, String> contextMap) throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
private final JsonMessageParser jsonMessageParser;
private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
- public DMaaPMessageConsumer(AppConfig datafileAppConfig) {
+ public DMaaPMessageConsumer(AppConfig datafileAppConfig) throws DatafileTaskException {
this.jsonMessageParser = new JsonMessageParser();
this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig);
}
return jsonMessageParser.getMessagesFromJson(message);
}
- private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) {
- DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration();
+ private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig)
+ throws DatafileTaskException {
+ DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration().toDmaap();
WebClient client = new DmaapWebClient().fromConfiguration(config).build();
return new DMaaPConsumerReactiveHttpClient(config, client);
}
import java.io.IOException;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
public class DataRouterPublisher {
private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
private static final String CONTENT_TYPE = "application/octet-stream";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
Duration firstBackoff) {
MDC.setContextMap(publishInfo.getContext());
- dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(publishInfo) //
.cache() //
.flatMap(this::publishFile) //
MDC.setContextMap(publishInfo.getContext());
logger.trace("Entering publishFile with {}", publishInfo);
try {
+ DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
HttpPut put = new HttpPut();
prepareHead(publishInfo, put);
prepareBody(publishInfo, put);
- dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+ dmaapProducerHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
+ dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
}
}
- private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
+
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(publishInfo.getName()));
+ URI uri = new DefaultUriBuilderFactory(
+ datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
+ .builder() //
+ .pathSegment(publishInfo.getName()) //
+ .build();
+ put.setURI(uri);
+
MappedDiagnosticContext.appendTraceInfo(put);
}
}
}
- private URI getPublishUri(String fileName) {
- return dmaapProducerReactiveHttpClient.getBaseUri() //
- .pathSegment(PUBLISH_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .pathSegment(fileName).build();
- }
-
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
return realResource.getInputStream();
}
- DmaapPublisherConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapPublisherConfiguration();
+ PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
+ return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
}
- DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
+ try {
+ DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
+ return new DmaapProducerHttpClient(cfg);
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot resolve producer client", e);
+ }
+
}
}
return Mono.just(fileData) //
.cache() //
- .flatMap(fd -> collectFile(fileData, contextMap)) //
+ .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
.retryBackoff(numRetries, firstBackoff) //
- .flatMap(this::checkCollectedFile);
+ .flatMap(FileCollector::checkCollectedFile);
}
- private Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
+ private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
if (info.isPresent()) {
return Mono.just(info.get());
} else {
}
}
- private Mono<Optional<FilePublishInformation>> collectFile(FileData fileData, Map<String, String> context) {
+ private Mono<Optional<FilePublishInformation>> tryCollectFile(FileData fileData, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("starting to collectFile {}", fileData.name());
}
} catch (Exception throwable) {
logger.warn("Failed to close ftp client: {} {}, reason: {}", fileData.sourceName(), fileData.name(),
- throwable.toString());
+ throwable.toString(), throwable);
return Mono.just(Optional.of(getFilePublishInformation(fileData, localFile, context)));
}
}
}
}
- private FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
+ private static FilePublishInformation getFilePublishInformation(FileData fileData, Path localFile,
Map<String, String> context) {
String location = fileData.location();
MessageMetaData metaData = fileData.messageMetaData();
.compression(fileData.compression()) //
.fileFormatType(fileData.fileFormatType()) //
.fileFormatVersion(fileData.fileFormatVersion()) //
+ .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
.context(context) //
.build();
}
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
*
*/
public class PublishedChecker {
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String DEFAULT_FEED_ID = "1";
- private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
+ private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final AppConfig appConfig;
/**
*
* @param fileName the name of the file used when it is published.
*
- * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ * @return <code>true</code> if the file has been published before, <code>false</code>
+ * otherwise.
+ * @throws DatafileTaskException if the check fails
*/
- public boolean isFilePublished(String fileName, Map<String, String> contextMap) {
+ public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap)
+ throws DatafileTaskException {
MDC.setContextMap(contextMap);
- DmaapProducerHttpClient producerClient = resolveClient();
+ PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier);
+
+ DmaapProducerHttpClient producerClient = resolveClient(publisherConfig);
HttpGet getRequest = new HttpGet();
MappedDiagnosticContext.appendTraceInfo(getRequest);
- getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
- producerClient.addUserCredentialsToHead(getRequest);
-
try {
+ getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig));
+ producerClient.addUserCredentialsToHead(getRequest);
+
HttpResponse response = producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest,
WEB_CLIENT_TIMEOUT, contextMap);
}
}
- private URI getPublishedQueryUri(String fileName, DmaapProducerHttpClient producerClient) {
- return producerClient.getBaseUri() //
- .pathSegment(FEEDLOG_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .queryParam("type", "pub") //
- .queryParam("filename", fileName) //
+ private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException {
+ return new URIBuilder(config.logUrl()) //
+ .addParameter("type", "pub") //
+ .addParameter("filename", fileName) //
.build();
}
- protected DmaapPublisherConfiguration resolveConfiguration() {
- return appConfig.getDmaapPublisherConfiguration();
+ protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException {
+ return appConfig.getPublisherConfiguration(changeIdentifier);
}
- protected DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig)
+ throws DatafileTaskException {
+ try {
+ return new DmaapProducerHttpClient(publisherConfig.toDmaap());
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot create published checker client", e);
+ }
}
}
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
threadPoolQueueSize.get());
return;
}
+ if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+ logger.warn("No configuration loaded, skipping polling for messages");
+ return;
+ }
currentNumberOfSubscriptions.incrementAndGet();
Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
logger.trace("Execution of tasks was registered");
- applicationConfiguration.loadConfigurationFromFile();
createMainTask(context) //
- .subscribe(this::onSuccess, //
+ .subscribe(ScheduledTasks::onSuccess, //
throwable -> {
onError(throwable, context);
currentNumberOfSubscriptions.decrementAndGet();
.flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> createMdcContext(fileData, context)) //
+ .filter(this::isFeedConfigured) //
.filter(this::shouldBePublished) //
.flatMap(this::fetchFile, false, 1, 1) //
.flatMap(this::publishToDataRouter, false, 1, 1) //
}
private class FileDataWithContext {
- FileDataWithContext(FileData fileData, Map<String, String> context) {
+ public final FileData fileData;
+ public final Map<String, String> context;
+
+ public FileDataWithContext(FileData fileData, Map<String, String> context) {
this.fileData = fileData;
this.context = context;
}
-
- final FileData fileData;
- final Map<String, String> context;
}
/**
return this.threadPoolQueueSize.get();
}
- protected DMaaPMessageConsumer createConsumerTask() {
+ protected DMaaPMessageConsumer createConsumerTask() throws DatafileTaskException {
return new DMaaPMessageConsumer(this.applicationConfiguration);
}
return new DataRouterPublisher(applicationConfiguration);
}
- private void onComplete(Map<String, String> contextMap) {
+ private static void onComplete(Map<String, String> contextMap) {
MDC.setContextMap(contextMap);
logger.trace("Datafile tasks have been completed");
}
- private synchronized void onSuccess(FilePublishInformation publishInfo) {
+ private static synchronized void onSuccess(FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
logger.info("Datafile file published {}", publishInfo.getInternalLocation());
}
- private void onError(Throwable throwable, Map<String, String> context) {
+ private static void onError(Throwable throwable, Map<String, String> context) {
MDC.setContextMap(context);
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString());
}
return Mono.just(pair);
}
+ private boolean isFeedConfigured(FileDataWithContext fileData) {
+ if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
+ return true;
+ } else {
+ logger.info("No feed is configured for: {}, file ignored: {}",
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
+ return false;
+ }
+ }
+
private boolean shouldBePublished(FileDataWithContext fileData) {
boolean result = false;
Path localFilePath = fileData.fileData.getLocalFilePath();
if (publishedFilesCache.put(localFilePath) == null) {
- result = !createPublishedChecker().isFilePublished(fileData.fileData.name(), fileData.context);
+ try {
+ result = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
+ fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
+ } catch (DatafileTaskException e) {
+ logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
+ }
}
if (!result) {
currentNumberOfTasks.decrementAndGet();
*/
private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
logger.info(
- "Consuming new file ready messages, current number of tasks: {}, published files: {}, number of subscrptions: {}",
+ "Consuming new file ready messages, current number of tasks: {}, published files: {}, "
+ + "number of subscriptions: {}",
getCurrentNumberOfTasks(), publishedFilesCache.size(), this.currentNumberOfSubscriptions.get());
Map<String, String> context = MDC.getCopyOfContextMap();
- return createConsumerTask() //
- .getMessageRouterResponse() //
- .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ try {
+ return createConsumerTask() //
+ .getMessageRouterResponse() //
+ .onErrorResume(exception -> handleConsumeMessageFailure(exception, context));
+ } catch (Exception e) {
+ logger.error("Could not create message consumer task", e);
+ return Flux.empty();
+ }
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) {
return Flux.empty();
}
- private void deleteFile(Path localFile, Map<String, String> context) {
+ private static void deleteFile(Path localFile, Map<String, String> context) {
MDC.setContextMap(context);
logger.trace("Deleting file: {}", localFile);
try {
+++ /dev/null
-{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "",
- "dmaapUserPassword": "",
- "dmaapContentType": "application/json",
- "consumerId": "C12",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapTopicName": "publish",
- "dmaapProtocol": "https",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- }
- }
-}
-
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Properties;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
/**
* Tests the AppConfig.
*/
class AppConfigTest {
- private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
- private static final boolean CORRECT_JSON = true;
- private static final boolean INCORRECT_JSON = false;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
+
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDcae-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
+ .topicUrl(
+ "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
+ ImmutablePublisherConfiguration.builder() //
+ .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") //
+ .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .changeIdentifier("PM_MEAS_FILES") //
+ .userName("user") //
+ .passWord("password") //
+ .build();
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/dfc.jks") //
+ .keyPassword("secret") //
+ .trustedCa("config/ftp.jks") //
+ .trustedCaPassword("secret") //
+ .build();
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("/publish/1") //
+ .dmaapUserPassword("password") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("user") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static EnvProperties properties() {
+ return ImmutableEnvProperties.builder() //
+ .consulHost("host") //
+ .consulPort(123) //
+ .cbsName("cbsName") //
+ .appName("appName") //
+ .build();
+ }
- private static AppConfig appConfigUnderTest;
+ private AppConfig appConfigUnderTest;
+ private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class);
+ private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
- private static String filePath =
- Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
appConfigUnderTest = spy(AppConfig.class);
+ appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider);
+ appConfigUnderTest.systemEnvironment = new Properties();
}
@Test
- public void whenApplicationWasStarted_FilePathIsSet() {
+ public void whenTheConfigurationFits() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.initialize();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(0)).loadConfigurationFromFile();
- Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+
+ ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
+ Assertions.assertNotNull(consumerCfg);
+ assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
+ assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+
+ PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
+ Assertions.assertNotNull(publisherCfg);
+ assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
+ assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
+
+ FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ assertThat(ftpesConfig).isNotNull();
+ assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
}
@Test
- public void whenTheConfigurationFits_GetFtpsAndDmaapObjectRepresentationConfiguration() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
-
+ public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
- appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
- appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES"));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES"));
+
+ assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
+ assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
}
@Test
- public void whenFileIsNotExist_ThrowIoException() {
+ public void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
// Given
- filePath = "/temp.json";
- appConfigUnderTest.setFilepath(filePath);
+ appConfigUnderTest.setFilepath("/temp.json");
// When
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@Test
- public void whenFileIsExistsButJsonIsIncorrect() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
-
}
-
@Test
- public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException {
+
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
- private String getJsonConfig(boolean correct) {
- JsonObject dmaapConsumerConfigData = new JsonObject();
- dmaapConsumerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapConsumerConfigData.addProperty("dmaapPortNumber", 2222);
- dmaapConsumerConfigData.addProperty("dmaapTopicName", "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- dmaapConsumerConfigData.addProperty("dmaapProtocol", "http");
- dmaapConsumerConfigData.addProperty("dmaapUserName", "admin");
- dmaapConsumerConfigData.addProperty("dmaapUserPassword", "admin");
- dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json");
- dmaapConsumerConfigData.addProperty("consumerId", "C12");
- dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12");
- dmaapConsumerConfigData.addProperty("timeoutMs", -1);
- dmaapConsumerConfigData.addProperty("messageLimit", 1);
-
- JsonObject dmaapProducerConfigData = new JsonObject();
- dmaapProducerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapProducerConfigData.addProperty("dmaapPortNumber", 3907);
- dmaapProducerConfigData.addProperty("dmaapTopicName", "publish");
- dmaapProducerConfigData.addProperty("dmaapProtocol", "https");
- if (correct) {
- dmaapProducerConfigData.addProperty("dmaapUserName", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapUserPassword", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapContentType", "application/octet-stream");
- }
-
- JsonObject dmaapConfigs = new JsonObject();
- dmaapConfigs.add("dmaapConsumerConfiguration", dmaapConsumerConfigData);
- dmaapConfigs.add("dmaapProducerConfiguration", dmaapProducerConfigData);
-
- JsonObject ftpesConfigData = new JsonObject();
- ftpesConfigData.addProperty("keyCert", "config/dfc.jks");
- ftpesConfigData.addProperty("keyPassword", "secret");
- ftpesConfigData.addProperty("trustedCa", "config/ftp.jks");
- ftpesConfigData.addProperty("trustedCaPassword", "secret");
-
- JsonObject security = new JsonObject();
- security.addProperty("trustStorePath", "trustStorePath");
- security.addProperty("trustStorePasswordPath", "trustStorePasswordPath");
- security.addProperty("keyStorePath", "keyStorePath");
- security.addProperty("keyStorePasswordPath", "keyStorePasswordPath");
- security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth");
-
- JsonObject ftpesConfiguration = new JsonObject();
- ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData);
-
- JsonObject configs = new JsonObject();
- configs.add("dmaap", dmaapConfigs);
- configs.add("ftp", ftpesConfiguration);
- configs.add("security", security);
-
- JsonObject completeJson = new JsonObject();
- completeJson.add("configs", configs);
-
- return completeJson.toString();
+ @Test
+ public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshNoConsul() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+ Mono<JsonObject> err = Mono.error(new IOException());
+ doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString()
+ .contains("Could not refresh application configuration java.io.IOException"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+
+ doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+ Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the
+ // dmaap plugin
+
+ doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
+ JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ return rootObject;
+ }
+
+ private static InputStream getCorrectJson() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getCorrectJsonTwoProducers() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getIncorrectJson() {
+ String string = "{" + //
+ " \"configs\": {" + //
+ " \"dmaap\": {"; //
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
}
}
+++ /dev/null
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import com.google.gson.JsonObject;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
-
-class CloudConfigParserTest {
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
- new ImmutableDmaapConsumerConfiguration.Builder() //
- .timeoutMs(-1) //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("admin") //
- .dmaapUserPassword("admin") //
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .dmaapPortNumber(2222) //
- .dmaapContentType("application/json") //
- .messageLimit(-1) //
- .dmaapProtocol("http") //
- .consumerId("C12") //
- .consumerGroup("OpenDCAE-c12") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
- new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapTopicName("publish") //
- .dmaapUserPassword("dradmin") //
- .dmaapPortNumber(3907) //
- .dmaapProtocol("https") //
- .dmaapContentType("application/json") //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("dradmin") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
- new ImmutableFtpesConfig.Builder() //
- .keyCert("/config/dfc.jks") //
- .keyPassword("secret") //
- .trustedCa("config/ftp.jks") //
- .trustedCaPassword("secret") //
- .build();
-
- private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
-
- @Test
- public void shouldCreateDmaapConsumerConfigurationCorrectly() {
- DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig();
-
- assertThat(dmaapConsumerConfig).isNotNull();
- assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
- }
-
- @Test
- public void shouldCreateDmaapPublisherConfigurationCorrectly() {
- DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig();
-
- assertThat(dmaapPublisherConfig).isNotNull();
- assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
- }
-
- @Test
- public void shouldCreateFtpesConfigurationCorrectly() {
- FtpesConfig ftpesConfig = cloudConfigParser.getFtpesConfig();
-
- assertThat(ftpesConfig).isNotNull();
- assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
- }
-
- public JsonObject getCloudConfigJsonObject() {
- JsonObject config = new JsonObject();
- config.addProperty("dmaap.dmaapConsumerConfiguration.timeoutMs", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserName", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserPassword", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapTopicName",
- "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapPortNumber", 2222);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapConsumerConfiguration.messageLimit", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapProtocol", "http");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerId", "C12");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerGroup", "OpenDCAE-c12");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapTopicName", "publish");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapProtocol", "https");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapPortNumber", 3907);
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserName", "dradmin");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin");
- config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks");
- config.addProperty("dmaap.ftpesConfig.keyPassword", "secret");
- config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks");
- config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret");
-
- config.addProperty("dmaap.security.trustStorePath", "trustStorePath");
- config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath");
- config.addProperty("dmaap.security.keyStorePath", "keyStorePath");
- config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath");
- config.addProperty("dmaap.security.enableDmaapCertAuth", "true");
-
- return config;
- }
-}
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
+
import reactor.test.StepVerifier;
public class SchedulerConfigTest {
+ private final AppConfig appConfigurationMock = mock(AppConfig.class);
+ private final TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
+ private final ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+ private final SchedulerConfig schedulerUnderTest =
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
+
+ @BeforeEach
+ public void setUp() {
+ doNothing().when(appConfigurationMock).stop();
+ doNothing().when(appConfigurationMock).initialize();
+ }
+
@Test
public void getResponseFromCancellationOfTasks_success() {
+
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
-
String msg = "Datafile Service has already been stopped!";
StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks())
.expectNext(new ResponseEntity<String>(msg, HttpStatus.CREATED)) //
@Test
public void tryToStartTaskWhenNotStarted_success() {
- TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
- ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
- CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class);
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
SchedulerConfig schedulerUnderTestSpy =
- spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock));
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
boolean actualResult = schedulerUnderTestSpy.tryToStartTask();
assertTrue(actualResult);
- ArgumentCaptor<Runnable> runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class),
- eq(Duration.ofMinutes(5)));
-
ArgumentCaptor<Runnable> scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(),
eq(Duration.ofSeconds(15)));
verify(scheduledTasksMock).executeDatafileMainTask();
verifyNoMoreInteractions(scheduledTasksMock);
- runTaskRunnableCaptor.getValue().run();
- verify(cloudConfigurationMock).runTask();
- verifyNoMoreInteractions(cloudConfigurationMock);
+ verify(appConfigurationMock).initialize();
+ verifyNoMoreInteractions(appConfigurationMock);
- assertEquals(3, scheduledFutureList.size());
+ assertEquals(2, scheduledFutureList.size());
}
@Test
public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() {
+ doNothing().when(appConfigurationMock).loadConfigurationFromFile();
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
+ SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, appConfigurationMock);
boolean actualResult = schedulerUnderTest.tryToStartTask();
+++ /dev/null
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.integration;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-
-/**
- * Integration test for the ScheduledXmlContext.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-
-@Configuration
-@ComponentScan
-@ExtendWith({ SpringExtension.class })
-@ContextConfiguration(locations = { "classpath:scheduled-context.xml" })
-class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-
- private static final int WAIT_FOR_SCHEDULING = 1;
-
- @Autowired
- private ScheduledTasks scheduledTask;
-
- @Test
- void testScheduling() {
- final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
- }
-
- private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).executeDatafileMainTask();
- }
-}
+++ /dev/null
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class FilePublishInformationTest {
- private static final String PRODUCT_NAME = "NrRadio";
- private static final String VENDOR_NAME = "Ericsson";
- private static final String LAST_EPOCH_MICROSEC = "8745745764578";
- private static final String SOURCE_NAME = "oteNB5309";
- private static final String START_EPOCH_MICROSEC = "8745745764578";
- private static final String TIME_ZONE_OFFSET = "UTC+05:00";
- private static final String NAME = "A20161224.1030-1045.bin.gz";
- private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
- private static final String COMPRESSION = "gzip";
- private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
- private static final String FILE_FORMAT_VERSION = "V10";
-
- @Test
- public void filePublishInformationBuilder_shouldBuildAnObject() {
- FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
- .productName(PRODUCT_NAME) //
- .vendorName(VENDOR_NAME) //
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
- .sourceName(SOURCE_NAME) //
- .startEpochMicrosec(START_EPOCH_MICROSEC) //
- .timeZoneOffset(TIME_ZONE_OFFSET) //
- .name(NAME) //
- .location(LOCATION) //
- .internalLocation(INTERNAL_LOCATION) //
- .compression(COMPRESSION) //
- .fileFormatType(FILE_FORMAT_TYPE) //
- .fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>()) //
- .build();
-
- Assertions.assertNotNull(filePublishInformation);
- Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName());
- Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName());
- Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec());
- Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName());
- Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec());
- Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset());
- Assertions.assertEquals(NAME, filePublishInformation.getName());
- Assertions.assertEquals(LOCATION, filePublishInformation.getLocation());
- Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation());
- Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression());
- Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType());
- Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion());
- }
-}
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws URISyntaxException {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.net.URI;
+
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
Header[] authorizationHeaders = request.getHeaders("Authorization");
assertEquals(base64Creds, authorizationHeaders[0].getValue());
}
-
- @Test
- public void getBaseUri_success() {
- URI uri = producerClientUnderTestSpy.getBaseUri().build();
- assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
- }
}
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private DMaaPConsumerReactiveHttpClient httpClientMock;
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String,String>()) //
.build();
listOfFilePublishInformation.add(filePublishInformation);
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DataRouterPublisherTest {
+
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static FilePublishInformation filePublishInformation;
private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static Map<String, String> context = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
+ // "https://54.45.333.2:1234/publish/1";
+ private static final String PUBLISH_URL =
+ HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
+
@BeforeAll
public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
filePublishInformation = ImmutableFilePublishInformation.builder() //
.productName(PRODUCT_NAME) //
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
.context(context) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
appConfig = mock(AppConfig.class);
publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
.verifyComplete();
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
verifyNoMoreInteractions(httpClientMock);
assertEquals(HTTPS_SCHEME, actualUri.getScheme());
assertEquals(HOST, actualUri.getHost());
assertEquals(PORT, actualUri.getPort());
+
Path actualPath = Paths.get(actualUri.getPath());
assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
- // Note that the following line checks the number of properties that are sent to the data router.
+ // Note that the following line checks the number of properties that are sent to the data
+ // router.
// This should be 10 unless the API is updated (which is the fields checked above)
assertEquals(10, metaHash.size());
}
.expectNext(filePublishInformation) //
.verifyComplete();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
throws Exception {
httpClientMock = mock(DmaapProducerHttpClient.class);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
- doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
private static final String TRUSTED_CA_PATH = "trustedCAPath";
private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static AppConfig appConfigMock = mock(AppConfig.class);
private static FtpesConfig ftpesConfigMock = mock(FtpesConfig.class);
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>())
+ .context(new HashMap<String,String>()) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build();
}
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
public class PublishedCheckerTest {
+ private static final String PUBLISH_URL = "https://54.45.33.2:1234/";
private static final String EMPTY_CONTENT = "[]";
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String FEED_ID = "1";
- private static final String HTTPS_SCHEME = "https";
- private static final String HOST = "54.45.33.2";
- private static final int PORT = 1234;
private static final String SOURCE_NAME = "oteNB5309";
private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String LOG_URI = "https://localhost:3907/feedlog/1";
private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static AppConfig appConfigMock;
private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class);
private PublishedChecker publishedCheckerUnderTestSpy;
- /**
- * Sets up data for the tests.
- */
+
@BeforeAll
- public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ public static void setUp() throws DatafileTaskException {
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
appConfigMock = mock(AppConfig.class);
- when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
}
@Test
public void executeWhenNotPublished_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any());
verifyNoMoreInteractions(httpClientMock);
HttpUriRequest getRequest = requestCaptor.getValue();
assertTrue(getRequest instanceof HttpGet);
URI actualUri = getRequest.getURI();
- assertEquals(HTTPS_SCHEME, actualUri.getScheme());
- assertEquals(HOST, actualUri.getHost());
- assertEquals(PORT, actualUri.getPort());
- Path actualPath = Paths.get(actualUri.getPath());
- assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
- assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
- String actualQuery = actualUri.getQuery();
- assertTrue(actualQuery.contains("type=pub"));
- assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz
+ String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME;
+ assertEquals(expUri, actualUri.toString());
}
@Test
public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
public void executeWhenPublished_returnsTrue() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertTrue(isPublished);
}
public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
- doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(LOG_URI).when(publisherConfigurationMock).logUrl();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ScheduledTasksTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private AppConfig appConfig = mock(AppConfig.class);
private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
private DataRouterPublisher dataRouterMock;
private Map<String, String> contextMap = new HashMap<String, String>();
+ private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
+
@BeforeEach
- private void setUp() {
- DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ private void setUp() throws DatafileTaskException {
+ final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(publishUrl) //
+ .logUrl("") //
+ .userName("userName") //
+ .passWord("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
+ .topicUrl("topicUrl").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
+ doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
+ doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
consumerMock = mock(DMaaPMessageConsumer.class);
publishedCheckerMock = mock(PublishedChecker.class);
.sourceName("") //
.startEpochMicrosec("") //
.timeZoneOffset("") //
- .changeIdentifier("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.changeType("") //
.build();
}
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String, String>()).build();
}
@Test
- public void notingToConsume() {
+ public void notingToConsume() throws DatafileTaskException {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
}
@Test
- public void consume_successfulCase() {
+ public void consume_successfulCase() throws DatafileTaskException {
final int noOfEvents = 200;
final int noOfFilesPerEvent = 200;
final int noOfFiles = noOfEvents * noOfFilesPerEvent;
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
}
@Test
- public void consume_fetchFailedOnce() {
+ public void consume_fetchFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
Mono<Object> error = Mono.error(new Exception("problem"));
}
@Test
- public void consume_publishFailedOnce() {
+ public void consume_publishFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
}
@Test
- public void consume_successfulCase_sameFileNames() {
+ public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
final int noOfEvents = 1;
final int noOfFilesPerEvent = 100;
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
- verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
+ verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);
+++ /dev/null
-{
- "configs": {
- "dmaap": {
- "dmaapConsumerConfiguration": {
- "consumerId": "C12",
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT",
- "dmaapProtocol": "http",
- "dmaapUserName": "admin",
- "dmaapUserPassword": "admin",
- "dmaapContentType": "application/json",
- "consumerGroup": "OpenDcae-c12",
- "timeoutMs": -1,
- "messageLimit": 1
- },
- "dmaapProducerConfiguration": {
- "dmaapHostName": "localhost",
- "dmaapPortNumber": 3907,
- "dmaapProtocol": "https",
- "dmaapTopicName": "publish",
- "dmaapUserName": "dradmin",
- "dmaapUserPassword": "dradmin",
- "dmaapContentType": "application/octet-stream"
- }
- },
- "ftp": {
- "ftpesConfiguration": {
- "keyCert": "/config/dfc.jks",
- "keyPassword": "secret",
- "trustedCa": "/config/ftp.jks",
- "trustedCaPassword": "secret"
- }
- },
- "security": {
- "trustStorePath" : "trustStorePath",
- "trustStorePasswordPath" : "trustStorePasswordPath",
- "keyStorePath" : "keyStorePath",
- "keyStorePasswordPath" : "keyStorePasswordPath",
- "enableDmaapCertAuth" : "enableDmaapCertAuth"
- }
- }
-}
-
--- /dev/null
+{
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"trustStorePath",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth":"true",
+ "dmaap.dmaapProducerConfiguration": {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://dmaap.example.com/feedlog/972",
+ "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location":"loc00",
+ "password":"password",
+ "publisher_id":"972.360gm"
+ }
+}
--- /dev/null
+{
+ "dmaap.ftpesConfig.keyCert":"/config/dfc.jks",
+ "dmaap.ftpesConfig.keyPassword":"secret",
+ "dmaap.ftpesConfig.trustedCa":"config/ftp.jks",
+ "dmaap.ftpesConfig.trustedCaPassword":"secret",
+ "dmaap.security.trustStorePath":"trustStorePath",
+ "dmaap.security.trustStorePasswordPath":"trustStorePasswordPath",
+ "dmaap.security.keyStorePath":"keyStorePath",
+ "dmaap.security.keyStorePasswordPath":"keyStorePasswordPath",
+ "dmaap.security.enableDmaapCertAuth":"true",
+ "dmaap.dmaapProducerConfiguration":[
+ {
+ "changeIdentifier":"PM_MEAS_FILES",
+ "feedName":"feed00"
+ },
+ {
+ "changeIdentifier":"XX_FILES",
+ "feedName":"feed01"
+ },
+ {
+ "changeIdentifier":"YY_FILES",
+ "feedName":"feed01"
+ }
+ ],
+ "streams_subscribes":{
+ "dmaap_subscriber":{
+ "dmmap_info":{
+ "topic_url":"http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12"
+ },
+ "type":"message_router"
+ }
+ },
+ "feed00":{
+ "username":"user",
+ "log_url":"https://dmaap.example.com/feedlog/972",
+ "publish_url":"https://message-router.onap.svc.cluster.local:3907/publish/1",
+ "location":"loc00",
+ "password":"password",
+ "publisher_id":"972.360gm"
+ },
+ "feed01":{
+ "username":"user",
+ "log_url":"feed01::log_url",
+ "publish_url":"feed01::publish_url",
+ "location":"loc00",
+ "password":"",
+ "publisher_id":""
+ }
+}