6ace4aaed99f50c07cf4a4beaa488d165e196905
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.dcaegen2.collectors.datafile.configuration;
20
21 import com.google.gson.JsonElement;
22 import com.google.gson.JsonObject;
23
24 import java.nio.file.Path;
25 import java.nio.file.Paths;
26 import java.time.Duration;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Properties;
31
32 import javax.validation.constraints.NotNull;
33
34 import io.vavr.collection.Stream;
35 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
36 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
37 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
46 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
47 import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
48 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
49 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
50
51 /**
52  * Parses the cloud configuration.
53  *
54  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18
55  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
56  */
57 public class CloudConfigParser {
58
59     private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath";
60     private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath";
61     private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath";
62     private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath";
63     private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth";
64     private static final String CONFIG = "config";
65
66     private static final String KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY = "KNOWN_HOSTS_FILE_PATH";
67     private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
68         "sftp.security.strictHostKeyChecking";
69
70     private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
71     private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
72     private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
73     private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
74     private static final int FIRST_SOURCE_INDEX = 0;
75
76     private final Properties systemEnvironment;
77
78     private final JsonObject jsonObject;
79
80     public CloudConfigParser(JsonObject jsonObject, Properties systemEnvironment) {
81         this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
82         this.systemEnvironment = systemEnvironment;
83     }
84
85     /**
86      * Get the publisher configurations.
87      *
88      * @return a map with change identifier as key and the connected publisher configuration as value.
89      * @throws DatafileTaskException if a member of the configuration is missing.
90      */
91     public @NotNull Map<String, PublisherConfiguration> getDmaapPublisherConfigurations() throws DatafileTaskException {
92         JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject();
93         Iterator<String> changeIdentifierList = producerCfgs.keySet().iterator();
94         Map<String, PublisherConfiguration> result = new HashMap<>();
95
96         while (changeIdentifierList.hasNext()) {
97             String changeIdentifier = changeIdentifierList.next();
98             JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier);
99             JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject();
100
101             PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() //
102                 .publishUrl(getAsString(feedConfig, "publish_url")) //
103                 .password(getAsString(feedConfig, "password")) //
104                 .userName(getAsString(feedConfig, "username")) //
105                 .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) //
106                 .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) //
107                 .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) //
108                 .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) //
109                 .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
110                 .changeIdentifier(changeIdentifier) //
111                 .logUrl(getAsString(feedConfig, "log_url")) //
112                 .build();
113
114             result.put(cfg.changeIdentifier(), cfg);
115         }
116         return result;
117     }
118
119     /**
120      * Get the consumer configuration.
121      *
122      * @return the consumer configuration.
123      * @throws DatafileTaskException if the configuration is invalid.
124      */
125     public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException {
126         try {
127             MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
128             MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
129             MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
130             return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
131         } catch (Exception e) {
132             throw new DatafileTaskException("Could not parse message router consumer configuration", e);
133         }
134     }
135
136     private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
137         return ImmutableMessageRouterSubscriberConfig.builder()
138             .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
139             .build();
140     }
141
142     private SecurityKeys createSecurityKeys() throws DatafileTaskException {
143         return ImmutableSecurityKeys.builder()
144             .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)))
145             .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)))
146             .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)))
147             .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)))
148             .build();
149     }
150
151     private boolean isDmaapCertAuthEnabled(JsonObject config) {
152         return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean();
153     }
154
155     private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException {
156         Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject);
157         if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) {
158             throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources);
159         }
160         RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX);
161         MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
162
163         return ImmutableMessageRouterSubscribeRequest.builder()
164             .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
165             .sourceDefinition(parsedSource)
166             .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
167             .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
168             .build();
169     }
170
171     /**
172      * Get the sFTP configuration.
173      *
174      * @return the sFTP configuration.
175      * @throws DatafileTaskException if a member of the configuration is missing.
176      */
177     public @NotNull SftpConfig getSftpConfig() throws DatafileTaskException {
178         String filePath = determineKnownHostsFilePath();
179         return new ImmutableSftpConfig.Builder() //
180             .strictHostKeyChecking(getAsBoolean(jsonObject, CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING))
181             .knownHostsFilePath(filePath).build();
182     }
183
184     /**
185      * Get the security configuration for communication with the xNF.
186      *
187      * @return the xNF communication security configuration.
188      * @throws DatafileTaskException if a member of the configuration is missing.
189      */
190     public @NotNull FtpesConfig getFtpesConfig() throws DatafileTaskException {
191         return new ImmutableFtpesConfig.Builder() //
192             .keyCert(getAsString(jsonObject, "dmaap.ftpesConfig.keyCert"))
193             .keyPasswordPath(getAsString(jsonObject, "dmaap.ftpesConfig.keyPasswordPath"))
194             .trustedCa(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCa"))
195             .trustedCaPasswordPath(getAsString(jsonObject, "dmaap.ftpesConfig.trustedCaPasswordPath")) //
196             .build();
197     }
198
199     private String determineKnownHostsFilePath() {
200         String filePath = "";
201         if (systemEnvironment != null) {
202             filePath =
203                 systemEnvironment.getProperty(KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY, "/home/datafile/.ssh/known_hosts");
204         }
205         return filePath;
206     }
207
208     private static @NotNull JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException {
209         JsonElement elem = obj.get(memberName);
210         if (elem == null) {
211             throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj);
212         }
213         return elem;
214     }
215
216     private static @NotNull String getAsString(JsonObject obj, String memberName) throws DatafileTaskException {
217         return get(obj, memberName).getAsString();
218     }
219
220     private static @NotNull Boolean getAsBoolean(JsonObject obj, String memberName) throws DatafileTaskException {
221         return get(obj, memberName).getAsBoolean();
222     }
223
224     private static @NotNull JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException {
225         return get(obj, memberName).getAsJsonObject();
226     }
227
228     private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
229         return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
230     }
231
232 }