2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2018, 2020-2021 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.dcaegen2.collectors.datafile.configuration;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
25 import java.nio.file.Path;
26 import java.nio.file.Paths;
27 import java.time.Duration;
28 import java.util.HashMap;
29 import java.util.Iterator;
31 import java.util.Properties;
33 import javax.validation.constraints.NotNull;
35 import io.vavr.collection.Stream;
36 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
37 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
38 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
47 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
48 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
49 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
50 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
53 * Parses the cloud configuration.
55 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
56 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
58 public class CloudConfigParser {
60 private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
61 private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
62 private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
63 private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
64 private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
65 private static final String CONFIG = "config";
67 private static final String KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY = "KNOWN_HOSTS_FILE_PATH";
68 private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
69 "sftp.security.strictHostKeyChecking";
71 private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
72 private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
73 private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
74 private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
75 private static final int FIRST_SOURCE_INDEX = 0;
77 private final Properties systemEnvironment;
79 private final JsonObject jsonObject;
81 public CloudConfigParser(JsonObject jsonObject, Properties systemEnvironment) {
82 this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
83 this.systemEnvironment = systemEnvironment;
87 * Get the publisher configurations.
89 * @return a map with change identifier as key and the connected publisher configuration as value.
90 * @throws DatafileTaskException if a member of the configuration is missing.
92 public @NotNull Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
93 JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
94 Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
95 Map<String, PublisherConfiguration> result = new HashMap<>();
97 while (changeIdentifierList.hasNext()) {
98 String changeIdentifier = changeIdentifierList.next();
99 JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
100 JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
102 PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
103 .publishUrl(getAsString(feedConfig, "publish_url")) //
104 .password(getAsString(feedConfig, "password")) //
105 .userName(getAsString(feedConfig, "username")) //
106 .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
107 .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
108 .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
109 .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
110 .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
111 .changeIdentifier(changeIdentifier) //
112 .logUrl(getAsString(feedConfig, "log_url")) //
115 result.put(cfg.changeIdentifier(), cfg);
121 * Get the consumer configuration.
123 * @return the consumer configuration.
124 * @throws DatafileTaskException if the configuration is invalid.
126 public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
128 MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
129 MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
130 MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
131 return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
132 } catch (Exception e) {
133 throw new DatafileTaskException("Could not parse message router consumer configuration", e);
137 private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
138 return ImmutableMessageRouterSubscriberConfig.builder()
139 .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
143 private SecurityKeys createSecurityKeys() throws DatafileTaskException {
144 return ImmutableSecurityKeys.builder()
145 .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
146 .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
147 .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
148 .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
152 private boolean isDmaapCertAuthEnabled(JsonObject config) {
153 return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
156 private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
157 Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
158 if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
159 throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
161 RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
162 MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
164 return ImmutableMessageRouterSubscribeRequest.builder()
165 .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
166 .sourceDefinition(parsedSource)
167 .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
168 .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
173 * Get the sFTP configuration.
175 * @return the sFTP configuration.
176 * @throws DatafileTaskException if a member of the configuration is missing.
178 public @NotNull SftpConfig getSftpConfig() throws DatafileTaskException {
179 String filePath = determineKnownHostsFilePath();
180 return new ImmutableSftpConfig.Builder() //
181 .strictHostKeyChecking(getAsBoolean(jsonObject, CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING))
182 .knownHostsFilePath(filePath).build();
186 * Get the security configuration for communication with the xNF.
188 * @return the xNF communication security configuration.
189 * @throws DatafileTaskException if a member of the configuration is missing.
191 public @NotNull CertificateConfig getCertificateConfig() throws DatafileTaskException {
192 return new ImmutableCertificateConfig.Builder() //
193 .keyCert(getAsString(jsonObject, "dmaap.certificateConfig.keyCert"))
194 .keyPasswordPath(getAsString(jsonObject, "dmaap.certificateConfig.keyPasswordPath"))
195 .trustedCa(getAsString(jsonObject, "dmaap.certificateConfig.trustedCa"))
196 .trustedCaPasswordPath(getAsString(jsonObject, "dmaap.certificateConfig.trustedCaPasswordPath")) //
200 private String determineKnownHostsFilePath() {
201 String filePath = "";
202 if (systemEnvironment != null) {
204 systemEnvironment.getProperty(KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY, "/home/datafile/.ssh/known_hosts");
209 private static @NotNull JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
210 JsonElement elem = obj.get(memberName);
212 throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
217 private static @NotNull String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
218 return get(obj, memberName).getAsString();
221 private static @NotNull Boolean getAsBoolean(JsonObject obj, String memberName) throws DatafileTaskException {
222 return get(obj, memberName).getAsBoolean();
225 private static @NotNull JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
226 return get(obj, memberName).getAsJsonObject();
229 private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
230 return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));