2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.dcaegen2.collectors.datafile.configuration;
21 import com.google.gson.JsonElement;
22 import com.google.gson.JsonObject;
24 import java.nio.file.Path;
25 import java.nio.file.Paths;
26 import java.time.Duration;
27 import java.util.HashMap;
28 import java.util.Iterator;
30 import java.util.Properties;
32 import javax.validation.constraints.NotNull;
34 import io.vavr.collection.Stream;
35 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
36 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
37 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
46 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
47 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
48 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
49 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
52 * Parses the cloud configuration.
54 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
55 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
57 public class CloudConfigParser {
59 private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
60 private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
61 private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
62 private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
63 private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
64 private static final String CONFIG = "config";
66 private static final String KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY = "KNOWN_HOSTS_FILE_PATH";
67 private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
68 "sftp.security.strictHostKeyChecking";
70 private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
71 private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
72 private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
73 private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
74 private static final int FIRST_SOURCE_INDEX = 0;
76 private final Properties systemEnvironment;
78 private final JsonObject jsonObject;
/**
 * Creates a parser working on the {@code config} member of the given object.
 *
 * @param jsonObject the object whose {@code config} member holds the configuration to parse.
 * @param systemEnvironment the properties used when looking up the known hosts file path.
 */
public CloudConfigParser(JsonObject jsonObject, Properties systemEnvironment) {
    this.systemEnvironment = systemEnvironment;
    this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
}
86 * Get the publisher configurations.
88 * @return a map with change identifier as key and the connected publisher configuration as value.
89 * @throws DatafileTaskException if a member of the configuration is missing.
91 public @NotNull Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
92 JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
93 Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
94 Map<String, PublisherConfiguration> result = new HashMap<>();
96 while (changeIdentifierList.hasNext()) {
97 String changeIdentifier = changeIdentifierList.next();
98 JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
99 JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
101 PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
102 .publishUrl(getAsString(feedConfig, "publish_url")) //
103 .password(getAsString(feedConfig, "password")) //
104 .userName(getAsString(feedConfig, "username")) //
105 .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
106 .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
107 .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
108 .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
109 .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
110 .changeIdentifier(changeIdentifier) //
111 .logUrl(getAsString(feedConfig, "log_url")) //
114 result.put(cfg.changeIdentifier(), cfg);
120 * Get the consumer configuration.
122 * @return the consumer configuration.
123 * @throws DatafileTaskException if the configuration is invalid.
125 public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
127 MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
128 MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
129 MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
130 return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
131 } catch (Exception e) {
132 throw new DatafileTaskException("Could not parse message router consumer configuration", e);
136 private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
137 return ImmutableMessageRouterSubscriberConfig.builder()
138 .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
142 private SecurityKeys createSecurityKeys() throws DatafileTaskException {
143 return ImmutableSecurityKeys.builder()
144 .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
145 .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
146 .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
147 .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
151 private boolean isDmaapCertAuthEnabled(JsonObject config) {
152 return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
155 private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
156 Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
157 if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
158 throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
160 RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
161 MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
163 return ImmutableMessageRouterSubscribeRequest.builder()
164 .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
165 .sourceDefinition(parsedSource)
166 .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
167 .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
172 * Get the sFTP configuration.
174 * @return the sFTP configuration.
175 * @throws DatafileTaskException if a member of the configuration is missing.
177 public @NotNull SftpConfig getSftpConfig() throws DatafileTaskException {
178 String filePath = determineKnownHostsFilePath();
179 return new ImmutableSftpConfig.Builder() //
180 .strictHostKeyChecking(getAsBoolean(jsonObject, CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING))
181 .knownHostsFilePath(filePath).build();
185 * Get the security configuration for communication with the xNF.
187 * @return the xNF communication security configuration.
188 * @throws DatafileTaskException if a member of the configuration is missing.
190 public @NotNull FtpesConfig getFtpesConfig() throws DatafileTaskException {
191 return new ImmutableFtpesConfig.Builder() //
192 .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert"))
193 .keyPasswordPath(getAsString(jsonObject, "dmaap.ftpesConfig.keyPasswordPath"))
194 .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa"))
195 .trustedCaPasswordPath(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPasswordPath")) //
199 private String determineKnownHostsFilePath() {
200 String filePath = "";
201 if (systemEnvironment != null) {
203 systemEnvironment.getProperty(KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY, "/home/datafile/.ssh/known_hosts");
208 private static @NotNull JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
209 JsonElement elem = obj.get(memberName);
211 throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
216 private static @NotNull String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
217 return get(obj, memberName).getAsString();
220 private static @NotNull Boolean getAsBoolean(JsonObject obj, String memberName) throws DatafileTaskException {
221 return get(obj, memberName).getAsBoolean();
224 private static @NotNull JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
225 return get(obj, memberName).getAsJsonObject();
228 private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
229 return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));