025166c25d0552f6ffd052dcf6906c089c8bdcaf
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2018, 2020-2021 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
4  * All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.dcaegen2.collectors.datafile.configuration;
21
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
24
25 import java.nio.file.Path;
26 import java.nio.file.Paths;
27 import java.time.Duration;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.Properties;
32
33 import javax.validation.constraints.NotNull;
34
35 import io.vavr.collection.Stream;
36 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
37 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
38 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
47 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
48 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
49 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
50 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
51
52 /**
53  * Parses the cloud configuration.
54  *
55  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
56  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
57  */
58 public class CloudConfigParser {
59
60     private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
61     private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
62     private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
63     private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
64     private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
65     private static final String CONFIG = "config";
66
67     private static final String KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY = "KNOWN_HOSTS_FILE_PATH";
68     private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
69         "sftp.security.strictHostKeyChecking";
70
71     private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
72     private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
73     private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
74     private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
75     private static final int FIRST_SOURCE_INDEX = 0;
76
77     private final Properties systemEnvironment;
78
79     private final JsonObject jsonObject;
80
81     public CloudConfigParser(JsonObject jsonObject, Properties systemEnvironment) {
82         this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
83         this.systemEnvironment = systemEnvironment;
84     }
85
86     /**
87      * Get the publisher configurations.
88      *
89      * @return a map with change identifier as key and the connected publisher configuration as value.
90      * @throws DatafileTaskException if a member of the configuration is missing.
91      */
92     public @NotNull Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
93         JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
94         Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
95         Map<String, PublisherConfiguration> result = new HashMap<>();
96
97         while (changeIdentifierList.hasNext()) {
98             String changeIdentifier = changeIdentifierList.next();
99             JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
100             JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
101
102             PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
103                 .publishUrl(getAsString(feedConfig, "publish_url")) //
104                 .password(getAsString(feedConfig, "password")) //
105                 .userName(getAsString(feedConfig, "username")) //
106                 .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
107                 .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
108                 .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
109                 .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
110                 .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
111                 .changeIdentifier(changeIdentifier) //
112                 .logUrl(getAsString(feedConfig, "log_url")) //
113                 .build();
114
115             result.put(cfg.changeIdentifier(), cfg);
116         }
117         return result;
118     }
119
120     /**
121      * Get the consumer configuration.
122      *
123      * @return the consumer configuration.
124      * @throws DatafileTaskException if the configuration is invalid.
125      */
126     public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
127         try {
128             MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
129             MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
130             MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
131             return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
132         } catch (Exception e) {
133             throw new DatafileTaskException("Could not parse message router consumer configuration", e);
134         }
135     }
136
137     private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
138         return ImmutableMessageRouterSubscriberConfig.builder()
139             .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
140             .build();
141     }
142
143     private SecurityKeys createSecurityKeys() throws DatafileTaskException {
144         return ImmutableSecurityKeys.builder()
145             .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
146             .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
147             .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
148             .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
149             .build();
150     }
151
152     private boolean isDmaapCertAuthEnabled(JsonObject config) {
153         return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
154     }
155
156     private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
157         Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
158         if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
159             throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
160         }
161         RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
162         MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
163
164         return ImmutableMessageRouterSubscribeRequest.builder()
165             .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
166             .sourceDefinition(parsedSource)
167             .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
168             .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
169             .build();
170     }
171
172     /**
173      * Get the sFTP configuration.
174      *
175      * @return the sFTP configuration.
176      * @throws DatafileTaskException if a member of the configuration is missing.
177      */
178     public @NotNull SftpConfig getSftpConfig() throws DatafileTaskException {
179         String filePath = determineKnownHostsFilePath();
180         return new ImmutableSftpConfig.Builder() //
181             .strictHostKeyChecking(getAsBoolean(jsonObject, CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING))
182             .knownHostsFilePath(filePath).build();
183     }
184
185     /**
186      * Get the security configuration for communication with the xNF.
187      *
188      * @return the xNF communication security configuration.
189      * @throws DatafileTaskException if a member of the configuration is missing.
190      */
191     public @NotNull CertificateConfig getCertificateConfig() throws DatafileTaskException {
192         return new ImmutableCertificateConfig.Builder() //
193             .keyCert(getAsString(jsonObject, "dmaap.certificateConfig.keyCert"))
194             .keyPasswordPath(getAsString(jsonObject, "dmaap.certificateConfig.keyPasswordPath"))
195             .trustedCa(getAsString(jsonObject, "dmaap.certificateConfig.trustedCa"))
196             .trustedCaPasswordPath(getAsString(jsonObject, "dmaap.certificateConfig.trustedCaPasswordPath")) //
197             .httpsHostnameVerify(getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.httpsHostnameVerify", Boolean.TRUE))
198             .build();
199     }
200
201     private String determineKnownHostsFilePath() {
202         String filePath = "";
203         if (systemEnvironment != null) {
204             filePath =
205                 systemEnvironment.getProperty(KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY, "/home/datafile/.ssh/known_hosts");
206         }
207         return filePath;
208     }
209
210     private static @NotNull JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
211         JsonElement elem = obj.get(memberName);
212         if (elem == null) {
213             throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
214         }
215         return elem;
216     }
217
218     private static @NotNull String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
219         return get(obj, memberName).getAsString();
220     }
221
222     private static @NotNull Boolean getAsBoolean(JsonObject obj, String memberName) throws DatafileTaskException {
223         return get(obj, memberName).getAsBoolean();
224     }
225
226     private static @NotNull Boolean getAsBooleanOrDefault(JsonObject obj, String memberName, Boolean def) {
227         try {
228             return get(obj, memberName).getAsBoolean();
229         } catch (DatafileTaskException e) {
230             return def;
231         }
232     }
233
234     private static @NotNull JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
235         return get(obj, memberName).getAsJsonObject();
236     }
237
238     private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
239         return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
240     }
241
242 }