2 * ============LICENSE_START=======================================================
3 * org.onap.dcaegen2.collectors.ves
4 * ================================================================================
5 * Copyright (C) 2018 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.common.publishing;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.control.Option;
import io.vavr.control.Try;
import org.onap.dcae.common.AnyNode;

import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import org.json.JSONObject;

import static io.vavr.API.*;
import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
import static org.onap.dcae.common.publishing.VavrUtils.f;
39 * @author Pawel Szalapski (pawel.szalapski@nokia.com)
41 @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
42 public final class DMaaPConfigurationParser {
44 public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
45 return readFromFile(configLocation)
46 .flatMap(DMaaPConfigurationParser::toJSON)
47 .flatMap(DMaaPConfigurationParser::toConfigMap);
50 public static Try<Map<String, PublisherConfig>> parseToDomainMapping(JSONObject config) {
51 return toJSON(config.toString())
52 .flatMap(DMaaPConfigurationParser::toConfigMap);
55 private static Try<String> readFromFile(Path configLocation) {
56 return Try(() -> new String(Files.readAllBytes(configLocation)))
57 .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
60 private static Try<AnyNode> toJSON(String config) {
61 return Try(() -> AnyNode.fromString(config))
62 .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
65 private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
66 return Try(() -> parseNewFormat(config))
67 .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
71 private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
72 return root.keys().toMap(
73 channelName -> channelName,
75 AnyNode channelConfig = root.get(channelName);
76 Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
77 Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
78 URL topicURL = unchecked(
79 () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
80 String[] pathSegments = topicURL.getPath().substring(1).split("/");
81 String topic = pathSegments[1];
82 String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
83 List<String> destinations = List(destination);
84 return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
88 private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
89 String topic, List<String> destinations) {
90 return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
91 .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
92 .getOrElse(new PublisherConfig(destinations, topic));