-/*\r
- * ===============================LICENSE_START======================================\r
- * dcae-analytics\r
- * ================================================================================\r
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.cdap.tca.utils;\r
-\r
-import co.cask.cdap.api.RuntimeContext;\r
-import com.google.common.base.Function;\r
-import com.google.common.collect.Lists;\r
-import org.apache.commons.lang3.StringUtils;\r
-import org.openecomp.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig;\r
-import org.openecomp.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfigBuilder;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;\r
-import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;\r
-import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;\r
-import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator;\r
-import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator;\r
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
-import org.openecomp.dcae.apod.analytics.model.config.tca.DMAAPInfo;\r
-import org.openecomp.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig;\r
-import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleIn;\r
-import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleOut;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;\r
-import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.IOException;\r
-import java.net.MalformedURLException;\r
-import java.net.URL;\r
-import java.util.Collection;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.TreeMap;\r
-\r
-import static com.google.common.collect.Lists.newArrayList;\r
-import static org.openecomp.dcae.apod.analytics.cdap.common.utils.ValidationUtils.validateSettings;\r
-import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH;\r
-\r
-/**\r
- * Utility Helper methods for CDAP TCA sub module.\r
- *\r
- * <p>\r
- * @author Rajiv Singla . Creation Date: 10/24/2016.\r
- */\r
-public abstract class CDAPTCAUtils extends TCAUtils {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(CDAPTCAUtils.class);\r
-\r
- /**\r
- * Function that extracts alert message string from {@link TCAVESAlertEntity}\r
- */\r
- public static final Function<TCAVESAlertEntity, String> MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION =\r
- new Function<TCAVESAlertEntity, String>() {\r
- @Override\r
- public String apply(TCAVESAlertEntity alertEntity) {\r
- return alertEntity == null ? null : alertEntity.getAlertMessage();\r
- }\r
- };\r
-\r
-\r
- /**\r
- * Parses and validates Runtime Arguments to {@link TCAAppPreferences} object\r
- *\r
- * @param runtimeContext Runtime Context\r
- *\r
- * @return validated runtime arguments as {@link TCAAppPreferences} object\r
- */\r
- public static TCAAppPreferences getValidatedTCAAppPreferences(final RuntimeContext runtimeContext) {\r
- // Parse runtime arguments\r
- final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments();\r
- final TCAAppPreferences tcaAppPreferences =\r
- ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class);\r
-\r
- final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration();\r
-\r
- // populate DMaaP Information from App Config String\r
- populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences);\r
-\r
- // Validate runtime arguments\r
- validateSettings(tcaAppPreferences, new TCAPreferencesValidator());\r
-\r
- return tcaAppPreferences;\r
- }\r
-\r
- /**\r
- * Creates an A&AI Http Client config from give {@link TCAAppPreferences}\r
- *\r
- * @param tcaAppPreferences TCA App Preferences\r
- *\r
- * @return A&AI Http Client config\r
- */\r
- public static AAIHttpClientConfig createAAIEnrichmentClientConfig(final TCAAppPreferences tcaAppPreferences) {\r
- final String aaiEnrichmentProxyURLString = tcaAppPreferences.getAaiEnrichmentProxyURL();\r
- URL aaiEnrichmentProxyURL = null;\r
- if (StringUtils.isNotBlank(aaiEnrichmentProxyURLString)) {\r
- aaiEnrichmentProxyURL = parseURL(aaiEnrichmentProxyURLString);\r
- }\r
-\r
- return new AAIHttpClientConfigBuilder(tcaAppPreferences.getAaiEnrichmentHost())\r
- .setAaiProtocol(tcaAppPreferences.getAaiEnrichmentProtocol())\r
- .setAaiHostPortNumber(tcaAppPreferences.getAaiEnrichmentPortNumber())\r
- .setAaiUserName(tcaAppPreferences.getAaiEnrichmentUserName())\r
- .setAaiUserPassword(tcaAppPreferences.getAaiEnrichmentUserPassword())\r
- .setAaiProxyURL(aaiEnrichmentProxyURL)\r
- .setAaiIgnoreSSLCertificateErrors(tcaAppPreferences.getAaiEnrichmentIgnoreSSLCertificateErrors())\r
- .build();\r
- }\r
-\r
- /**\r
- * Populated App Preferences DMaaP Information from Application Config String\r
- *\r
- * @param appConfigString CDAP Application config String\r
- * @param tcaAppPreferences TCA App Preferences\r
- */\r
- private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString,\r
- final TCAAppPreferences tcaAppPreferences) {\r
-\r
- if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) {\r
- LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config");\r
- return;\r
- }\r
-\r
- LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString);\r
-\r
- try {\r
- final TCAControllerAppConfig tcaControllerAppConfig =\r
- readValue(appConfigString, TCAControllerAppConfig.class);\r
-\r
- // Parse Subscriber DMaaP information from App Config String\r
- if (tcaControllerAppConfig.getStreamsSubscribes() != null &&\r
- tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null &&\r
- tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) {\r
-\r
- final DMAAPInfo subscriberDmaapInfo =\r
- tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo();\r
- LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl());\r
- final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl());\r
- tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol());\r
- tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost());\r
- final int subscriberUrlPort = subscriberUrl.getPort() != -1 ?\r
- subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol());\r
- tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort);\r
- tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8));\r
-\r
- final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn();\r
- tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName());\r
- tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword());\r
- } else {\r
- LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString);\r
- }\r
-\r
-\r
- // Parse Publisher DMaaP information from App Config String\r
- if (tcaControllerAppConfig.getStreamsPublishes() != null &&\r
- tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null &&\r
- tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) {\r
-\r
- final DMAAPInfo publisherDmaapInfo =\r
- tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo();\r
- LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl());\r
- final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl());\r
- tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol());\r
- tcaAppPreferences.setPublisherHostName(publisherUrl.getHost());\r
- final int publisherUrlPort = publisherUrl.getPort() != -1 ?\r
- publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol());\r
- tcaAppPreferences.setPublisherHostPort(publisherUrlPort);\r
- tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8));\r
-\r
- final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut();\r
- tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName());\r
- tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword());\r
- } else {\r
- LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString);\r
- }\r
-\r
-\r
- } catch (IOException e) {\r
- throw new CDAPSettingsException(\r
- "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e);\r
- }\r
- }\r
-\r
- /**\r
- * Parses provided DMaaP MR URL string to {@link URL} object\r
- *\r
- * @param urlString url string\r
- *\r
- * @return url object\r
- */\r
- private static URL parseURL(final String urlString) {\r
- try {\r
- return new URL(urlString);\r
- } catch (MalformedURLException e) {\r
- final String errorMessage = String.format("Invalid URL format: %s", urlString);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
- }\r
-\r
- /**\r
- * Sets up default DMaaP Port if not provided with DMaaP URL\r
- *\r
- * @param protocol protocol e.g. http or https\r
- *\r
- * @return default DMaaP MR port number\r
- */\r
- private static int getDefaultDMaaPPort(final String protocol) {\r
- if ("http".equals(protocol)) {\r
- return 3904;\r
- } else if ("https".equals(protocol)) {\r
- return 3905;\r
- } else {\r
- return 80;\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Extracts alert message strings from {@link TCAVESAlertEntity}\r
- *\r
- * @param alertEntities collection of alert entities\r
- *\r
- * @return List of alert message strings\r
- */\r
- public static List<String> extractAlertFromAlertEntities(final Collection<TCAVESAlertEntity> alertEntities) {\r
- return Lists.transform(newArrayList(alertEntities), MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION);\r
- }\r
-\r
-\r
- /**\r
- * Converts Runtime Arguments to {@link TCAPolicyPreferences} object\r
- *\r
- * @param runtimeContext CDAP Runtime Arguments\r
- *\r
- * @return TCA Policy Preferences\r
- */\r
- public static TCAPolicy getValidatedTCAPolicyPreferences(final RuntimeContext runtimeContext) {\r
-\r
- final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments();\r
- final TreeMap<String, String> sortedRuntimeArguments = new TreeMap<>(runtimeArguments);\r
-\r
- LOG.debug("Printing all Received Runtime Arguments:");\r
- for (Map.Entry<String, String> runtimeArgsEntry : sortedRuntimeArguments.entrySet()) {\r
- LOG.debug("{}:{}", runtimeArgsEntry.getKey(), runtimeArgsEntry.getValue());\r
- }\r
-\r
- TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences();\r
-\r
- final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);\r
-\r
- if (StringUtils.isNotBlank(tcaPolicyJsonString)) {\r
-\r
- LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}",\r
- AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString);\r
-\r
- // initialize unquotedTCAPolicy\r
- String unquotedTCAPolicy = tcaPolicyJsonString.trim();\r
-\r
- //remove starting and ending quote from passed tca policy Json string if present\r
- if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) &&\r
- tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {\r
- unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1);\r
- }\r
-\r
- try {\r
- tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class);\r
- } catch (IOException e) {\r
- throw new CDAPSettingsException(\r
- "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e);\r
- }\r
-\r
- } else { // classical controller is being used. Validate preferences as received from classical controller\r
-\r
- LOG.info("TcaPolicy is being parsed as key value pair from classical controller");\r
-\r
- // extract TCA Policy Domain from Runtime Arguments\r
- final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH);\r
-\r
- // create new TCA Policy object\r
- tcaPolicyPreferences.setDomain(policyDomain);\r
-\r
- // filter out other non relevant fields which are not related to tca policy\r
- final Map<String, String> tcaPolicyMap = filterMapByKeyNamePrefix(sortedRuntimeArguments,\r
- TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH);\r
-\r
- // determine functional Roles\r
- final Map<String, Map<String, String>> functionalRolesMap =\r
- extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER);\r
-\r
- // create metrics per functional role list\r
- tcaPolicyPreferences.setMetricsPerEventName(\r
- createTCAPolicyMetricsPerEventNameList(functionalRolesMap));\r
-\r
- }\r
-\r
- // validate tca Policy Preferences\r
- validateSettings(tcaPolicyPreferences, new TCAPolicyPreferencesValidator());\r
-\r
- LOG.info("Printing Effective TCA Policy: {}", tcaPolicyPreferences);\r
-\r
- return tcaPolicyPreferences;\r
- }\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.utils;
+
+import co.cask.cdap.api.RuntimeContext;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfig;
+import org.onap.dcae.apod.analytics.aai.domain.config.AAIHttpClientConfigBuilder;
+import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator;
+import org.onap.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.model.config.tca.DMAAPInfo;
+import org.onap.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig;
+import org.onap.dcae.apod.analytics.model.config.tca.TCAHandleIn;
+import org.onap.dcae.apod.analytics.model.config.tca.TCAHandleOut;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.onap.dcae.apod.analytics.cdap.common.utils.ValidationUtils.validateSettings;
+import static org.onap.dcae.apod.analytics.common.AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH;
+
+/**
+ * Utility Helper methods for CDAP TCA sub module.
+ *
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+public abstract class CDAPTCAUtils extends TCAUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CDAPTCAUtils.class);
+
+ /**
+ * Function that extracts alert message string from {@link TCAVESAlertEntity}
+ */
+ public static final Function<TCAVESAlertEntity, String> MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION =
+ new Function<TCAVESAlertEntity, String>() {
+ @Override
+ public String apply(TCAVESAlertEntity alertEntity) {
+ return alertEntity == null ? null : alertEntity.getAlertMessage();
+ }
+ };
+
+
+ /**
+ * Parses and validates Runtime Arguments to {@link TCAAppPreferences} object
+ *
+ * @param runtimeContext Runtime Context
+ *
+ * @return validated runtime arguments as {@link TCAAppPreferences} object
+ */
+ public static TCAAppPreferences getValidatedTCAAppPreferences(final RuntimeContext runtimeContext) {
+ // Parse runtime arguments
+ final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments();
+ final TCAAppPreferences tcaAppPreferences =
+ ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class);
+
+ final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration();
+
+ // populate DMaaP Information from App Config String
+ populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences);
+
+ // Validate runtime arguments
+ validateSettings(tcaAppPreferences, new TCAPreferencesValidator());
+
+ return tcaAppPreferences;
+ }
+
+ /**
+ * Creates an A&AI Http Client config from give {@link TCAAppPreferences}
+ *
+ * @param tcaAppPreferences TCA App Preferences
+ *
+ * @return A&AI Http Client config
+ */
+ public static AAIHttpClientConfig createAAIEnrichmentClientConfig(final TCAAppPreferences tcaAppPreferences) {
+ final String aaiEnrichmentProxyURLString = tcaAppPreferences.getAaiEnrichmentProxyURL();
+ URL aaiEnrichmentProxyURL = null;
+ if (StringUtils.isNotBlank(aaiEnrichmentProxyURLString)) {
+ aaiEnrichmentProxyURL = parseURL(aaiEnrichmentProxyURLString);
+ }
+
+ return new AAIHttpClientConfigBuilder(tcaAppPreferences.getAaiEnrichmentHost())
+ .setAaiProtocol(tcaAppPreferences.getAaiEnrichmentProtocol())
+ .setAaiHostPortNumber(tcaAppPreferences.getAaiEnrichmentPortNumber())
+ .setAaiUserName(tcaAppPreferences.getAaiEnrichmentUserName())
+ .setAaiUserPassword(tcaAppPreferences.getAaiEnrichmentUserPassword())
+ .setAaiProxyURL(aaiEnrichmentProxyURL)
+ .setAaiIgnoreSSLCertificateErrors(tcaAppPreferences.getAaiEnrichmentIgnoreSSLCertificateErrors())
+ .build();
+ }
+
+ /**
+ * Populated App Preferences DMaaP Information from Application Config String
+ *
+ * @param appConfigString CDAP Application config String
+ * @param tcaAppPreferences TCA App Preferences
+ */
+ private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString,
+ final TCAAppPreferences tcaAppPreferences) {
+
+ if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) {
+ LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config");
+ return;
+ }
+
+ LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString);
+
+ try {
+ final TCAControllerAppConfig tcaControllerAppConfig =
+ readValue(appConfigString, TCAControllerAppConfig.class);
+
+ // Parse Subscriber DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsSubscribes() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) {
+
+ final DMAAPInfo subscriberDmaapInfo =
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo();
+ LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl());
+ final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost());
+ final int subscriberUrlPort = subscriberUrl.getPort() != -1 ?
+ subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort);
+ tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8));
+
+ final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn();
+ tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName());
+ tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ // Parse Publisher DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsPublishes() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) {
+
+ final DMAAPInfo publisherDmaapInfo =
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo();
+ LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl());
+ final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostName(publisherUrl.getHost());
+ final int publisherUrlPort = publisherUrl.getPort() != -1 ?
+ publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostPort(publisherUrlPort);
+ tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8));
+
+ final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut();
+ tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName());
+ tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ } catch (IOException e) {
+ throw new CDAPSettingsException(
+ "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e);
+ }
+ }
+
+ /**
+ * Parses provided DMaaP MR URL string to {@link URL} object
+ *
+ * @param urlString url string
+ *
+ * @return url object
+ */
+ private static URL parseURL(final String urlString) {
+ try {
+ return new URL(urlString);
+ } catch (MalformedURLException e) {
+ final String errorMessage = String.format("Invalid URL format: %s", urlString);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ /**
+ * Sets up default DMaaP Port if not provided with DMaaP URL
+ *
+ * @param protocol protocol e.g. http or https
+ *
+ * @return default DMaaP MR port number
+ */
+ private static int getDefaultDMaaPPort(final String protocol) {
+ if ("http".equals(protocol)) {
+ return 3904;
+ } else if ("https".equals(protocol)) {
+ return 3905;
+ } else {
+ return 80;
+ }
+ }
+
+
+ /**
+ * Extracts alert message strings from {@link TCAVESAlertEntity}
+ *
+ * @param alertEntities collection of alert entities
+ *
+ * @return List of alert message strings
+ */
+ public static List<String> extractAlertFromAlertEntities(final Collection<TCAVESAlertEntity> alertEntities) {
+ return Lists.transform(newArrayList(alertEntities), MAP_ALERT_ENTITY_TO_ALERT_STRING_FUNCTION);
+ }
+
+
+ /**
+ * Converts Runtime Arguments to {@link TCAPolicyPreferences} object
+ *
+ * @param runtimeContext CDAP Runtime Arguments
+ *
+ * @return TCA Policy Preferences
+ */
+ public static TCAPolicy getValidatedTCAPolicyPreferences(final RuntimeContext runtimeContext) {
+
+ final Map<String, String> runtimeArguments = runtimeContext.getRuntimeArguments();
+ final TreeMap<String, String> sortedRuntimeArguments = new TreeMap<>(runtimeArguments);
+
+ LOG.debug("Printing all Received Runtime Arguments:");
+ for (Map.Entry<String, String> runtimeArgsEntry : sortedRuntimeArguments.entrySet()) {
+ LOG.debug("{}:{}", runtimeArgsEntry.getKey(), runtimeArgsEntry.getValue());
+ }
+
+ TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences();
+
+ final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);
+
+ if (StringUtils.isNotBlank(tcaPolicyJsonString)) {
+
+ LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}",
+ AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString);
+
+ // initialize unquotedTCAPolicy
+ String unquotedTCAPolicy = tcaPolicyJsonString.trim();
+
+ //remove starting and ending quote from passed tca policy Json string if present
+ if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) &&
+ tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {
+ unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1);
+ }
+
+ try {
+ tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class);
+ } catch (IOException e) {
+ throw new CDAPSettingsException(
+ "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e);
+ }
+
+ } else { // classical controller is being used. Validate preferences as received from classical controller
+
+ LOG.info("TcaPolicy is being parsed as key value pair from classical controller");
+
+ // extract TCA Policy Domain from Runtime Arguments
+ final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH);
+
+ // create new TCA Policy object
+ tcaPolicyPreferences.setDomain(policyDomain);
+
+ // filter out other non relevant fields which are not related to tca policy
+ final Map<String, String> tcaPolicyMap = filterMapByKeyNamePrefix(sortedRuntimeArguments,
+ TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH);
+
+ // determine functional Roles
+ final Map<String, Map<String, String>> functionalRolesMap =
+ extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER);
+
+ // create metrics per functional role list
+ tcaPolicyPreferences.setMetricsPerEventName(
+ createTCAPolicyMetricsPerEventNameList(functionalRolesMap));
+
+ }
+
+ // validate tca Policy Preferences
+ validateSettings(tcaPolicyPreferences, new TCAPolicyPreferencesValidator());
+
+ LOG.info("Printing Effective TCA Policy: {}", tcaPolicyPreferences);
+
+ return tcaPolicyPreferences;
+ }
+}