Add support for ABATED alerts within CDAP TCA
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / utils / CDAPTCAUtils.java
index 29d42d5..89c5a84 100644 (file)
@@ -23,6 +23,7 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.utils;
 import co.cask.cdap.api.RuntimeContext;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
@@ -30,12 +31,19 @@ import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
 import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPolicyPreferencesValidator;
 import org.openecomp.dcae.apod.analytics.cdap.tca.validator.TCAPreferencesValidator;
 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.model.config.tca.DMAAPInfo;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAControllerAppConfig;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleIn;
+import org.openecomp.dcae.apod.analytics.model.config.tca.TCAHandleOut;
 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -80,12 +88,123 @@ public abstract class CDAPTCAUtils extends TCAUtils {
         final TCAAppPreferences tcaAppPreferences =
                 ANALYTICS_MODEL_OBJECT_MAPPER.convertValue(runtimeArguments, TCAAppPreferences.class);
 
+        final String appConfigString = runtimeContext.getApplicationSpecification().getConfiguration();
+
+        // populate DMaaP Information from App Config String
+        populateDMaaPInfoFromAppConfiguration(appConfigString, tcaAppPreferences);
+
         // Validate runtime arguments
         validateSettings(tcaAppPreferences, new TCAPreferencesValidator());
 
         return tcaAppPreferences;
     }
 
+    /**
+     * Populated App Preferences DMaaP Information from Application Config String
+     *
+     * @param appConfigString  CDAP Application config String
+     * @param tcaAppPreferences TCA App Preferences
+     */
+    private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString,
+                                                              final TCAAppPreferences tcaAppPreferences) {
+
+        if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) {
+            LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config");
+        }
+
+        LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString);
+
+        try {
+            final TCAControllerAppConfig tcaControllerAppConfig =
+                    readValue(appConfigString, TCAControllerAppConfig.class);
+
+            // Parse Subscriber DMaaP information from App Config String
+            if (tcaControllerAppConfig.getStreamsSubscribes() != null &&
+                    tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null &&
+                    tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) {
+
+                final DMAAPInfo subscriberDmaapInfo =
+                        tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo();
+                LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl());
+                final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl());
+                tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol());
+                tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost());
+                final int subscriberUrlPort = subscriberUrl.getPort() != -1 ?
+                        subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol());
+                tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort);
+                tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8));
+
+                final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn();
+                tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName());
+                tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword());
+            } else {
+                LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString);
+            }
+
+
+            // Parse Publisher DMaaP information from App Config String
+            if (tcaControllerAppConfig.getStreamsPublishes() != null &&
+                    tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null &&
+                    tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) {
+
+                final DMAAPInfo publisherDmaapInfo =
+                        tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo();
+                LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl());
+                final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl());
+                tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol());
+                tcaAppPreferences.setPublisherHostName(publisherUrl.getHost());
+                final int publisherUrlPort = publisherUrl.getPort() != -1 ?
+                        publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol());
+                tcaAppPreferences.setPublisherHostPort(publisherUrlPort);
+                tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8));
+
+                final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut();
+                tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName());
+                tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword());
+            } else {
+                LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString);
+            }
+
+
+        } catch (IOException e) {
+            throw new CDAPSettingsException(
+                    "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e);
+        }
+    }
+
+    /**
+     * Parses provided DMaaP MR URL string to {@link URL} object
+     *
+     * @param urlString url string
+     *
+     * @return url object
+     */
+    private static URL parseURL(final String urlString) {
+        try {
+            return new URL(urlString);
+        } catch (MalformedURLException e) {
+            final String errorMessage = String.format("Invalid URL format: %s", urlString);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+    }
+
+    /**
+     * Sets up default DMaaP Port if not provided with DMaaP URL
+     *
+     * @param protocol protocol e.g. http or https
+     *
+     * @return default DMaaP MR port number
+     */
+    private static int getDefaultDMaaPPort(final String protocol) {
+        if ("http".equals(protocol)) {
+            return 3904;
+        } else if ("https".equals(protocol)) {
+            return 3905;
+        } else {
+            return 80;
+        }
+    }
+
 
     /**
      * Extracts alert message strings from {@link TCAVESAlertEntity}
@@ -118,28 +237,32 @@ public abstract class CDAPTCAUtils extends TCAUtils {
 
         TCAPolicyPreferences tcaPolicyPreferences = new TCAPolicyPreferences();
 
-        final String tcaPolicy = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);
+        final String tcaPolicyJsonString = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_JSON_KEY);
 
-        if (tcaPolicy != null) {
+        if (StringUtils.isNotBlank(tcaPolicyJsonString)) {
 
-            LOG.debug(" tcaPolicy is being read from JSON String");
+            LOG.info("TcaPolicy will be set from input argument name: {} as JSON String with value: {}",
+                    AnalyticsConstants.TCA_POLICY_JSON_KEY, tcaPolicyJsonString);
 
             // initialize unquotedTCAPolicy
-            String unquotedTCAPolicy = tcaPolicy;
+            String unquotedTCAPolicy = tcaPolicyJsonString.trim();
 
-            //remove starting and ending quote from tcaPolicy
-            if (tcaPolicy.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) && tcaPolicy.trim().endsWith
-                    (AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {
-                unquotedTCAPolicy = tcaPolicy.trim().substring(1, tcaPolicy.trim().length() - 1);
+            //remove starting and ending quote from passed tca policy Json string if present
+            if (tcaPolicyJsonString.trim().startsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER) &&
+                    tcaPolicyJsonString.trim().endsWith(AnalyticsConstants.TCA_POLICY_STRING_DELIMITER)) {
+                unquotedTCAPolicy = tcaPolicyJsonString.trim().substring(1, tcaPolicyJsonString.trim().length() - 1);
             }
 
             try {
                 tcaPolicyPreferences = readValue(unquotedTCAPolicy , TCAPolicyPreferences.class);
             } catch (IOException e) {
-                throw new CDAPSettingsException("Invalid tca policy format", LOG, e);
+                throw new CDAPSettingsException(
+                        "Input tca_policy string format is not correct. tca_policy: " + tcaPolicyJsonString, LOG, e);
             }
 
-        } else {  // old controller is being used.  Validate preferences as received from old controller
+        } else {  // classical controller is being used.  Validate preferences as received from classical controller
+
+            LOG.info("TcaPolicy is being parsed as key value pair from classical controller");
 
             // extract TCA Policy Domain from Runtime Arguments
             final String policyDomain = sortedRuntimeArguments.get(AnalyticsConstants.TCA_POLICY_DOMAIN_PATH);
@@ -156,8 +279,8 @@ public abstract class CDAPTCAUtils extends TCAUtils {
                     extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER);
 
             // create metrics per functional role list
-            tcaPolicyPreferences.setMetricsPerFunctionalRole(
-                    createTCAPolicyMetricsPerFunctionalRoleList(functionalRolesMap));
+            tcaPolicyPreferences.setMetricsPerEventName(
+                    createTCAPolicyMetricsPerEventNameList(functionalRolesMap));
 
         }