274e4490564e0a31f7a617aa1b0a34e98c2a2cb0
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / common / publishing / DMaaPConfigurationParser.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dcaegen2.collectors.ves
4  * ================================================================================
5  * Copyright (C) 2018 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.common.publishing;
21
22 import io.vavr.collection.List;
23 import io.vavr.collection.Map;
24 import io.vavr.control.Option;
25 import io.vavr.control.Try;
26 import org.onap.dcae.common.AnyNode;
27
28 import java.net.URL;
29 import java.nio.file.Files;
30 import java.nio.file.Path;
31
32 import org.json.JSONObject;
33
34 import static io.vavr.API.*;
35 import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
36 import static org.onap.dcae.common.publishing.VavrUtils.f;
37
38 /**
39  * @author Pawel Szalapski (pawel.szalapski@nokia.com)
40  */
41 @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
42 public final class DMaaPConfigurationParser {
43
44     public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
45         return readFromFile(configLocation)
46                 .flatMap(DMaaPConfigurationParser::toJSON)
47                 .flatMap(DMaaPConfigurationParser::toConfigMap);
48     }
49
50     public static Try<Map<String, PublisherConfig>> parseToDomainMapping(JSONObject config) {
51         return toJSON(config.toString())
52             .flatMap(DMaaPConfigurationParser::toConfigMap);
53     }
54
55     private static Try<String> readFromFile(Path configLocation) {
56         return Try(() -> new String(Files.readAllBytes(configLocation)))
57                 .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
58     }
59
60     private static Try<AnyNode> toJSON(String config) {
61         return Try(() -> AnyNode.fromString(config))
62                 .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
63     }
64
65     private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
66         return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
67                 .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
68     }
69
70     private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
71         return dMaaPConfig.has("channels");
72     }
73
74     private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
75         return root.get("channels").toList().toMap(
76                 channel -> channel.get("name").toString(),
77                 channel -> {
78                     String destinationsStr = channel.getAsOption("cambria.url")
79                             .getOrElse(channel.getAsOption("cambria.hosts").get())
80                             .toString();
81                     String topic = channel.get("cambria.topic").toString();
82                     Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
83                     Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
84                     List<String> destinations = List(destinationsStr.split(","));
85                     return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
86                 });
87     }
88
89     private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
90         return root.keys().toMap(
91                 channelName -> channelName,
92                 channelName -> {
93                     AnyNode channelConfig = root.get(channelName);
94                     Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
95                     Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
96                     URL topicURL = unchecked(
97                             () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
98                     String[] pathSegments = topicURL.getPath().substring(1).split("/");
99                     String topic = pathSegments[1];
100                     String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
101                     List<String> destinations = List(destination);
102                     return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
103                 });
104     }
105
106     private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
107                                                     String topic, List<String> destinations) {
108         return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
109                 .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
110                 .getOrElse(new PublisherConfig(destinations, topic));
111     }
112 }