TCA: Replace any openecomp reference by onap
[dcaegen2/analytics/tca.git] / dcae-analytics-tca / src / main / java / org / onap / dcae / apod / analytics / tca / utils / TCAUtils.java
-/*\r
- * ===============================LICENSE_START======================================\r
- *  dcae-analytics\r
- * ================================================================================\r
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- *  Licensed under the Apache License, Version 2.0 (the "License");\r
- *  you may not use this file except in compliance with the License.\r
- *   You may obtain a copy of the License at\r
- *\r
- *          http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- *  Unless required by applicable law or agreed to in writing, software\r
- *  distributed under the License is distributed on an "AS IS" BASIS,\r
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- *  See the License for the specific language governing permissions and\r
- *  limitations under the License.\r
- *  ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.tca.utils;\r
-\r
-import com.fasterxml.jackson.core.JsonProcessingException;\r
-import com.fasterxml.jackson.databind.JsonNode;\r
-import com.google.common.base.Function;\r
-import com.google.common.base.Optional;\r
-import com.google.common.base.Predicate;\r
-import com.google.common.base.Predicates;\r
-import com.google.common.base.Supplier;\r
-import com.google.common.base.Suppliers;\r
-import com.google.common.collect.HashBasedTable;\r
-import com.google.common.collect.ImmutableList;\r
-import com.google.common.collect.ImmutableMap;\r
-import com.google.common.collect.Iterables;\r
-import com.google.common.collect.Lists;\r
-import com.google.common.collect.Maps;\r
-import com.google.common.collect.Table;\r
-import com.jayway.jsonpath.DocumentContext;\r
-import com.jayway.jsonpath.JsonPath;\r
-import com.jayway.jsonpath.TypeRef;\r
-import org.apache.commons.lang3.StringUtils;\r
-import org.apache.commons.lang3.tuple.ImmutablePair;\r
-import org.apache.commons.lang3.tuple.Pair;\r
-import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient;\r
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
-import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;\r
-import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;\r
-import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
-import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;\r
-import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;\r
-import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;\r
-import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;\r
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;\r
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;\r
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;\r
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;\r
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;\r
-import org.quartz.Job;\r
-import org.quartz.JobBuilder;\r
-import org.quartz.JobDataMap;\r
-import org.quartz.JobDetail;\r
-import org.quartz.Scheduler;\r
-import org.quartz.SchedulerException;\r
-import org.quartz.SimpleScheduleBuilder;\r
-import org.quartz.SimpleTrigger;\r
-import org.quartz.TriggerBuilder;\r
-import org.quartz.impl.StdSchedulerFactory;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.IOException;\r
-import java.math.BigDecimal;\r
-import java.util.ArrayList;\r
-import java.util.Collections;\r
-import java.util.Comparator;\r
-import java.util.Date;\r
-import java.util.HashMap;\r
-import java.util.Iterator;\r
-import java.util.LinkedHashMap;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-import java.util.Map;\r
-import java.util.Properties;\r
-import java.util.Set;\r
-import java.util.SortedMap;\r
-import java.util.TreeMap;\r
-import java.util.UUID;\r
-\r
-import javax.annotation.Nonnull;\r
-import javax.annotation.Nullable;\r
-\r
-import static com.google.common.collect.Lists.newArrayList;\r
-import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;\r
-\r
-/**\r
- * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get\r
- * pre configured Json Object Mapper understand serialization and deserialization of CEF Message\r
- * and TCA Policy\r
- *\r
- * @author Rajiv Singla . Creation Date: 10/24/2016.\r
- */\r
-public abstract class TCAUtils extends AnalyticsModelJsonUtils {\r
-\r
-    private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);\r
-\r
-    /**\r
-     * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,\r
-     * WARNING )\r
-     */\r
-    private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {\r
-        @Override\r
-        public int compare(Threshold threshold1, Threshold threshold2) {\r
-            return threshold1.getSeverity().compareTo(threshold2.getSeverity());\r
-        }\r
-    };\r
-\r
-    /**\r
-     * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}\r
-     *\r
-     * @return TCA Policy Metrics Per Event Name list\r
-     */\r
-    public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {\r
-        return new Function<TCAPolicy, List<MetricsPerEventName>>() {\r
-            @Nullable\r
-            @Override\r
-            public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {\r
-                return tcaPolicy.getMetricsPerEventName();\r
-            }\r
-        };\r
-    }\r
-\r
-    /**\r
-     * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from\r
-     * {@link MetricsPerEventName}\r
-     *\r
-     * @return Event Names or a Metrics Per Event Name object\r
-     */\r
-    public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {\r
-        return new Function<MetricsPerEventName, String>() {\r
-            @Override\r
-            public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {\r
-                return metricsPerEventName.getEventName();\r
-            }\r
-        };\r
-    }\r
-\r
-\r
-    /**\r
-     * Extracts {@link TCAPolicy} Event Names\r
-     *\r
-     * @param tcaPolicy TCA Policy\r
-     * @return List of event names in the TCA Policy\r
-     */\r
-    public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {\r
-        final List<MetricsPerEventName> metricsPerEventNames =\r
-                tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);\r
-\r
-        return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());\r
-    }\r
-\r
-    /**\r
-     * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to\r
-     * change during runtime\r
-     *\r
-     * @param tcaPolicy TCA Policy\r
-     * @return a Supplier that memoize the TCA Policy event names\r
-     */\r
-    public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {\r
-        return Suppliers.memoize(new Supplier<List<String>>() {\r
-            @Override\r
-            public List<String> get() {\r
-                return getPolicyEventNames(tcaPolicy);\r
-            }\r
-        });\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path\r
-     *\r
-     * @param tcaPolicy TCA Policy\r
-     * @return A table with Keys of event name and field path containing List of threshold as values\r
-     */\r
-    public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {\r
-        final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();\r
-        for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {\r
-            final String eventName = metricsPerEventName.getEventName();\r
-            final List<Threshold> thresholds = metricsPerEventName.getThresholds();\r
-            for (Threshold threshold : thresholds) {\r
-                final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());\r
-                if (existingThresholds == null) {\r
-                    final LinkedList<Threshold> newThresholdList = new LinkedList<>();\r
-                    newThresholdList.add(threshold);\r
-                    domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);\r
-                } else {\r
-                    domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);\r
-                }\r
-            }\r
-        }\r
-        return domainFRTable;\r
-    }\r
-\r
-\r
-    /**\r
-     * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table\r
-     *\r
-     * @param tcaPolicy TCA Policy\r
-     * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values\r
-     */\r
-    public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier\r
-    (final TCAPolicy tcaPolicy) {\r
-        return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {\r
-            @Override\r
-            public Table<String, String, List<Threshold>> get() {\r
-                return getPolicyEventNameThresholdsTable(tcaPolicy);\r
-            }\r
-        });\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},\r
-     * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to\r
-     * filter out messages which does not match policy domain or event Name\r
-     *\r
-     * @param cefMessage CEF Message\r
-     * @param tcaPolicy TCA Policy\r
-     * @return Message Process Context after processing filter chain\r
-     */\r
-    public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,\r
-                                                          @Nonnull final TCAPolicy tcaPolicy) {\r
-\r
-        final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();\r
-        final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();\r
-        final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();\r
-        // Create a list of message processors\r
-        final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =\r
-                ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);\r
-        final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);\r
-        // Create a message processors chain\r
-        final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =\r
-                new GenericMessageChainProcessor<>(messageProcessors, processorContext);\r
-        // process chain\r
-        return tcaProcessingChain.processChain();\r
-    }\r
-\r
-\r
-    /**\r
-     * Extracts json path values for given json Field Paths from using Json path notation. Assumes\r
-     * that values extracted are always long\r
-     *\r
-     * @param message CEF Message\r
-     * @param jsonFieldPaths Json Field Paths\r
-     * @return Map containing key as json path and values as values associated with that json path\r
-     */\r
-    public static Map<String, List<BigDecimal>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>\r
-            jsonFieldPaths) {\r
-\r
-        final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();\r
-        final DocumentContext documentContext = JsonPath.parse(message);\r
-\r
-        for (String jsonFieldPath : jsonFieldPaths) {\r
-            List<BigDecimal> jsonFieldValues = null;\r
-\r
-            try {\r
-                jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {\r
-                });\r
-            } catch (Exception e) {\r
-                final String errorMessage = String.format(\r
-                        "Unable to convert jsonFieldPath: %s value to valid number. " +\r
-                                "Json Path value is not in a valid number format. Incoming message: %s",\r
-                        jsonFieldPath, message);\r
-                throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-            }\r
-            // If Json Field Values are not or empty\r
-            if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {\r
-                // Filter out all null values in the filed values list\r
-                final List<BigDecimal> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,\r
-                        Predicates.<BigDecimal>notNull()));\r
-                // If there are non null values put them in the map\r
-                if (!nonNullValues.isEmpty()) {\r
-                    jsonFieldPathMap.put(jsonFieldPath, nonNullValues);\r
-                }\r
-            }\r
-        }\r
-\r
-        return jsonFieldPathMap;\r
-    }\r
-\r
-    /**\r
-     * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path\r
-     * it applies threshold in order of their severity and record the first threshold per message field path\r
-     *\r
-     * @param messageFieldValues Field Path Values extracted from CEF Message\r
-     * @param fieldThresholds Policy Thresholds for Field Path\r
-     * @return Optional of violated threshold for a field path\r
-     */\r
-    public static Optional<Threshold> thresholdCalculator(final List<BigDecimal> messageFieldValues, final\r
-    List<Threshold>\r
-            fieldThresholds) {\r
-        // order thresholds by severity\r
-        Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);\r
-        // Now apply each threshold to field values\r
-        for (Threshold fieldThreshold : fieldThresholds) {\r
-            for (BigDecimal messageFieldValue : messageFieldValues) {\r
-                final Boolean isThresholdViolated =\r
-                        fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold\r
-                                .getThresholdValue()));\r
-                if (isThresholdViolated) {\r
-                    final Threshold violatedThreshold = Threshold.copy(fieldThreshold);\r
-                    violatedThreshold.setActualFieldValue(messageFieldValue);\r
-                    return Optional.of(violatedThreshold);\r
-                }\r
-            }\r
-        }\r
-        return Optional.absent();\r
-    }\r
-\r
-    /**\r
-     * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.\r
-     * Grabs first highest priority violated threshold\r
-     *\r
-     * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds\r
-     * @return First Highest priority violated threshold\r
-     */\r
-    public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {\r
-\r
-        final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());\r
-\r
-        if (violatedThresholds.size() == 1) {\r
-            return violatedThresholds.get(0);\r
-        }\r
-        Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);\r
-        // Just grab the first violated threshold with highest priority\r
-        return violatedThresholds.get(0);\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates {@link MetricsPerEventName} object which contains violated thresholds\r
-     *\r
-     * @param tcaPolicy TCA Policy\r
-     * @param violatedThreshold Violated thresholds\r
-     * @param eventName Event Name\r
-     *\r
-     * @return MetricsPerEventName object containing one highest severity violated threshold\r
-     */\r
-    public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,\r
-                                                            @Nonnull final Threshold violatedThreshold,\r
-                                                            @Nonnull final String eventName) {\r
-\r
-        final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(\r
-                Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {\r
-                    @Override\r
-                    public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {\r
-                        return metricsPerEventName.getEventName().equals(eventName);\r
-                    }\r
-                }));\r
-        // TCA policy must have only one metrics per event Name\r
-        if (metricsPerEventNames.size() == 1) {\r
-            final MetricsPerEventName violatedMetrics =\r
-                    MetricsPerEventName.copy(metricsPerEventNames.get(0));\r
-            violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));\r
-            return violatedMetrics;\r
-        } else {\r
-            final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);\r
-            throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Computes threshold violations\r
-     *\r
-     * @param processorContext Filtered processor Context\r
-     * @return processor context with any threshold violations\r
-     */\r
-    public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {\r
-        final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();\r
-        return policyThresholdsProcessor.apply(processorContext);\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}\r
-     * formats\r
-     *\r
-     * @param processorContextWithViolations processor context which has TCA violations\r
-     * @param tcaAppName tca app name\r
-     * @param isAlertInCEFFormat determines if output alert is in CEF format\r
-     *\r
-     * @return TCA Alert String\r
-     *\r
-     * @throws JsonProcessingException If alert cannot be parsed into JSON String\r
-     */\r
-    public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,\r
-                                              final String tcaAppName,\r
-                                              final Boolean isAlertInCEFFormat) throws JsonProcessingException {\r
-        if (isAlertInCEFFormat != null && isAlertInCEFFormat) {\r
-            final EventListener eventListenerWithViolations =\r
-                    addThresholdViolationFields(processorContextWithViolations);\r
-            final String alertString = writeValueAsString(eventListenerWithViolations);\r
-            LOG.debug("Created alert in CEF Format: {}", alertString);\r
-            return alertString;\r
-        } else {\r
-            final TCAVESResponse newTCAVESResponse =\r
-                    createNewTCAVESResponse(processorContextWithViolations, tcaAppName);\r
-            final String alertString = writeValueAsString(newTCAVESResponse);\r
-            LOG.debug("Created alert in Non CEF Format: {}", alertString);\r
-            return alertString;\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Adds threshold violation fields to {@link EventListener}\r
-     *\r
-     * @param processorContextWithViolations processor context that contains violations\r
-     * @return event listener with threshold crossing alert fields populated\r
-     */\r
-    public static EventListener addThresholdViolationFields(\r
-            final TCACEFProcessorContext processorContextWithViolations) {\r
-\r
-        final MetricsPerEventName metricsPerEventName =\r
-                processorContextWithViolations.getMetricsPerEventName();\r
-        // confirm violations are indeed present\r
-        if (metricsPerEventName == null) {\r
-            final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";\r
-            throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
-        }\r
-\r
-        // get violated threshold\r
-        final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);\r
-        final EventListener eventListener = processorContextWithViolations.getCEFEventListener();\r
-        final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();\r
-\r
-        // create new threshold crossing alert fields\r
-        final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();\r
-        thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());\r
-        thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());\r
-        thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));\r
-        thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);\r
-        thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);\r
-        thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());\r
-        thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());\r
-        thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());\r
-\r
-        // create new performance count\r
-        final PerformanceCounter performanceCounter = new PerformanceCounter();\r
-        performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));\r
-        performanceCounter.setName(violatedThreshold.getFieldPath());\r
-        performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());\r
-        performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());\r
-\r
-        // set additional parameters for threshold crossing alert fields\r
-        thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));\r
-\r
-        // add threshold crossing fields to existing event listener\r
-        eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);\r
-\r
-        return eventListener;\r
-    }\r
-\r
-    /**\r
-     * Converts {@link EventSeverity} to {@link Criticality}\r
-     *\r
-     * @param eventSeverity event severity\r
-     *\r
-     * @return performance counter criticality\r
-     */\r
-    private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {\r
-        switch (eventSeverity) {\r
-            case CRITICAL:\r
-                return Criticality.CRIT;\r
-            case MAJOR:\r
-                return Criticality.MAJ;\r
-            default:\r
-                return Criticality.UNKNOWN;\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Creates {@link TCAVESResponse} object\r
-     *\r
-     * @param processorContext processor Context with violations\r
-     * @param tcaAppName TCA App Name\r
-     *\r
-     * @return TCA VES Response Message\r
-     */\r
-    public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,\r
-                                                         final String tcaAppName) {\r
-\r
-        final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();\r
-        // confirm violations are indeed present\r
-        if (metricsPerEventName == null) {\r
-            final String errorMessage = "No violations metrics. Unable to create VES Response";\r
-            throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
-        }\r
-\r
-        final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);\r
-        final EventListener eventListener = processorContext.getCEFEventListener();\r
-        final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();\r
-\r
-        final TCAVESResponse tcavesResponse = new TCAVESResponse();\r
-        // ClosedLoopControlName included in the DCAE configuration Policy\r
-        tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());\r
-        // version included in the DCAE configuration Policy\r
-        tcavesResponse.setVersion(violatedThreshold.getVersion());\r
-        // Generate a UUID for this output message\r
-        tcavesResponse.setRequestID(UUID.randomUUID().toString());\r
-        // commonEventHeader.startEpochMicrosec from the received VES message\r
-        tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());\r
-        // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts\r
-        if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {\r
-            tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());\r
-        }\r
-        // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot\r
-        tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);\r
-\r
-        final AAI aai = new AAI();\r
-        tcavesResponse.setAai(aai);\r
-\r
-        // VM specific settings\r
-        if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {\r
-            // Hard Coded - "VM"\r
-            tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);\r
-            // Hard Coded - "vserver.vserver-name"\r
-            tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);\r
-            // commonEventHeader.sourceName from the received VES message\r
-            aai.setGenericServerName(commonEventHeader.getSourceName());\r
-        } else {\r
-            // VNF specific settings\r
-            // Hard Coded - "VNF"\r
-            tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);\r
-            // Hard Coded - "generic-vnf.vnf-name"\r
-            tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);\r
-            // commonEventHeader.sourceName from the received VES message\r
-            aai.setGenericVNFName(commonEventHeader.getSourceName());\r
-        }\r
-\r
-        // Hard Coded - "DCAE"\r
-        tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);\r
-        // policyScope included in the DCAE configuration Policy\r
-        tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());\r
-        // policyName included in the DCAE configuration Policy\r
-        tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());\r
-        // policyVersion included in the DCAE configuration Policy\r
-        tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());\r
-        // Extracted from violated threshold\r
-        tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());\r
-\r
-        return tcavesResponse;\r
-    }\r
-\r
-\r
-    /**\r
-     * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert\r
-     *\r
-     * @param tcavesResponse alert\r
-     *\r
-     * @return control Loop Schema Type\r
-     */\r
-    public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {\r
-        final AAI aai = tcavesResponse.getAai();\r
-        if (aai.getGenericServerName() != null) {\r
-            return ControlLoopSchemaType.VM;\r
-        } else {\r
-            return ControlLoopSchemaType.VNF;\r
-        }\r
-    }\r
-\r
-    /**\r
-     * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert\r
-     *\r
-     * @param tcavesResponse {@link TCAVESResponse} TCA alert\r
-     *\r
-     * @return Source name\r
-     */\r
-    public static String determineSourceName(final TCAVESResponse tcavesResponse) {\r
-        final AAI aai = tcavesResponse.getAai();\r
-        if (aai.getGenericServerName() != null) {\r
-            return aai.getGenericServerName();\r
-        } else {\r
-            return aai.getGenericVNFName();\r
-        }\r
-    }\r
-\r
-\r
-    /**\r
-     * Extract Domain and Event Name from processor context if present\r
-     *\r
-     * @param processorContext processor context\r
-     * @return Tuple of domain and event Name\r
-     */\r
-    public static Pair<String, String> getDomainAndEventName(\r
-            @Nullable final TCACEFProcessorContext processorContext) {\r
-\r
-        String domain = null;\r
-        String eventName = null;\r
-\r
-        if (processorContext != null &&\r
-                processorContext.getCEFEventListener() != null &&\r
-                processorContext.getCEFEventListener().getEvent() != null &&\r
-                processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {\r
-            final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()\r
-                    .getCommonEventHeader();\r
-\r
-            if (commonEventHeader.getDomain() != null) {\r
-                domain = commonEventHeader.getDomain().name();\r
-            }\r
-\r
-            if (commonEventHeader.getEventName() != null) {\r
-                eventName = commonEventHeader.getEventName();\r
-            }\r
-\r
-        }\r
-\r
-        return new ImmutablePair<>(domain, eventName);\r
-\r
-    }\r
-\r
-    /**\r
-     * Creates {@link TCAPolicy} Metrics per Event Name list\r
-     *\r
-     * @param eventNamesMap Map containing event Name as key and corresponding values\r
-     *\r
-     * @return List of {@link MetricsPerEventName}\r
-     */\r
-    public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(\r
-            final Map<String, Map<String, String>> eventNamesMap) {\r
-\r
-        // create a new metrics per event Name list\r
-        final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();\r
-\r
-        for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {\r
-\r
-            // create new metrics per event Name instance\r
-            final MetricsPerEventName newMetricsPerEventName =\r
-                    createNewMetricsPerEventName(eventNamesEntry);\r
-            metricsPerEventNames.add(newMetricsPerEventName);\r
-\r
-            // determine all threshold related values\r
-            final Map<String, String> thresholdsValuesMaps =\r
-                    filterMapByKeyNamePrefix(eventNamesEntry.getValue(),\r
-                            AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);\r
-\r
-            // create a map of all threshold values\r
-            final Map<String, Map<String, String>> thresholdsMap =\r
-                    extractSubTree(thresholdsValuesMaps, 1, 2,\r
-                            AnalyticsConstants.TCA_POLICY_DELIMITER);\r
-\r
-            // add thresholds to nmetrics per event Names threshold list\r
-            for (Map<String, String> thresholdMap : thresholdsMap.values()) {\r
-                newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));\r
-            }\r
-\r
-        }\r
-\r
-        return metricsPerEventNames;\r
-    }\r
-\r
-    /**\r
-     * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap\r
-     *\r
-     * @param thresholdMap threshold map with threshold values\r
-     *\r
-     * @return new instance of TCA Policy Threshold\r
-     */\r
-    public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {\r
-        final Threshold threshold = new Threshold();\r
-        threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));\r
-        threshold.setVersion(thresholdMap.get("policy.version"));\r
-        threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));\r
-        threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));\r
-        threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));\r
-        threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));\r
-        threshold.setClosedLoopEventStatus(\r
-                ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));\r
-        return threshold;\r
-    }\r
-\r
-    /**\r
-     * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope\r
-     * extracted from given eventNamesEntry\r
-     *\r
-     * @param eventNamesEntry Event Names Entry\r
-     *\r
-     * @return new instance of MetricsPerEventName\r
-     */\r
-    public static MetricsPerEventName createNewMetricsPerEventName(\r
-            final Map.Entry<String, Map<String, String>> eventNamesEntry) {\r
-        // determine event Name\r
-        final String eventName = eventNamesEntry.getKey();\r
-        // determine event Name thresholds\r
-        final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();\r
-        final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();\r
-        final List<Threshold> thresholds = new LinkedList<>();\r
-        metricsPerEventName.setThresholds(thresholds);\r
-        metricsPerEventName.setEventName(eventName);\r
-        // bind policyName, policyVersion, policyScope and closedLoopControlName\r
-        metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));\r
-        metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));\r
-        metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));\r
-        metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(\r
-                metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));\r
-        return metricsPerEventName;\r
-    }\r
-\r
-    /**\r
-     * Converts a flattened key/value map which has keys delimited by a given delimiter.\r
-     * The start Index and end index extract the sub-key value and returns a new map containing\r
-     * sub-keys and values.\r
-     *\r
-     * @param actualMap actual Map\r
-     * @param startIndex start index\r
-     * @param endIndex end index\r
-     * @param delimiter delimiter\r
-     *\r
-     * @return Map with new sub tree map\r
-     */\r
-    public static Map<String, Map<String, String>> extractSubTree(\r
-            final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {\r
-\r
-        final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();\r
-\r
-        // iterate over actual map entries\r
-        for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {\r
-            final String actualMapKey = actualMapEntry.getKey();\r
-            final String actualMapValue = actualMapEntry.getValue();\r
-\r
-            // determine delimiter start and end index\r
-            final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);\r
-            final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);\r
-            final int keyLength = actualMapKey.length();\r
-\r
-            // extract sub-tree map\r
-            if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {\r
-                final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);\r
-                final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);\r
-                final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);\r
-                if (existingThresholdMap == null) {\r
-                    Map<String, String> newThresholdMap = new LinkedHashMap<>();\r
-                    newThresholdMap.put(subMapKey, actualMapValue);\r
-                    subTreeMap.put(thresholdKey, newThresholdMap);\r
-                } else {\r
-                    existingThresholdMap.put(subMapKey, actualMapValue);\r
-                }\r
-\r
-            }\r
-        }\r
-\r
-        return subTreeMap;\r
-\r
-    }\r
-\r
-\r
-    /**\r
-     * Provides a view of underlying map that filters out entries with keys starting with give prefix\r
-     *\r
-     * @param actualMap Target map that needs to be filtered\r
-     * @param keyNamePrefix key prefix\r
-     *\r
-     * @return a view of actual map which only show entries which have give prefix\r
-     */\r
-    public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,\r
-                                                               final String keyNamePrefix) {\r
-        return Maps.filterKeys(actualMap,\r
-                new Predicate<String>() {\r
-                    @Override\r
-                    public boolean apply(@Nullable String key) {\r
-                        return key != null && key.startsWith(keyNamePrefix);\r
-                    }\r
-                });\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates Quartz Scheduler\r
-     *\r
-     * @param pollingIntervalMS polling interval\r
-     * @param stdSchedulerFactory Quartz standard schedule factory instance\r
-     * @param quartzPublisherPropertiesFileName quartz properties file name\r
-     * @param jobDataMap job Data map\r
-     * @param quartzJobClass Quartz Job Class\r
-     * @param quartzJobName Quartz Job Name\r
-     * @param quartzTriggerName Quartz Trigger name\r
-     *\r
-     * @param <T> An implementation of Quartz {@link Job} interface\r
-     * @return Configured Quartz Scheduler\r
-     *\r
-     * @throws SchedulerException exception if unable to create to Quartz Scheduler\r
-     */\r
-    public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,\r
-            final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,\r
-            final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,\r
-            final String quartzTriggerName) throws SchedulerException {\r
-\r
-        // Initialize a new Quartz Standard scheduler\r
-        LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",\r
-                quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);\r
-        final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(\r
-                quartzPublisherPropertiesFileName, new Properties());\r
-        stdSchedulerFactory.initialize(quartzProperties);\r
-        final Scheduler scheduler = stdSchedulerFactory.getScheduler();\r
-\r
-        // Create a new job detail\r
-        final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,\r
-                AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();\r
-\r
-        // Create a new scheduling builder\r
-        final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()\r
-                .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule\r
-                .repeatForever(); // repeats while worker is running\r
-\r
-        // Create a trigger for the TCA Publisher Job\r
-        final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()\r
-                .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)\r
-                .startNow() // job starts right away\r
-                .withSchedule(simpleScheduleBuilder).build();\r
-\r
-        scheduler.scheduleJob(jobDetail, simpleTrigger);\r
-        LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());\r
-        return scheduler;\r
-    }\r
-\r
-\r
-    /**\r
-     * Does A&AI Enrichment for VM\r
-     *\r
-     * @param tcavesResponse Outgoing alert object\r
-     * @param aaiEnrichmentClient A&AI Enrichment client\r
-     * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path\r
-     * @param alertString alert String\r
-     * @param vmSourceName vm source name\r
-     */\r
-    public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,\r
-                                         final AAIEnrichmentClient aaiEnrichmentClient,\r
-                                         final String aaiVMEnrichmentAPIPath,\r
-                                         final String alertString,\r
-                                         final String vmSourceName) {\r
-\r
-        final String filterString = "vserver-name:EQUALS:" + vmSourceName;\r
-        final ImmutableMap<String, String> queryParams = ImmutableMap.of(\r
-                "search-node-type", "vserver", "filter", filterString);\r
-\r
-        // fetch vm object resource Link from A&AI\r
-        final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
-                aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());\r
-        final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);\r
-\r
-        if (vmObjectResourceLink == null) {\r
-            LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +\r
-                    "determined for vmSourceName: {}.", alertString, vmSourceName);\r
-        } else {\r
-\r
-            LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",\r
-                    vmSourceName, vmObjectResourceLink);\r
-\r
-            // fetch vm A&AI Enrichment\r
-            final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
-                    vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());\r
-\r
-            // enrich AAI\r
-            enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,\r
-                    AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);\r
-        }\r
-\r
-\r
-    }\r
-\r
-\r
-    /**\r
-     * Does A&AI Enrichment for VNF\r
-     *\r
-     * @param tcavesResponse Outgoing alert object\r
-     * @param aaiEnrichmentClient A&AI Enrichment client\r
-     * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path\r
-     * @param alertString alert String\r
-     * @param vnfSourceName vnf source name\r
-     */\r
-    public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,\r
-                                          final AAIEnrichmentClient aaiEnrichmentClient,\r
-                                          final String aaiVNFEnrichmentAPIPath,\r
-                                          final String alertString,\r
-                                          final String vnfSourceName) {\r
-        final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);\r
-\r
-        // fetch vnf A&AI Enrichment\r
-        final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
-                aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());\r
-\r
-        // enrich alert AAI\r
-        enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);\r
-    }\r
-\r
-    /**\r
-     * Fetches VM Object Resource Link from A&AI Resource Link Json\r
-     *\r
-     * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json\r
-     *\r
-     * @return object resource link String\r
-     */\r
-    private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {\r
-        if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {\r
-            try {\r
-                final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);\r
-                final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");\r
-                if (!resourceLinkJsonNode.isMissingNode()) {\r
-                    return resourceLinkJsonNode.asText();\r
-                }\r
-            } catch (IOException e) {\r
-                LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",\r
-                        vmAAIResourceLinkDetails, e);\r
-            }\r
-        }\r
-        return null;\r
-    }\r
-\r
-    /**\r
-     * Creates Http Headers for A&AI Enrichment client\r
-     *\r
-     * @return Http Headers Map for A&AI Enrichment client\r
-     */\r
-    private static Map<String, String> createAAIEnrichmentHeaders() {\r
-        final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();\r
-        final String transactionId = Long.toString(new Date().getTime());\r
-        aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");\r
-        aaiEnrichmentHeaders.put("X-TransactionId", transactionId);\r
-        aaiEnrichmentHeaders.put("Accept", "application/json");\r
-        aaiEnrichmentHeaders.put("Real-Time", "true");\r
-        aaiEnrichmentHeaders.put("Content-Type", "application/json");\r
-        return aaiEnrichmentHeaders;\r
-    }\r
-\r
-\r
-    /**\r
-     * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object\r
-     *\r
-     * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details\r
-     * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String\r
-     * @param alertString Alert String\r
-     * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record\r
-     */\r
-    private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,\r
-                                  final String alertString, final String keyPrefix) {\r
-\r
-        if (aaiEnrichmentDetails == null) {\r
-            LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +\r
-                    "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);\r
-\r
-        } else {\r
-\r
-            final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);\r
-\r
-            if (enrichmentDetailsAAI != null) {\r
-                final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =\r
-                        enrichmentDetailsAAI.getDynamicProperties().entrySet();\r
-                final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();\r
-\r
-                // populate A&AI Enrichment details and add prefix to key\r
-                for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {\r
-                    preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),\r
-                            enrichedAAIEntry.getValue());\r
-                }\r
-\r
-                LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",\r
-                        alertString, preEnrichmentAAI);\r
-            } else {\r
-                LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +\r
-                                "Skipping Enrichment for alert message: {}",\r
-                        preEnrichmentAAI, aaiEnrichmentDetails, alertString);\r
-            }\r
-        }\r
-\r
-    }\r
-\r
-    /**\r
-     * Creates a new A&AI object with only top level A&AI Enrichment details\r
-     *\r
-     * @param aaiEnrichmentDetails A&AI Enrichment details\r
-     *\r
-     * @return new A&AI with only top level A&AI Enrichment details\r
-     */\r
-    private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {\r
-        try {\r
-            final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);\r
-            final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();\r
-            while (fieldsIterator.hasNext()) {\r
-                final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();\r
-                final JsonNode jsonNode = fieldEntry.getValue();\r
-                // remove all arrays, objects from A&AI Enrichment Json\r
-                if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {\r
-                    fieldsIterator.remove();\r
-                }\r
-            }\r
-            return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);\r
-        } catch (IOException e) {\r
-            LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);\r
-        }\r
-        return null;\r
-    }\r
-\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ *  dcae-analytics
+ * ================================================================================
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.tca.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.TypeRef;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.dcae.apod.analytics.aai.service.AAIEnrichmentClient;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException;
+import org.onap.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
+import org.onap.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
+import org.onap.dcae.apod.analytics.model.domain.cef.AlertAction;
+import org.onap.dcae.apod.analytics.model.domain.cef.AlertType;
+import org.onap.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
+import org.onap.dcae.apod.analytics.model.domain.cef.Criticality;
+import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.onap.dcae.apod.analytics.model.domain.cef.EventSeverity;
+import org.onap.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
+import org.onap.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.onap.dcae.apod.analytics.model.facade.tca.AAI;
+import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.onap.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
+import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
+import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
+import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
+import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
+import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
+import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
+
+/**
+ * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
+ * pre configured Json Object Mapper understand serialization and deserialization of CEF Message
+ * and TCA Policy
+ *
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+public abstract class TCAUtils extends AnalyticsModelJsonUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
+
+    /**
+     * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
+     * WARNING )
+     */
+    private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
+        @Override
+        public int compare(Threshold threshold1, Threshold threshold2) {
+            return threshold1.getSeverity().compareTo(threshold2.getSeverity());
+        }
+    };
+
+    /**
+     * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
+     *
+     * @return TCA Policy Metrics Per Event Name list
+     */
+    public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
+        return new Function<TCAPolicy, List<MetricsPerEventName>>() {
+            @Nullable
+            @Override
+            public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
+                return tcaPolicy.getMetricsPerEventName();
+            }
+        };
+    }
+
+    /**
+     * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
+     * {@link MetricsPerEventName}
+     *
+     * @return Event Names or a Metrics Per Event Name object
+     */
+    public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
+        return new Function<MetricsPerEventName, String>() {
+            @Override
+            public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
+                return metricsPerEventName.getEventName();
+            }
+        };
+    }
+
+
+    /**
+     * Extracts {@link TCAPolicy} Event Names
+     *
+     * @param tcaPolicy TCA Policy
+     * @return List of event names in the TCA Policy
+     */
+    public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
+        final List<MetricsPerEventName> metricsPerEventNames =
+                tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
+
+        return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
+    }
+
+    /**
+     * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
+     * change during runtime
+     *
+     * @param tcaPolicy TCA Policy
+     * @return a Supplier that memoize the TCA Policy event names
+     */
+    public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
+        return Suppliers.memoize(new Supplier<List<String>>() {
+            @Override
+            public List<String> get() {
+                return getPolicyEventNames(tcaPolicy);
+            }
+        });
+    }
+
+
+    /**
+     * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
+     *
+     * @param tcaPolicy TCA Policy
+     * @return A table with Keys of event name and field path containing List of threshold as values
+     */
+    public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
+        final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
+        for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
+            final String eventName = metricsPerEventName.getEventName();
+            final List<Threshold> thresholds = metricsPerEventName.getThresholds();
+            for (Threshold threshold : thresholds) {
+                final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
+                if (existingThresholds == null) {
+                    final LinkedList<Threshold> newThresholdList = new LinkedList<>();
+                    newThresholdList.add(threshold);
+                    domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
+                } else {
+                    domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
+                }
+            }
+        }
+        return domainFRTable;
+    }
+
+
+    /**
+     * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
+     *
+     * @param tcaPolicy TCA Policy
+     * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
+     */
+    public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
+    (final TCAPolicy tcaPolicy) {
+        return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
+            @Override
+            public Table<String, String, List<Threshold>> get() {
+                return getPolicyEventNameThresholdsTable(tcaPolicy);
+            }
+        });
+    }
+
+
+    /**
+     * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
+     * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
+     * filter out messages which does not match policy domain or event Name
+     *
+     * @param cefMessage CEF Message
+     * @param tcaPolicy TCA Policy
+     * @return Message Process Context after processing filter chain
+     */
+    public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
+                                                          @Nonnull final TCAPolicy tcaPolicy) {
+
+        final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
+        final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
+        final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
+        // Create a list of message processors
+        final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
+                ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
+        final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
+        // Create a message processors chain
+        final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
+                new GenericMessageChainProcessor<>(messageProcessors, processorContext);
+        // process chain
+        return tcaProcessingChain.processChain();
+    }
+
+
+    /**
+     * Extracts json path values for given json Field Paths from using Json path notation. Assumes
+     * that values extracted are always long
+     *
+     * @param message CEF Message
+     * @param jsonFieldPaths Json Field Paths
+     * @return Map containing key as json path and values as values associated with that json path
+     */
+    public static Map<String, List<BigDecimal>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
+            jsonFieldPaths) {
+
+        final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
+        final DocumentContext documentContext = JsonPath.parse(message);
+
+        for (String jsonFieldPath : jsonFieldPaths) {
+            List<BigDecimal> jsonFieldValues = null;
+
+            try {
+                jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
+                });
+            } catch (Exception e) {
+                final String errorMessage = String.format(
+                        "Unable to convert jsonFieldPath: %s value to valid number. " +
+                                "Json Path value is not in a valid number format. Incoming message: %s",
+                        jsonFieldPath, message);
+                throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+            }
+            // If Json Field Values are not or empty
+            if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
+                // Filter out all null values in the filed values list
+                final List<BigDecimal> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
+                        Predicates.<BigDecimal>notNull()));
+                // If there are non null values put them in the map
+                if (!nonNullValues.isEmpty()) {
+                    jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
+                }
+            }
+        }
+
+        return jsonFieldPathMap;
+    }
+
+    /**
+     * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
+     * it applies threshold in order of their severity and record the first threshold per message field path
+     *
+     * @param messageFieldValues Field Path Values extracted from CEF Message
+     * @param fieldThresholds Policy Thresholds for Field Path
+     * @return Optional of violated threshold for a field path
+     */
+    public static Optional<Threshold> thresholdCalculator(final List<BigDecimal> messageFieldValues, final
+    List<Threshold>
+            fieldThresholds) {
+        // order thresholds by severity
+        Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
+        // Now apply each threshold to field values
+        for (Threshold fieldThreshold : fieldThresholds) {
+            for (BigDecimal messageFieldValue : messageFieldValues) {
+                final Boolean isThresholdViolated =
+                        fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold
+                                .getThresholdValue()));
+                if (isThresholdViolated) {
+                    final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
+                    violatedThreshold.setActualFieldValue(messageFieldValue);
+                    return Optional.of(violatedThreshold);
+                }
+            }
+        }
+        return Optional.absent();
+    }
+
+    /**
+     * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
+     * Grabs first highest priority violated threshold
+     *
+     * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
+     * @return First Highest priority violated threshold
+     */
+    public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
+
+        final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
+
+        if (violatedThresholds.size() == 1) {
+            return violatedThresholds.get(0);
+        }
+        Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
+        // Just grab the first violated threshold with highest priority
+        return violatedThresholds.get(0);
+    }
+
+
+    /**
+     * Creates {@link MetricsPerEventName} object which contains violated thresholds
+     *
+     * @param tcaPolicy TCA Policy
+     * @param violatedThreshold Violated thresholds
+     * @param eventName Event Name
+     *
+     * @return MetricsPerEventName object containing one highest severity violated threshold
+     */
+    public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
+                                                            @Nonnull final Threshold violatedThreshold,
+                                                            @Nonnull final String eventName) {
+
+        final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
+                Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
+                    @Override
+                    public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
+                        return metricsPerEventName.getEventName().equals(eventName);
+                    }
+                }));
+        // TCA policy must have only one metrics per event Name
+        if (metricsPerEventNames.size() == 1) {
+            final MetricsPerEventName violatedMetrics =
+                    MetricsPerEventName.copy(metricsPerEventNames.get(0));
+            violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
+            return violatedMetrics;
+        } else {
+            final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
+            throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
+        }
+    }
+
+    /**
+     * Computes threshold violations
+     *
+     * @param processorContext Filtered processor Context
+     * @return processor context with any threshold violations
+     */
+    public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
+        final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
+        return policyThresholdsProcessor.apply(processorContext);
+    }
+
+
+    /**
+     * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
+     * formats
+     *
+     * @param processorContextWithViolations processor context which has TCA violations
+     * @param tcaAppName tca app name
+     * @param isAlertInCEFFormat determines if output alert is in CEF format
+     *
+     * @return TCA Alert String
+     *
+     * @throws JsonProcessingException If alert cannot be parsed into JSON String
+     */
+    public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
+                                              final String tcaAppName,
+                                              final Boolean isAlertInCEFFormat) throws JsonProcessingException {
+        if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
+            final EventListener eventListenerWithViolations =
+                    addThresholdViolationFields(processorContextWithViolations);
+            final String alertString = writeValueAsString(eventListenerWithViolations);
+            LOG.debug("Created alert in CEF Format: {}", alertString);
+            return alertString;
+        } else {
+            final TCAVESResponse newTCAVESResponse =
+                    createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
+            final String alertString = writeValueAsString(newTCAVESResponse);
+            LOG.debug("Created alert in Non CEF Format: {}", alertString);
+            return alertString;
+        }
+    }
+
+    /**
+     * Adds threshold violation fields to {@link EventListener}
+     *
+     * @param processorContextWithViolations processor context that contains violations
+     * @return event listener with threshold crossing alert fields populated
+     */
+    public static EventListener addThresholdViolationFields(
+            final TCACEFProcessorContext processorContextWithViolations) {
+
+        final MetricsPerEventName metricsPerEventName =
+                processorContextWithViolations.getMetricsPerEventName();
+        // confirm violations are indeed present
+        if (metricsPerEventName == null) {
+            final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
+            throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+        }
+
+        // get violated threshold
+        final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
+        final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
+        final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
+
+        // create new threshold crossing alert fields
+        final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
+        thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
+        thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
+        thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
+        thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
+        thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
+        thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
+        thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
+        thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
+
+        // create new performance count
+        final PerformanceCounter performanceCounter = new PerformanceCounter();
+        performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
+        performanceCounter.setName(violatedThreshold.getFieldPath());
+        performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
+        performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
+
+        // set additional parameters for threshold crossing alert fields
+        thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
+
+        // add threshold crossing fields to existing event listener
+        eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
+
+        return eventListener;
+    }
+
+    /**
+     * Converts {@link EventSeverity} to {@link Criticality}
+     *
+     * @param eventSeverity event severity
+     *
+     * @return performance counter criticality
+     */
+    private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
+        switch (eventSeverity) {
+            case CRITICAL:
+                return Criticality.CRIT;
+            case MAJOR:
+                return Criticality.MAJ;
+            default:
+                return Criticality.UNKNOWN;
+        }
+    }
+
+    /**
+     * Creates {@link TCAVESResponse} object
+     *
+     * @param processorContext processor Context with violations
+     * @param tcaAppName TCA App Name
+     *
+     * @return TCA VES Response Message
+     */
+    public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
+                                                         final String tcaAppName) {
+
+        final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
+        // confirm violations are indeed present
+        if (metricsPerEventName == null) {
+            final String errorMessage = "No violations metrics. Unable to create VES Response";
+            throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+        }
+
+        final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
+        final EventListener eventListener = processorContext.getCEFEventListener();
+        final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
+
+        final TCAVESResponse tcavesResponse = new TCAVESResponse();
+        // ClosedLoopControlName included in the DCAE configuration Policy
+        tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
+        // version included in the DCAE configuration Policy
+        tcavesResponse.setVersion(violatedThreshold.getVersion());
+        // Generate a UUID for this output message
+        tcavesResponse.setRequestID(UUID.randomUUID().toString());
+        // commonEventHeader.startEpochMicrosec from the received VES message
+        tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
+        // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts
+        if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {
+            tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());
+        }
+        // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
+        tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
+
+        final AAI aai = new AAI();
+        tcavesResponse.setAai(aai);
+
+        // VM specific settings
+        if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
+            // Hard Coded - "VM"
+            tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
+            // Hard Coded - "vserver.vserver-name"
+            tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
+            // commonEventHeader.sourceName from the received VES message
+            aai.setGenericServerName(commonEventHeader.getSourceName());
+        } else {
+            // VNF specific settings
+            // Hard Coded - "VNF"
+            tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
+            // Hard Coded - "generic-vnf.vnf-name"
+            tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
+            // commonEventHeader.sourceName from the received VES message
+            aai.setGenericVNFName(commonEventHeader.getSourceName());
+        }
+
+        // Hard Coded - "DCAE"
+        tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
+        // policyScope included in the DCAE configuration Policy
+        tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
+        // policyName included in the DCAE configuration Policy
+        tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
+        // policyVersion included in the DCAE configuration Policy
+        tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
+        // Extracted from violated threshold
+        tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
+
+        return tcavesResponse;
+    }
+
+
+    /**
+     * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
+     *
+     * @param tcavesResponse alert
+     *
+     * @return control Loop Schema Type
+     */
+    public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {
+        final AAI aai = tcavesResponse.getAai();
+        if (aai.getGenericServerName() != null) {
+            return ControlLoopSchemaType.VM;
+        } else {
+            return ControlLoopSchemaType.VNF;
+        }
+    }
+
+    /**
+     * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
+     *
+     * @param tcavesResponse {@link TCAVESResponse} TCA alert
+     *
+     * @return Source name
+     */
+    public static String determineSourceName(final TCAVESResponse tcavesResponse) {
+        final AAI aai = tcavesResponse.getAai();
+        if (aai.getGenericServerName() != null) {
+            return aai.getGenericServerName();
+        } else {
+            return aai.getGenericVNFName();
+        }
+    }
+
+
+    /**
+     * Extract Domain and Event Name from processor context if present
+     *
+     * @param processorContext processor context
+     * @return Tuple of domain and event Name
+     */
+    public static Pair<String, String> getDomainAndEventName(
+            @Nullable final TCACEFProcessorContext processorContext) {
+
+        String domain = null;
+        String eventName = null;
+
+        if (processorContext != null &&
+                processorContext.getCEFEventListener() != null &&
+                processorContext.getCEFEventListener().getEvent() != null &&
+                processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
+            final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
+                    .getCommonEventHeader();
+
+            if (commonEventHeader.getDomain() != null) {
+                domain = commonEventHeader.getDomain().name();
+            }
+
+            if (commonEventHeader.getEventName() != null) {
+                eventName = commonEventHeader.getEventName();
+            }
+
+        }
+
+        return new ImmutablePair<>(domain, eventName);
+
+    }
+
+    /**
+     * Creates {@link TCAPolicy} Metrics per Event Name list
+     *
+     * @param eventNamesMap Map containing event Name as key and corresponding values
+     *
+     * @return List of {@link MetricsPerEventName}
+     */
+    public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
+            final Map<String, Map<String, String>> eventNamesMap) {
+
+        // create a new metrics per event Name list
+        final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
+
+        for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
+
+            // create new metrics per event Name instance
+            final MetricsPerEventName newMetricsPerEventName =
+                    createNewMetricsPerEventName(eventNamesEntry);
+            metricsPerEventNames.add(newMetricsPerEventName);
+
+            // determine all threshold related values
+            final Map<String, String> thresholdsValuesMaps =
+                    filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
+                            AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
+
+            // create a map of all threshold values
+            final Map<String, Map<String, String>> thresholdsMap =
+                    extractSubTree(thresholdsValuesMaps, 1, 2,
+                            AnalyticsConstants.TCA_POLICY_DELIMITER);
+
+            // add thresholds to nmetrics per event Names threshold list
+            for (Map<String, String> thresholdMap : thresholdsMap.values()) {
+                newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
+            }
+
+        }
+
+        return metricsPerEventNames;
+    }
+
+    /**
+     * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
+     *
+     * @param thresholdMap threshold map with threshold values
+     *
+     * @return new instance of TCA Policy Threshold
+     */
+    public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
+        final Threshold threshold = new Threshold();
+        threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
+        threshold.setVersion(thresholdMap.get("policy.version"));
+        threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
+        threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
+        threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
+        threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
+        threshold.setClosedLoopEventStatus(
+                ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
+        return threshold;
+    }
+
+    /**
+     * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
+     * extracted from given eventNamesEntry
+     *
+     * @param eventNamesEntry Event Names Entry
+     *
+     * @return new instance of MetricsPerEventName
+     */
+    public static MetricsPerEventName createNewMetricsPerEventName(
+            final Map.Entry<String, Map<String, String>> eventNamesEntry) {
+        // determine event Name
+        final String eventName = eventNamesEntry.getKey();
+        // determine event Name thresholds
+        final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
+        final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
+        final List<Threshold> thresholds = new LinkedList<>();
+        metricsPerEventName.setThresholds(thresholds);
+        metricsPerEventName.setEventName(eventName);
+        // bind policyName, policyVersion, policyScope and closedLoopControlName
+        metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
+        metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
+        metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
+        metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
+                metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));
+        return metricsPerEventName;
+    }
+
+    /**
+     * Converts a flattened key/value map which has keys delimited by a given delimiter.
+     * The start Index and end index extract the sub-key value and returns a new map containing
+     * sub-keys and values.
+     *
+     * @param actualMap actual Map
+     * @param startIndex start index
+     * @param endIndex end index
+     * @param delimiter delimiter
+     *
+     * @return Map with new sub tree map
+     */
+    public static Map<String, Map<String, String>> extractSubTree(
+            final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
+
+        final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
+
+        // iterate over actual map entries
+        for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
+            final String actualMapKey = actualMapEntry.getKey();
+            final String actualMapValue = actualMapEntry.getValue();
+
+            // determine delimiter start and end index
+            final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
+            final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
+            final int keyLength = actualMapKey.length();
+
+            // extract sub-tree map
+            if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
+                final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
+                final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
+                final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
+                if (existingThresholdMap == null) {
+                    Map<String, String> newThresholdMap = new LinkedHashMap<>();
+                    newThresholdMap.put(subMapKey, actualMapValue);
+                    subTreeMap.put(thresholdKey, newThresholdMap);
+                } else {
+                    existingThresholdMap.put(subMapKey, actualMapValue);
+                }
+
+            }
+        }
+
+        return subTreeMap;
+
+    }
+
+
+    /**
+     * Provides a view of underlying map that filters out entries with keys starting with give prefix
+     *
+     * @param actualMap Target map that needs to be filtered
+     * @param keyNamePrefix key prefix
+     *
+     * @return a view of actual map which only show entries which have give prefix
+     */
+    public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
+                                                               final String keyNamePrefix) {
+        return Maps.filterKeys(actualMap,
+                new Predicate<String>() {
+                    @Override
+                    public boolean apply(@Nullable String key) {
+                        return key != null && key.startsWith(keyNamePrefix);
+                    }
+                });
+    }
+
+
+    /**
+     * Creates Quartz Scheduler
+     *
+     * @param pollingIntervalMS polling interval
+     * @param stdSchedulerFactory Quartz standard schedule factory instance
+     * @param quartzPublisherPropertiesFileName quartz properties file name
+     * @param jobDataMap job Data map
+     * @param quartzJobClass Quartz Job Class
+     * @param quartzJobName Quartz Job Name
+     * @param quartzTriggerName Quartz Trigger name
+     *
+     * @param <T> An implementation of Quartz {@link Job} interface
+     * @return Configured Quartz Scheduler
+     *
+     * @throws SchedulerException exception if unable to create to Quartz Scheduler
+     */
+    public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
+            final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
+            final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
+            final String quartzTriggerName) throws SchedulerException {
+
+        // Initialize a new Quartz Standard scheduler
+        LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
+                quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
+        final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
+                quartzPublisherPropertiesFileName, new Properties());
+        stdSchedulerFactory.initialize(quartzProperties);
+        final Scheduler scheduler = stdSchedulerFactory.getScheduler();
+
+        // Create a new job detail
+        final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
+                AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
+
+        // Create a new scheduling builder
+        final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
+                .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
+                .repeatForever(); // repeats while worker is running
+
+        // Create a trigger for the TCA Publisher Job
+        final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
+                .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
+                .startNow() // job starts right away
+                .withSchedule(simpleScheduleBuilder).build();
+
+        scheduler.scheduleJob(jobDetail, simpleTrigger);
+        LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
+        return scheduler;
+    }
+
+
+    /**
+     * Does A&AI Enrichment for VM
+     *
+     * @param tcavesResponse Outgoing alert object
+     * @param aaiEnrichmentClient A&AI Enrichment client
+     * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path
+     * @param alertString alert String
+     * @param vmSourceName vm source name
+     */
+    public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,
+                                         final AAIEnrichmentClient aaiEnrichmentClient,
+                                         final String aaiVMEnrichmentAPIPath,
+                                         final String alertString,
+                                         final String vmSourceName) {
+
+        final String filterString = "vserver-name:EQUALS:" + vmSourceName;
+        final ImmutableMap<String, String> queryParams = ImmutableMap.of(
+                "search-node-type", "vserver", "filter", filterString);
+
+        // fetch vm object resource Link from A&AI
+        final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(
+                aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
+        final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);
+
+        if (vmObjectResourceLink == null) {
+            LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +
+                    "determined for vmSourceName: {}.", alertString, vmSourceName);
+        } else {
+
+            LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",
+                    vmSourceName, vmObjectResourceLink);
+
+            // fetch vm A&AI Enrichment
+            final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
+                    vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());
+
+            // enrich AAI
+            enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,
+                    AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);
+        }
+
+
+    }
+
+
+    /**
+     * Does A&AI Enrichment for VNF
+     *
+     * @param tcavesResponse Outgoing alert object
+     * @param aaiEnrichmentClient A&AI Enrichment client
+     * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path
+     * @param alertString alert String
+     * @param vnfSourceName vnf source name
+     */
+    public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,
+                                          final AAIEnrichmentClient aaiEnrichmentClient,
+                                          final String aaiVNFEnrichmentAPIPath,
+                                          final String alertString,
+                                          final String vnfSourceName) {
+        final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);
+
+        // fetch vnf A&AI Enrichment
+        final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
+                aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
+
+        // enrich alert AAI
+        enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);
+    }
+
+    /**
+     * Fetches VM Object Resource Link from A&AI Resource Link Json
+     *
+     * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json
+     *
+     * @return object resource link String
+     */
+    private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {
+        if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {
+            try {
+                final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);
+                final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");
+                if (!resourceLinkJsonNode.isMissingNode()) {
+                    return resourceLinkJsonNode.asText();
+                }
+            } catch (IOException e) {
+                LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",
+                        vmAAIResourceLinkDetails, e);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Creates Http Headers for A&AI Enrichment client
+     *
+     * @return Http Headers Map for A&AI Enrichment client
+     */
+    private static Map<String, String> createAAIEnrichmentHeaders() {
+        final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();
+        final String transactionId = Long.toString(new Date().getTime());
+        aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");
+        aaiEnrichmentHeaders.put("X-TransactionId", transactionId);
+        aaiEnrichmentHeaders.put("Accept", "application/json");
+        aaiEnrichmentHeaders.put("Real-Time", "true");
+        aaiEnrichmentHeaders.put("Content-Type", "application/json");
+        return aaiEnrichmentHeaders;
+    }
+
+
+    /**
+     * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object
+     *
+     * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details
+     * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String
+     * @param alertString Alert String
+     * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record
+     */
+    private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,
+                                  final String alertString, final String keyPrefix) {
+
+        if (aaiEnrichmentDetails == null) {
+            LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +
+                    "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);
+
+        } else {
+
+            final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);
+
+            if (enrichmentDetailsAAI != null) {
+                final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =
+                        enrichmentDetailsAAI.getDynamicProperties().entrySet();
+                final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();
+
+                // populate A&AI Enrichment details and add prefix to key
+                for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {
+                    preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),
+                            enrichedAAIEntry.getValue());
+                }
+
+                LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",
+                        alertString, preEnrichmentAAI);
+            } else {
+                LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +
+                                "Skipping Enrichment for alert message: {}",
+                        preEnrichmentAAI, aaiEnrichmentDetails, alertString);
+            }
+        }
+
+    }
+
+    /**
+     * Creates a new A&AI object with only top level A&AI Enrichment details
+     *
+     * @param aaiEnrichmentDetails A&AI Enrichment details
+     *
+     * @return new A&AI with only top level A&AI Enrichment details
+     */
+    private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {
+        try {
+            final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);
+            final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
+            while (fieldsIterator.hasNext()) {
+                final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();
+                final JsonNode jsonNode = fieldEntry.getValue();
+                // remove all arrays, objects from A&AI Enrichment Json
+                if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {
+                    fieldsIterator.remove();
+                }
+            }
+            return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);
+        } catch (IOException e) {
+            LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);
+        }
+        return null;
+    }
+
+}