2 * ============LICENSE_START=======================================================
3 * org.onap.dcaegen2.collectors.ves
4 * ================================================================================
5 * Copyright (C) 2018 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.common.publishing;
22 import io.vavr.collection.List;
23 import io.vavr.collection.Map;
24 import io.vavr.control.Option;
25 import io.vavr.control.Try;
26 import org.onap.dcae.common.AnyNode;
29 import java.nio.file.Files;
30 import java.nio.file.Path;
32 import org.json.JSONObject;
34 import static io.vavr.API.*;
35 import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
36 import static org.onap.dcae.common.publishing.VavrUtils.f;
39 * @author Pawel Szalapski (pawel.szalapski@nokia.com)
41 @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
42 public final class DMaaPConfigurationParser {
44 public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
45 return readFromFile(configLocation)
46 .flatMap(DMaaPConfigurationParser::toJSON)
47 .flatMap(DMaaPConfigurationParser::toConfigMap);
50 public static Try<Map<String, PublisherConfig>> parseToDomainMapping(JSONObject config) {
51 return toJSON(config.toString())
52 .flatMap(DMaaPConfigurationParser::toConfigMap);
55 private static Try<String> readFromFile(Path configLocation) {
56 return Try(() -> new String(Files.readAllBytes(configLocation)))
57 .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
60 private static Try<AnyNode> toJSON(String config) {
61 return Try(() -> AnyNode.fromString(config))
62 .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
65 private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
66 return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
67 .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
70 private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
71 return dMaaPConfig.has("channels");
74 private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
75 return root.get("channels").toList().toMap(
76 channel -> channel.get("name").toString(),
78 String destinationsStr = channel.getAsOption("cambria.url")
79 .getOrElse(channel.getAsOption("cambria.hosts").get())
81 String topic = channel.get("cambria.topic").toString();
82 Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
83 Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
84 List<String> destinations = List(destinationsStr.split(","));
85 return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
89 private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
90 return root.keys().toMap(
91 channelName -> channelName,
93 AnyNode channelConfig = root.get(channelName);
94 Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
95 Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
96 URL topicURL = unchecked(
97 () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
98 String[] pathSegments = topicURL.getPath().substring(1).split("/");
99 String topic = pathSegments[1];
100 String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
101 List<String> destinations = List(destination);
102 return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
106 private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
107 String topic, List<String> destinations) {
108 return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
109 .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
110 .getOrElse(new PublisherConfig(destinations, topic));